#include #include #include #include #include #include #include #include /* function to be executed by a worker, where id is an integer in [0, # workers) out_fd is the writing end of the pipeline for the results, and control_fd the writing end of the control pipe */ typedef void (*JobHandler)(unsigned int id, int out_fd, int control_fd); typedef struct Worker { pid_t pid; int fd; /* reading end of the pipeline */ } Worker; int spawn_workers(Worker workers[], unsigned int number_of_workers, JobHandler handler) { int controlfds[2]; if (pipe(controlfds) < 0) return -1; for (unsigned int i = 0; i < number_of_workers; ++i) { int pipefds[2]; if (pipe(pipefds) < 0) return false; pid_t child = fork(); if (child < 0) return false; if (child == 0) { close(pipefds[0]); close(controlfds[0]); for (unsigned int j = 0; j < i; ++j) { close(workers[i].fd); } handler(i, pipefds[1], controlfds[1]); exit(0); } close(pipefds[1]); int flags = fcntl(pipefds[0], F_GETFL) | O_NONBLOCK; fcntl(pipefds[0], F_SETFL, flags); workers[i] = (Worker) { .pid = child, .fd = pipefds[0], }; } close(controlfds[1]); return controlfds[0]; } bool signal_output(unsigned int id, int control_fd) { return write(control_fd, &id, sizeof id) == sizeof id; } void do_sth(unsigned int id, int out_fd, int control_fd) { srand(getpid()); outbuf out = {out_fd}; unsigned int count = 0; for(;;) { outbuf_printf(&out, "Greetings #%u from worker %u!\n", ++count, id); outbuf_flush(&out); if (!signal_output(id, control_fd)) break; if (rand() % 5 == 0) break; int timeout = 100 + rand() % 500; poll(0, 0, timeout); } } bool copy(int in, int out) { char buf[8192]; ssize_t nbytes; while ((nbytes = read(in, buf, sizeof buf)) > 0) { ssize_t written = 0; while (written < nbytes) { ssize_t count = write(out, buf + written, nbytes - written); if (count <= 0) return false; written += count; } } return nbytes == 0; } #define WORKERS (10) int main() { Worker workers[WORKERS]; int control_fd = spawn_workers(workers, WORKERS, do_sth); if (control_fd < 0) { perror("spawn_workers"); exit(1); } unsigned int id; while (read(control_fd, &id, sizeof id) == sizeof id) { copy(workers[id].fd, 1); } close(control_fd); for (unsigned id = 0; id < WORKERS; ++id) { close(workers[id].fd); int wstat; waitpid(workers[id].pid, &wstat, 0); } }