#include <mpi.h>
#include <iostream>
#include <cassert>
#include <cstdlib>
#include "primes.hpp"
#include <hpc/gmp/integer.hpp>
using namespace std;
using namespace hpc::gmp;
// #define JOBSIZE (1<<20)
#define JOBSIZE 100
// Message tags of the master/slave protocol:
//   NEXT_JOB    — master -> slave: an interval boundary of a new task
//   NEXT_RESULT — slave  -> master: a prime constellation that was found
//   FINISH      — payload-free: slave signals "task done", or master
//                 signals "no more work, shut down"
enum { NEXT_JOB, NEXT_RESULT, FINISH };
void send_integer(const Integer& value, int dest, int tag) {
assert(dest >= 0);
if (tag == FINISH) {
MPI_Send(0, 0, MPI_INT, dest, tag, MPI_COMM_WORLD);
} else {
ExportedInteger exp_value(value);
int len = (int) exp_value.length();
int header[2] = {exp_value.sgn(), len};
MPI_Send(header, 2, MPI_INT, dest, tag, MPI_COMM_WORLD);
MPI_Send(exp_value.words, len, MPI_INT, dest, tag, MPI_COMM_WORLD);
}
}
// Notify process `dest` that no further work will be sent to it.
// The integer argument is irrelevant for a FINISH message, so a
// default-constructed temporary suffices.
void send_finish(int dest) {
   send_integer(Integer(), dest, FINISH);
}
// Receive an integer produced by send_integer(). On entry `source` and
// `tag` act as receive filters (they may be MPI_ANY_SOURCE and
// MPI_ANY_TAG); on exit they hold the actual sender and tag.
// Returns false iff the message was a FINISH notification, in which
// case `value` is left untouched.
bool receive_integer(Integer& value, int& source, int& tag) {
   int header[2];
   MPI_Status status;
   // first message: {sign, word count} header (empty for FINISH)
   MPI_Recv(header, 2, MPI_INT, source, tag, MPI_COMM_WORLD, &status);
   source = status.MPI_SOURCE;
   tag = status.MPI_TAG;
   if (tag == FINISH) return false;
   // second message: the magnitude words, from the very same sender/tag
   ExportedInteger exported(header[0], (unsigned int) header[1]);
   MPI_Recv(exported.words, header[1], MPI_INT, source, tag,
      MPI_COMM_WORLD, &status);
   value = exported.get();
   return true;
}
// Master (rank 0): partitions [start_interval, end_interval) into
// JOBSIZE-wide tasks, hands them out to the slaves, and prints every
// prime constellation they report. The message ordering below is the
// protocol itself — do not reorder sends/receives.
static void
primes_master(Integer& start_interval, Integer& end_interval,
unsigned int k, unsigned int offsets[], int nofslaves) {
// broadcast parameters that are required by all slaves
MPI_Bcast(&k, 1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);
MPI_Bcast(offsets, k-1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);
// send out initial tasks for all slaves
// A Task is a sub-interval [start, end) clamped to end_interval.
struct Task {
Integer start;
Integer end;
Task(const Integer& startval, unsigned long int intervallen,
const Integer& end_interval) :
start(startval), end(startval) {
end += intervallen;
if (end > end_interval) {
end = end_interval;
}
}
};
// number of slaves that currently hold a task (or its finish is pending)
int running_tasks = 0;
Integer start(start_interval);
for (int slave = 1; slave <= nofslaves; ++slave) {
if (start < end_interval) {
// a task is sent as two NEXT_JOB messages: interval start, then end
Task task(start, JOBSIZE, end_interval); start += JOBSIZE;
send_integer(task.start, slave, NEXT_JOB);
send_integer(task.end, slave, NEXT_JOB);
++running_tasks;
} else {
// there is no work left for this slave
send_finish(slave);
}
}
// collect results and send out remaining tasks
while (running_tasks > 0) {
// receive result of a completed task
int source = MPI_ANY_SOURCE; int tag = MPI_ANY_TAG;
Integer result;
if (receive_integer(result, source, tag)) {
// NEXT_RESULT: a prime constellation found by `source`
cout << result << " (received from " << source << ")" << endl;
} else if (start < end_interval) {
// FINISH from a slave: its task is done; give it the next interval
// (running_tasks is unchanged — the slave is busy again)
Task task(start, JOBSIZE, end_interval); start += JOBSIZE;
send_integer(task.start, source, NEXT_JOB);
send_integer(task.end, source, NEXT_JOB);
} else {
// FINISH from a slave and no work left: retire it
send_finish(source); --running_tasks;
}
}
}
// Slave: obtain the broadcast search parameters, then repeatedly accept
// [start, end) intervals from the master (rank 0), report every prime
// constellation found within them, and acknowledge each finished task
// with a FINISH message. A FINISH from the master ends the loop.
static void primes_slave() {
   unsigned int k;
   MPI_Bcast(&k, 1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);
   unsigned int* offsets = new unsigned int[k-1];
   MPI_Bcast(offsets, k-1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);
   // receive tasks and process them
   bool more_work = true;
   while (more_work) {
      int source = 0;
      int tag = MPI_ANY_TAG;
      Integer start;
      Integer end;
      // a task arrives as two consecutive messages from the master
      if (receive_integer(start, source, tag) &&
            receive_integer(end, source, tag)) {
         Integer prime;
         while (search_prime_constellation(start, end, k, offsets, prime)) {
            send_integer(prime, 0, NEXT_RESULT);
            // resume the search just past the constellation found
            start = prime;
            start += 1;
         }
         send_finish(0); // tell the master this task is complete
      } else {
         more_work = false; // FINISH received: shut down
      }
   }
   // release allocated memory
   delete[] offsets;
}
// Program name (argv[0]); set by the master rank only, and usage() is
// only ever called from the master rank.
char* progname;
// Print a short synopsis on stderr and terminate unsuccessfully.
void usage() {
cerr << "Usage: " << progname << " N1 N2 {n_i} " << endl;
exit(1);
}
// Entry point. Rank 0 parses the command line — interval bounds N1, N2
// and the k-1 offsets n_i of the sought prime constellation — and acts
// as the master; every other rank becomes a slave.
int main(int argc, char** argv) {
   MPI_Init(&argc, &argv);
   int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   int nofslaves; MPI_Comm_size(MPI_COMM_WORLD, &nofslaves);
   --nofslaves; assert(nofslaves > 0); // need at least one slave process
   if (rank == 0) {
      progname = *argv++; --argc;
      if (argc < 3) usage(); // require N1, N2 and at least one offset
      Integer start(*argv++); --argc;
      Integer end(*argv++); --argc;
      int k = argc + 1; // constellation size = number of offsets + 1
      unsigned int* offsets = new unsigned int[k-1];
      for (int i = 0; i < k-1; ++i) {
         offsets[i] = atoi(*argv++); --argc;
      }
      primes_master(start, end, k, offsets, nofslaves);
      delete[] offsets; // bug fix: was leaked on the master rank
   } else {
      primes_slave();
   }
   MPI_Finalize();
}