1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
     100
     101
     102
     103
     104
     105
     106
     107
     108
     109
     110
     111
     112
     113
     114
     115
     116
     117
     118
     119
     120
     121
     122
     123
     124
     125
     126
     127
     128
     129
     130
     131
     132
     133
     134
     135
     136
     137
     138
     139
     140
     141
     142
     143
     144
     145
     146
     147
     148
     149
     150
     151
     152
     153
     154
     155
     156
     157
#include <mpi.h>
#include <iostream>
#include <cassert>
#include <cstdlib>
#include "primes.hpp"
#include <hpc/gmp/integer.hpp>

using namespace std;
using namespace hpc::gmp;

// #define JOBSIZE (1<<20)
#define JOBSIZE 100

enum { NEXT_JOB, NEXT_RESULT, FINISH };

void send_integer(const Integer& value, int dest, int tag) {
   assert(dest >= 0);
   if (tag == FINISH) {
      MPI_Send(0, 0, MPI_INT, dest, tag, MPI_COMM_WORLD);
   } else {
      ExportedInteger exp_value(value);
      int len = (int) exp_value.length();
      int header[2] = {exp_value.sgn(), len};
      MPI_Send(header, 2, MPI_INT, dest, tag, MPI_COMM_WORLD);
      MPI_Send(exp_value.words, len, MPI_INT, dest, tag, MPI_COMM_WORLD);
   }
}

void send_finish(int dest) {
   Integer dummy;
   send_integer(dummy, dest, FINISH);
}

bool receive_integer(Integer& value, int& source, int& tag) {
   MPI_Status status;
   int header[2];
   MPI_Recv(header, 2, MPI_INT, source, tag, MPI_COMM_WORLD, &status);
   tag = status.MPI_TAG;
   source = status.MPI_SOURCE;
   if (status.MPI_TAG == FINISH) return false;
   int sgn = header[0]; unsigned int len = header[1];
   ExportedInteger exp_value(sgn, len);
   MPI_Recv(exp_value.words, len, MPI_INT, source, tag, MPI_COMM_WORLD, &status);
   value = exp_value.get();
   return true;
}

static void
primes_master(Integer& start_interval, Integer& end_interval,
      unsigned int k, unsigned int offsets[], int nofslaves) {
   // broadcast parameters that are required by all slaves
   MPI_Bcast(&k, 1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);
   MPI_Bcast(offsets, k-1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);

   // send out initial tasks for all slaves
   struct Task {
      Integer start;
      Integer end;
      Task(const Integer& startval, unsigned long int intervallen,
	       const Integer& end_interval) :
	    start(startval), end(startval) {
	 end += intervallen;
	 if (end > end_interval) {
	    end = end_interval;
	 }
      }
   };
   int running_tasks = 0;
   Integer start(start_interval);
   for (int slave = 1; slave <= nofslaves; ++slave) {
      if (start < end_interval) {
	 Task task(start, JOBSIZE, end_interval); start += JOBSIZE;
	 send_integer(task.start, slave, NEXT_JOB);
	 send_integer(task.end, slave, NEXT_JOB);
	 ++running_tasks;
      } else {
         // there is no work left for this slave
	 send_finish(slave);
      }
   }

   // collect results and send out remaining tasks
   while (running_tasks > 0) {
      // receive result of a completed task
      int source = MPI_ANY_SOURCE; int tag = MPI_ANY_TAG;
      Integer result;
      if (receive_integer(result, source, tag)) {
	 cout << result << " (received from " << source << ")" << endl;
      } else if (start < end_interval) {
	 Task task(start, JOBSIZE, end_interval); start += JOBSIZE;
	 send_integer(task.start, source, NEXT_JOB);
	 send_integer(task.end, source, NEXT_JOB);
      } else {
	 send_finish(source); --running_tasks;
      }
   }
}

static void primes_slave() {
   unsigned int k;
   MPI_Bcast(&k, 1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);
   unsigned int* offsets = new unsigned int[k-1];
   MPI_Bcast(offsets, k-1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);

   // receive tasks and process them
   for(;;) {
      int source = 0;
      int tag = MPI_ANY_TAG;
      Integer start;
      Integer end;
      if (!receive_integer(start, source, tag)) break;
      if (!receive_integer(end, source, tag)) break;
      Integer prime;
      while (search_prime_constellation(start, end, k, offsets, prime)) {
	 send_integer(prime, 0, NEXT_RESULT);
	 start = prime;
	 start += 1;
      }
      send_finish(0);
   }
   // release allocated memory
   delete[] offsets;
}

char* progname;

void usage() {
   cerr << "Usage: " << progname << " N1 N2 {n_i} " << endl;
   exit(1);
}

int main(int argc, char** argv) {
   MPI_Init(&argc, &argv);

   int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   int nofslaves; MPI_Comm_size(MPI_COMM_WORLD, &nofslaves);
   --nofslaves; assert(nofslaves > 0);

   if (rank == 0) {
      progname = *argv++; --argc;
      if (argc < 3) usage();
      Integer start(*argv++); --argc;
      Integer end(*argv++); --argc;

      int k = argc + 1;
      unsigned int* offsets = new unsigned int[k-1];
      for (int i = 0; i < k-1; ++i) {
	 offsets[i] = atoi(*argv++); --argc;
      }

      primes_master(start, end, k, offsets, nofslaves);
   } else {
      primes_slave();
   }

   MPI_Finalize();
}