#include <cassert>
#include <cstdlib>
#include <iostream>
#include <mpi.h>
#include "primes.hpp"
#include "integer.hpp"

#define JOBSIZE 100

enum { NEXT_JOB, NEXT_RESULT, FINISH };

// transmit an arbitrary-length Integer as two messages:
// first the number of words, then the word array itself
void send_integer(const Integer& value, int dest, int tag) {
   assert(dest >= 0);
   if (tag == FINISH) {
      // a FINISH message carries no payload
      MPI_Send(0, 0, MPI_INT, dest, tag, MPI_COMM_WORLD);
   } else {
      size_t len;
      // get_export is assumed to allocate the word array with new[]
      Integer::Word* words = value.get_export(len);
      int lenval = len;
      MPI_Send(&lenval, 1, MPI_INT, dest, tag, MPI_COMM_WORLD);
      MPI_Send(words, lenval, MPI_INT, dest, tag, MPI_COMM_WORLD);
      delete[] words;
   }
}

void send_finish(int dest) {
   Integer dummy;
   send_integer(dummy, dest, FINISH);
}

// receive an Integer; returns false if a FINISH message arrived instead.
// source and tag may be wildcards on entry and are updated from the status
bool receive_integer(Integer& value, int& source, int& tag) {
   int lenval;
   MPI_Status status;
   MPI_Recv(&lenval, 1, MPI_INT, source, tag, MPI_COMM_WORLD, &status);
   tag = status.MPI_TAG;
   source = status.MPI_SOURCE;
   if (status.MPI_TAG == FINISH) return false;
   size_t len = lenval;
   Integer::Word* words = new Integer::Word[len];
   MPI_Recv(words, lenval, MPI_INT, source, tag, MPI_COMM_WORLD, &status);
   value.import(words, len);
   delete[] words;
   return true;
}

static void primes_master(Integer& start_interval, Integer& end_interval,
      unsigned int k, unsigned int offsets[], int nofslaves) {
   // broadcast parameters that are required by all slaves
   MPI_Bcast(&k, 1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);
   MPI_Bcast(offsets, k-1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);

   // send out initial tasks for all slaves
   struct Task {
      Integer start;
      Integer end;
      Task(const Integer& startval, unsigned long int intervallen,
            const Integer& end_interval) :
            start(startval), end(startval) {
         end += intervallen;
         if (end > end_interval) {
            end = end_interval;
         }
      }
   };
   int running_tasks = 0;
   Integer start(start_interval);
   for (int slave = 1; slave <= nofslaves; ++slave) {
      if (start < end_interval) {
         Task task(start, JOBSIZE, end_interval);
         start += JOBSIZE;
         send_integer(task.start, slave, NEXT_JOB);
         send_integer(task.end, slave, NEXT_JOB);
         ++running_tasks;
      } else {
         // there is no work left for this slave
         send_finish(slave);
      }
   }

   // collect results and send out remaining tasks
   while (running_tasks > 0) {
      // receive result of a completed task
      int source = MPI_ANY_SOURCE;
      int tag = MPI_ANY_TAG;
      Integer result;
      if (receive_integer(result, source, tag)) {
         // a prime constellation reported by one of the slaves
         std::cout << result << " (received from " << source << ")"
            << std::endl;
      } else if (start < end_interval) {
         // the slave finished its interval: hand out the next subinterval
         Task task(start, JOBSIZE, end_interval);
         start += JOBSIZE;
         send_integer(task.start, source, NEXT_JOB);
         send_integer(task.end, source, NEXT_JOB);
      } else {
         // no work left: release the slave
         send_finish(source);
         --running_tasks;
      }
   }
}

static void primes_slave() {
   unsigned int k;
   MPI_Bcast(&k, 1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);
   unsigned int* offsets = new unsigned int[k-1];
   MPI_Bcast(offsets, k-1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);

   // receive tasks and process them
   for(;;) {
      int source = 0;
      int tag = MPI_ANY_TAG;
      Integer start;
      Integer end;
      if (!receive_integer(start, source, tag)) break;
      if (!receive_integer(end, source, tag)) break;
      Integer prime;
      // report every constellation found in [start, end),
      // then signal completion of this task
      while (search_prime_constellation(start, end, k, offsets, prime)) {
         send_integer(prime, 0, NEXT_RESULT);
         start = prime;
         start += 1;
      }
      send_finish(0);
   }
   // release allocated memory
   delete[] offsets;
}

const char* progname;

void usage() {
   std::cerr << "Usage: " << progname << " N1 N2 {n_i}" << std::endl;
   exit(1);
}

int main(int argc, char** argv) {
   MPI_Init(&argc, &argv);

   int rank;
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);

   int nofslaves;
   MPI_Comm_size(MPI_COMM_WORLD, &nofslaves);
   --nofslaves;
   assert(nofslaves > 0);

   if (rank == 0) {
      progname = *argv++; --argc;
      if (argc < 3) usage();
      Integer start(*argv++); --argc;
      Integer end(*argv++); --argc;
      int k = argc + 1;
      unsigned int* offsets = new unsigned int[k-1];
      for (int i = 0; i < k-1; ++i) {
         offsets[i] = atoi(*argv++); --argc;
      }
      primes_master(start, end, k, offsets, nofslaves);
      delete[] offsets;
   } else {
      primes_slave();
   }
   MPI_Finalize();
}
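The listing depends on two headers that are not shown here, primes.hpp and integer.hpp. The following is a minimal sketch of the interfaces they would have to provide, inferred purely from how they are used above; every signature shown (parameter types, constness, the Word typedef) is an assumption, not the actual declaration.

// hypothetical interface sketch, inferred from the usage in the listing above
#include <cstddef>
#include <iosfwd>

class Integer {
   public:
      typedef unsigned int Word;       // assumed to match MPI_INT in size
      Integer();                        // value 0
      Integer(const char* decimal);     // parse a decimal string
      Integer(const Integer& other);
      Integer& operator=(const Integer& other);
      Integer& operator+=(unsigned long int delta);
      bool operator<(const Integer& other) const;
      bool operator>(const Integer& other) const;
      // export/import the internal word representation for transmission;
      // get_export is assumed to allocate the word array with new[]
      Word* get_export(std::size_t& len) const;
      void import(const Word* words, std::size_t len);
};
std::ostream& operator<<(std::ostream& out, const Integer& value);

// search [start, end) for a prime p such that p, p + offsets[0], ...,
// p + offsets[k-2] are all prime; on success set prime = p and return true
bool search_prime_constellation(const Integer& start, const Integer& end,
   unsigned int k, unsigned int offsets[], Integer& prime);

With this division of labor the master hands out fixed-size subintervals of JOBSIZE candidates on demand, so faster slaves simply receive more tasks and the load balances itself.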