#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>
#include <afblib/outbuf_printf.h>
/*
 * Function to be executed by a worker, where
 *    id          is an integer in [0, # workers),
 *    out_fd      is the writing end of the pipeline for the results, and
 *    control_fd  is the writing end of the control pipe.
 */
typedef void (*JobHandler)(unsigned int id, int out_fd, int control_fd);

/* Parent-side record of one worker process. */
typedef struct Worker {
   pid_t pid; /* process id returned by fork() */
   int fd; /* reading end of the pipeline */
} Worker;

/* Close the reading ends stored in workers[0] .. workers[count-1]. */
static void close_worker_fds(const Worker workers[], unsigned int count) {
   for (unsigned int j = 0; j < count; ++j) {
      close(workers[j].fd);
   }
}

/*
 * Fork number_of_workers child processes. Each child runs
 * handler(id, out_fd, control_fd) and then exits with status 0.
 *
 * For every worker a dedicated pipe is created; its non-blocking reading
 * end and the child's pid are stored in workers[id]. All workers share
 * the writing end of one control pipe.
 *
 * Returns the reading end of the control pipe on success. On failure,
 * -1 is returned with errno describing the failing call, and every
 * descriptor opened by this function has been closed again in the
 * calling process. Workers that were already forked at that point are
 * not waited for; as the reading ends of their pipes are gone, further
 * writes by them fail with EPIPE / SIGPIPE.
 */
int spawn_workers(Worker workers[], unsigned int number_of_workers,
      JobHandler handler) {
   int controlfds[2];
   if (pipe(controlfds) < 0) return -1;

   unsigned int spawned = 0;
   while (spawned < number_of_workers) {
      int pipefds[2];
      if (pipe(pipefds) < 0) break;

      /* Switch the reading end to non-blocking mode before forking so
         that a failure can still be handled without a child around;
         the writing end is a separate open file description and
         remains blocking for the worker. */
      pid_t child = -1;
      int flags = fcntl(pipefds[0], F_GETFL);
      if (flags >= 0 && fcntl(pipefds[0], F_SETFL, flags | O_NONBLOCK) >= 0) {
         child = fork();
      }
      if (child < 0) {
         int saved_errno = errno;
         close(pipefds[0]); close(pipefds[1]);
         errno = saved_errno;
         break;
      }
      if (child == 0) {
         /* The worker only writes: drop both reading ends and the
            reading ends of all previously created workers. */
         close(pipefds[0]); close(controlfds[0]);
         close_worker_fds(workers, spawned);
         handler(spawned, pipefds[1], controlfds[1]);
         exit(0);
      }
      close(pipefds[1]);
      workers[spawned] = (Worker) {
         .pid = child,
         .fd = pipefds[0],
      };
      ++spawned;
   }

   if (spawned < number_of_workers) {
      /* Report the original cause, not a possible close() failure. */
      int saved_errno = errno;
      close_worker_fds(workers, spawned);
      close(controlfds[0]); close(controlfds[1]);
      errno = saved_errno;
      return -1;
   }

   /* The parent must not keep the writing end open: otherwise reading
      from the control pipe would never see end of file. */
   close(controlfds[1]);
   return controlfds[0];
}
/*
 * Notify the parent that worker id has produced output by writing the
 * id in its in-memory representation to control_fd.
 * Returns true iff the complete id was written.
 */
bool signal_output(unsigned int id, int control_fd) {
   ssize_t sent = write(control_fd, &id, sizeof id);
   return sent == (ssize_t) sizeof id;
}
/*
 * Sample job: repeatedly emit a numbered greeting through out_fd and
 * announce it on control_fd.
 *
 * The loop ends when the announcement cannot be written or, with a
 * chance of one in five per round, by random choice. Otherwise the
 * worker pauses for 100 to 599 milliseconds before the next greeting.
 */
void do_sth(unsigned int id, int out_fd, int control_fd) {
   srand(getpid()); /* per-process seed: each worker has its own pid */
   outbuf out = {out_fd};
   unsigned int greeting_no = 0;
   bool finished = false;
   while (!finished) {
      ++greeting_no;
      outbuf_printf(&out, "Greetings #%u from worker %u!\n", greeting_no, id);
      /* flush before announcing; presumably this pushes the buffered
         text to out_fd so that the parent finds it there */
      outbuf_flush(&out);
      if (!signal_output(id, control_fd) || rand() % 5 == 0) {
         finished = true;
      } else {
         int pause_ms = 100 + rand() % 500;
         poll(NULL, 0, pause_ms); /* no descriptors: acts as a plain delay */
      }
   }
}
/*
 * Transfer everything that can be read from descriptor in to
 * descriptor out, coping with short writes.
 *
 * Returns true iff reading ended with end of file. Any failing read()
 * (which includes "no data available" on a non-blocking descriptor) or
 * a write() that makes no progress yields false.
 */
bool copy(int in, int out) {
   char chunk[8192];
   for (;;) {
      ssize_t got = read(in, chunk, sizeof chunk);
      if (got < 0) return false;
      if (got == 0) return true;

      /* write() may accept fewer bytes than requested: keep going
         until the whole chunk has been passed on */
      const char *next = chunk;
      ssize_t left = got;
      do {
         ssize_t put = write(out, next, left);
         if (put <= 0) return false;
         next += put;
         left -= put;
      } while (left > 0);
   }
}
#define WORKERS (10)

/*
 * Start WORKERS worker processes running do_sth and relay their output
 * to standard output: whenever a worker id arrives on the control pipe,
 * whatever is currently available in that worker's pipe is copied to
 * descriptor 1. Once the control pipe no longer delivers a complete id
 * (end of file after all workers have closed it, or a read error), all
 * pipes are closed and the workers are reaped.
 */
int main(void) {
   Worker workers[WORKERS];
   int control_fd = spawn_workers(workers, WORKERS, do_sth);
   if (control_fd < 0) {
      perror("spawn_workers"); exit(1);
   }

   unsigned int id;
   while (read(control_fd, &id, sizeof id) == sizeof id) {
      /* the id is used as an array index: never trust it blindly */
      if (id >= WORKERS) {
         fprintf(stderr, "ignoring invalid worker id %u\n", id);
         continue;
      }
      /* The worker pipes are non-blocking, so copy() stops with false
         as soon as the pipe is drained; that is the normal case here,
         hence the result is deliberately ignored. */
      (void) copy(workers[id].fd, 1);
   }
   close(control_fd);

   for (unsigned int w = 0; w < WORKERS; ++w) {
      close(workers[w].fd);
      waitpid(workers[w].pid, NULL, 0); /* exit status is not needed */
   }
   return 0;
}