1
      2
      3
      4
      5
      6
      7
      8
      9
     10
     11
     12
     13
     14
     15
     16
     17
     18
     19
     20
     21
     22
     23
     24
     25
     26
     27
     28
     29
     30
     31
     32
     33
     34
     35
     36
     37
     38
     39
     40
     41
     42
     43
     44
     45
     46
     47
     48
     49
     50
     51
     52
     53
     54
     55
     56
     57
     58
     59
     60
     61
     62
     63
     64
     65
     66
     67
     68
     69
     70
     71
     72
     73
     74
     75
     76
     77
     78
     79
     80
     81
     82
     83
     84
     85
     86
     87
     88
     89
     90
     91
     92
     93
     94
     95
     96
     97
     98
     99
     100
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>
#include <afblib/outbuf_printf.h>

/* Function to be executed by a worker process, where
      id            is an integer in [0, # workers),
      out_fd        is the writing end of the worker's own pipe for
                    the results, and
      control_fd    is the writing end of the control pipe, which is
                    shared by all workers */
typedef void (*JobHandler)(unsigned int id, int out_fd, int control_fd);

/* Parent-side record of one worker process, filled in by spawn_workers. */
typedef struct Worker {
   pid_t pid; /* process id of the worker, as returned by fork() */
   int fd; /* reading end of the worker's result pipe (set to O_NONBLOCK) */
} Worker;

int spawn_workers(Worker workers[], unsigned int number_of_workers,
      JobHandler handler) {
   int controlfds[2];
   if (pipe(controlfds) < 0) return -1;
   for (unsigned int i = 0; i < number_of_workers; ++i) {
      int pipefds[2];
      if (pipe(pipefds) < 0) return false;
      pid_t child = fork();
      if (child < 0) return false;
      if (child == 0) {
	 close(pipefds[0]); close(controlfds[0]);
	 for (unsigned int j = 0; j < i; ++j) {
	    close(workers[i].fd);
	 }
	 handler(i, pipefds[1], controlfds[1]);
	 exit(0);
      }
      close(pipefds[1]);
      int flags = fcntl(pipefds[0], F_GETFL) | O_NONBLOCK;
      fcntl(pipefds[0], F_SETFL, flags);
      workers[i] = (Worker) {
	 .pid = child,
	 .fd = pipefds[0],
      };
   }
   close(controlfds[1]);
   return controlfds[0];
}

/* Announce on the control pipe that worker `id` has produced output,
   by writing the raw unsigned int `id` to control_fd with a single
   write(). Returns true if and only if all bytes of `id` were written. */
bool signal_output(unsigned int id, int control_fd) {
   const size_t expected = sizeof id;
   ssize_t sent = write(control_fd, &id, expected);
   if (sent < 0) return false;
   return (size_t) sent == expected;
}

/* Sample job handler: repeatedly emits a numbered greeting on out_fd
   (through an afblib outbuf that is flushed after every message) and
   then announces it on control_fd via signal_output.

   The loop ends when the announcement fails or, with a chance of
   one in five after each message, by random choice. Otherwise the
   worker sleeps for 100 to 599 milliseconds before the next message.
   The random number generator is seeded with the process id. */
void do_sth(unsigned int id, int out_fd, int control_fd) {
   outbuf out = {out_fd};
   unsigned int greeting = 0;
   bool running = true;
   srand(getpid());
   while (running) {
      greeting += 1;
      outbuf_printf(&out, "Greetings #%u from worker %u!\n", greeting, id);
      outbuf_flush(&out);
      /* rand() is consulted only after a successful announcement */
      if (!signal_output(id, control_fd) || rand() % 5 == 0) {
         running = false;
      } else {
         /* poll() without descriptors serves as a millisecond sleep */
         int pause_ms = 100 + rand() % 500;
         poll(NULL, 0, pause_ms);
      }
   }
}

/* Copy everything that can be read from descriptor `in` to descriptor
   `out`, in chunks of up to 8192 bytes, completing short writes.
   Returns true if read() reported end of file, and false if read()
   failed (which includes "no data available" on a non-blocking
   descriptor) or if write() failed or made no progress. */
bool copy(int in, int out) {
   char chunk[8192];
   for (;;) {
      ssize_t got = read(in, chunk, sizeof chunk);
      if (got == 0) return true;
      if (got < 0) return false;
      /* write() may accept fewer bytes than requested; keep going */
      for (ssize_t done = 0; done < got; ) {
         ssize_t n = write(out, chunk + done, got - done);
         if (n <= 0) return false;
         done += n;
      }
   }
}

/* number of worker processes spawned by main() */
#define WORKERS (10)

/* Spawns WORKERS workers running do_sth. For every worker id that
   arrives on the control pipe, whatever is currently available on that
   worker's result pipe is copied to standard output. Once the control
   pipe no longer delivers a complete id, all result pipes are closed
   and the workers are reaped. */
int main() {
   Worker pool[WORKERS];
   int control_fd = spawn_workers(pool, WORKERS, do_sth);
   if (control_fd < 0) {
      perror("spawn_workers");
      exit(1);
   }

   for (;;) {
      unsigned int sender;
      ssize_t got = read(control_fd, &sender, sizeof sender);
      if (got < 0 || (size_t) got != sizeof sender) break;
      /* the result is ignored on purpose: the result pipes are
         non-blocking, so copy() normally stops with a read error
         once the pipe has been drained */
      copy(pool[sender].fd, STDOUT_FILENO);
   }
   close(control_fd);

   for (unsigned int w = 0; w < WORKERS; ++w) {
      int wstat;
      close(pool[w].fd);
      waitpid(pool[w].pid, &wstat, 0);
   }
}