Program Listing for File loadBalancerMaster.h

Return to documentation for file (include/parpeloadbalancer/loadBalancerMaster.h)

#ifndef LOADBALANCERMASTER_H
#define LOADBALANCERMASTER_H

#include <parpecommon/parpeConfig.h>

#include <pthread.h>
#include <semaphore.h>

#include <functional>
#include <queue>
#include <vector>

#ifdef PARPE_ENABLE_MPI
#include <mpi.h>
#endif

namespace parpe {

/**
 * @brief Bundle of everything that describes one job: an id, the outgoing
 * and incoming byte buffers, and optional completion-notification hooks.
 *
 * All pointer members are stored as given and are never freed by this
 * struct (it has no destructor), so the pointees must be kept alive by
 * whoever created them.
 */
struct JobData {
    /** Construct an empty job: id -1, empty buffers, no hooks set. */
    JobData() = default;

    /**
     * @brief Construct a job that carries the given notification primitives.
     *
     * The buffers stay empty and the id stays -1; only the three pointers
     * are recorded.
     *
     * @param jobDone Stored in #jobDone.
     * @param jobDoneChangedCondition Stored in #jobDoneChangedCondition.
     * @param jobDoneChangedMutex Stored in #jobDoneChangedMutex.
     */
    JobData(int *jobDone,
            pthread_cond_t *jobDoneChangedCondition,
            pthread_mutex_t *jobDoneChangedMutex)
        : jobDone{jobDone}
        , jobDoneChangedCondition{jobDoneChangedCondition}
        , jobDoneChangedMutex{jobDoneChangedMutex}
    {}

    /** Job identifier; -1 until one is assigned. */
    int jobId{-1};

    /** Raw bytes to send for this job. */
    std::vector<char> sendBuffer;

    /** Raw bytes received for this job. */
    std::vector<char> recvBuffer;

    /**
     * Optional, non-owning completion indicator.
     * NOTE(review): presumably updated once the job has finished; confirm
     * against the load balancer implementation.
     */
    int *jobDone{nullptr};

    /**
     * Optional, non-owning condition variable and mutex associated with
     * changes to #jobDone.
     * NOTE(review): presumably signalled under the mutex when #jobDone
     * changes; confirm against the implementation.
     */
    pthread_cond_t *jobDoneChangedCondition{nullptr};
    pthread_mutex_t *jobDoneChangedMutex{nullptr};

    /** Optional callback taking a pointer to a job; empty by default. */
    std::function<void(JobData*)> callbackJobFinished{nullptr};
};


#ifdef PARPE_ENABLE_MPI

/**
 * @brief Master side of an MPI-based load balancer.
 *
 * Jobs are handed in as JobData pointers via queueJob() and kept in an
 * internal queue guarded by a pthread mutex and a semaphore. The class
 * tracks per-worker busy state, outstanding MPI send requests and the jobs
 * currently sent out.
 *
 * Instances are neither copyable nor movable.
 *
 * NOTE(review): only declarations are visible in this header; the behavioural
 * notes below marked "presumably" are inferred from names and should be
 * confirmed against the implementation file.
 */
class LoadBalancerMaster {
  public:
    LoadBalancerMaster() = default;

    // Canonical (const-reference) signature so the deletion covers the
    // actual copy constructor rather than only a non-const overload.
    LoadBalancerMaster(const LoadBalancerMaster& other) = delete;

    LoadBalancerMaster& operator=(const LoadBalancerMaster& other) = delete;

    LoadBalancerMaster(LoadBalancerMaster &&other) noexcept = delete;

    // Canonical return type for an assignment operator.
    LoadBalancerMaster& operator=(LoadBalancerMaster &&fp) = delete;

    ~LoadBalancerMaster();

    /**
     * @brief Start the load balancer.
     * NOTE(review): presumably launches #queueThread via threadEntryPoint().
     */
    void run();

#ifndef QUEUE_MASTER_TEST
    /**
     * @brief Check that MPI is usable.
     * Compiled out when QUEUE_MASTER_TEST is defined.
     */
    static void assertMpiActive();
#endif

