Beispiellösung

Content

#include <fcntl.h>
#include <poll.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>
#include <afblib/outbuf_printf.h>

/* function to be executed by a worker,
   where
      id            is an integer in [0, # workers)
      out_fd        is the writing end of the pipeline for the results, and
      control_fd    the writing end of the control pipe */
typedef void (*JobHandler)(unsigned int id, int out_fd, int control_fd);

typedef struct Worker {
   pid_t pid;
   int fd; /* reading end of the pipeline */
} Worker;

int spawn_workers(Worker workers[], unsigned int number_of_workers,
      JobHandler handler) {
   int controlfds[2];
   if (pipe(controlfds) < 0) return -1;
   for (unsigned int i = 0; i < number_of_workers; ++i) {
      int pipefds[2];
      if (pipe(pipefds) < 0) return false;
      pid_t child = fork();
      if (child < 0) return false;
      if (child == 0) {
	 close(pipefds[0]); close(controlfds[0]);
	 for (unsigned int j = 0; j < i; ++j) {
	    close(workers[i].fd);
	 }
	 handler(i, pipefds[1], controlfds[1]);
	 exit(0);
      }
      close(pipefds[1]);
      int flags = fcntl(pipefds[0], F_GETFL) | O_NONBLOCK;
      fcntl(pipefds[0], F_SETFL, flags);
      workers[i] = (Worker) {
	 .pid = child,
	 .fd = pipefds[0],
      };
   }
   close(controlfds[1]);
   return controlfds[0];
}

bool signal_output(unsigned int id, int control_fd) {
   return write(control_fd, &id, sizeof id) == sizeof id;
}

void do_sth(unsigned int id, int out_fd, int control_fd) {
   srand(getpid());
   outbuf out = {out_fd};
   unsigned int count = 0;
   for(;;) {
      outbuf_printf(&out, "Greetings #%u from worker %u!\n", ++count, id);
      outbuf_flush(&out);
      if (!signal_output(id, control_fd)) break;
      if (rand() % 5 == 0) break;
      int timeout = 100 + rand() % 500;
      poll(0, 0, timeout);
   }
}

bool copy(int in, int out) {
   char buf[8192];
   ssize_t nbytes;
   while ((nbytes = read(in, buf, sizeof buf)) > 0) {
      ssize_t written = 0;
      while (written < nbytes) {
	 ssize_t count = write(out, buf + written, nbytes - written);
	 if (count <= 0) return false;
	 written += count;
      }
   }
   return nbytes == 0;
}

#define WORKERS (10)

int main() {
   Worker workers[WORKERS];
   int control_fd = spawn_workers(workers, WORKERS, do_sth);
   if (control_fd < 0) {
      perror("spawn_workers"); exit(1);
   }
   unsigned int id;
   while (read(control_fd, &id, sizeof id) == sizeof id) {
      copy(workers[id].fd, 1);
   }
   close(control_fd);
   for (unsigned id = 0; id < WORKERS; ++id) {
      close(workers[id].fd);
      int wstat; waitpid(workers[id].pid, &wstat, 0);
   }
}
theon$ gcc -Wall -o fork-and-join2 fork-and-join2.c -lafb -lowfat
fork-and-join2.c:8:10: fatal error: afblib/outbuf_printf.h: No such file or directory
    8 | #include <afblib/outbuf_printf.h>
      |          ^~~~~~~~~~~~~~~~~~~~~~~~
compilation terminated.
theon$ ./fork-and-join2
Greetings #1 from worker 0!
Greetings #1 from worker 1!
Greetings #1 from worker 4!
Greetings #1 from worker 7!
Greetings #1 from worker 2!
Greetings #1 from worker 3!
Greetings #1 from worker 5!
Greetings #1 from worker 6!
Greetings #1 from worker 8!
Greetings #1 from worker 9!
Greetings #2 from worker 0!
Greetings #2 from worker 6!
Greetings #2 from worker 1!
Greetings #2 from worker 7!
Greetings #2 from worker 9!
Greetings #2 from worker 4!
Greetings #3 from worker 7!
Greetings #3 from worker 6!
Greetings #2 from worker 5!
Greetings #2 from worker 2!
Greetings #2 from worker 8!
Greetings #2 from worker 3!
Greetings #3 from worker 1!
Greetings #3 from worker 0!
Greetings #3 from worker 9!
Greetings #3 from worker 3!
Greetings #3 from worker 2!
Greetings #4 from worker 3!
Greetings #3 from worker 8!
Greetings #5 from worker 3!
Greetings #6 from worker 3!
theon$