Program Listing for File loadBalancerMaster.h

Return to documentation for file (include/parpeloadbalancer/loadBalancerMaster.h)

#ifndef LOADBALANCERMASTER_H
#define LOADBALANCERMASTER_H

#include <parpecommon/parpeConfig.h>

#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <semaphore.h>
#include <functional>

#ifdef PARPE_ENABLE_MPI
#include <mpi.h>
#endif

namespace parpe {

struct JobData {
    JobData() = default;

    JobData(int *jobDone,
            std::condition_variable *jobDoneChangedCondition,
            std::mutex *jobDoneChangedMutex)
        : jobDone(jobDone),
          jobDoneChangedCondition(jobDoneChangedCondition),
          jobDoneChangedMutex(jobDoneChangedMutex) {
    }

    int jobId = -1;

    std::vector<char> sendBuffer;

    std::vector<char> recvBuffer;

    int *jobDone = nullptr;

    std::condition_variable *jobDoneChangedCondition = nullptr;
    std::mutex *jobDoneChangedMutex = nullptr;

    std::function<void(JobData*)> callbackJobFinished = nullptr;
};


#ifdef PARPE_ENABLE_MPI
class LoadBalancerMaster {
  public:
    LoadBalancerMaster() = default;

    LoadBalancerMaster(LoadBalancerMaster& other) = delete;

    LoadBalancerMaster& operator=(const LoadBalancerMaster& other) = delete;

    LoadBalancerMaster(LoadBalancerMaster &&other) noexcept = delete;

    LoadBalancerMaster const & operator=(LoadBalancerMaster &&fp) = delete;

    ~LoadBalancerMaster();

    void run();

#ifndef QUEUE_MASTER_TEST
    static void assertMpiActive();
#endif

    void queueJob(JobData *data);

    void terminate();

    void sendTerminationSignalToAllWorkers();

    bool isRunning() const;

    int getNumQueuedJobs() const;

  private:
    void loadBalancerThreadRun();

    void freeEmptiedSendBuffers();

    int handleFinishedJobs();

    int getNextFreeWorkerIndex();

    JobData *getNextJob();

    void sendToWorker(int workerIdx, JobData *data);

    int handleReply(MPI_Status *mpiStatus);

    bool sendQueuedJob(int freeWorkerIndex);

    MPI_Comm mpiComm = MPI_COMM_WORLD;

    MPI_Datatype mpiJobDataType = MPI_BYTE;

    std::atomic_bool isRunning_ = false;

    int numWorkers = 0;

    std::queue<JobData *> queue;

    int lastJobId = 0;

    std::vector<bool> workerIsBusy;

    std::vector<MPI_Request> sendRequests;

    std::vector<JobData *> sentJobsData;

    mutable std::mutex mutexQueue;

    sem_t semQueue = {};

    std::thread queueThread;

    std::atomic_bool queue_thread_continue_ = true;


    constexpr static int NO_FREE_WORKER = -1;
};

#endif

} // namespace parpe

#endif // LOADBALANCERMASTER_H