    /**
     * @brief Submit a job for processing.
     * @param data Job to enqueue. Passed as a raw pointer; ownership is not
     * expressed in the type. NOTE(review): presumably the caller keeps
     * ownership and must keep it alive until the job finished.
     */
    void queueJob(JobData *data);

    /**
     * @brief Stop the load balancer.
     * NOTE(review): presumably stops #queueThread; confirm.
     */
    void terminate();

    /** @brief Tell all workers to shut down. */
    void sendTerminationSignalToAllWorkers();

    /** @return Whether the load balancer is currently running. */
    bool isRunning() const;

    /** @return Number of jobs currently queued. */
    int getNumQueuedJobs() const;

  private:
    /**
     * @brief pthread-style entry point (void* in, void* out).
     * @param vpLoadBalancerMaster NOTE(review): presumably the
     * LoadBalancerMaster instance to run, type-erased for pthread_create.
     */
    static void *threadEntryPoint(void *vpLoadBalancerMaster);

    /** @brief Body executed by the load balancer thread. */
    void loadBalancerThreadRun();

    /** @brief Release send buffers that are no longer needed. */
    void freeEmptiedSendBuffers();

    /**
     * @brief Process jobs that have completed.
     * @return An int; NOTE(review): presumably the index of a worker that
     * became free or #NO_FREE_WORKER - confirm.
     */
    int handleFinishedJobs();

    /**
     * @brief Find a worker that can take a job.
     * @return NOTE(review): presumably a worker index, or #NO_FREE_WORKER
     * if all workers are busy - confirm.
     */
    int getNextFreeWorkerIndex();

    /** @brief Fetch the next job from the queue. */
    JobData *getNextJob();

    /**
     * @brief Send a job to a specific worker.
     * @param workerIdx Index of the target worker.
     * @param data Job to send.
     */
    void sendToWorker(int workerIdx, JobData *data);

    /**
     * @brief Handle a reply described by an MPI status.
     * @param mpiStatus Status of the received message.
     * @return An int; NOTE(review): presumably the index of the worker that
     * replied - confirm.
     */
    int handleReply(MPI_Status *mpiStatus);

    /**
     * @brief Dispatch a queued job to the given free worker.
     * @param freeWorkerIndex Index of a worker that can take a job.
     * @return NOTE(review): presumably true if a job was actually sent.
     */
    bool sendQueuedJob(int freeWorkerIndex);

    /** MPI communicator in use; defaults to MPI_COMM_WORLD. */
    MPI_Comm mpiComm = MPI_COMM_WORLD;

    /** MPI datatype for job payloads; defaults to MPI_BYTE. */
    MPI_Datatype mpiJobDataType = MPI_BYTE;

    /** Running flag; false until started. */
    bool isRunning_ = false;

    /** Number of workers; 0 by default. */
    int numWorkers = 0;

    /** Jobs waiting to be dispatched (non-owning pointers). */
    std::queue<JobData *> queue;

    /** Most recently used job id; starts at 0. */
    int lastJobId = 0;

    /** Busy flag per worker. */
    std::vector<bool> workerIsBusy;

    /** MPI send requests. */
    std::vector<MPI_Request> sendRequests;

    /** Jobs that have been sent out (non-owning pointers). */
    std::vector<JobData *> sentJobsData;

    /** Mutex associated with #queue. */
    pthread_mutex_t mutexQueue = PTHREAD_MUTEX_INITIALIZER;

    /** Semaphore associated with #queue. */
    sem_t semQueue = {};

    /**
     * Handle of the load balancer thread. Value-initialised rather than
     * assigned 0, because POSIX does not require pthread_t to be an
     * arithmetic type; on platforms where it is, the value is still zero.
     */
    pthread_t queueThread{};

    /** Sentinel worker index meaning "no worker available". */
    constexpr static int NO_FREE_WORKER = -1;
};

#endif

} // namespace parpe

#endif // LOADBALANCERMASTER_H