Program Listing for File loadBalancerMaster.h¶
↰ Return to documentation for file (include/parpeloadbalancer/loadBalancerMaster.h
)
#ifndef LOADBALANCERMASTER_H
#define LOADBALANCERMASTER_H
#include <parpecommon/parpeConfig.h>
#include <atomic>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <semaphore.h>
#include <functional>
#ifdef PARPE_ENABLE_MPI
#include <mpi.h>
#endif
namespace parpe {
struct JobData {
JobData() = default;
JobData(int *jobDone,
std::condition_variable *jobDoneChangedCondition,
std::mutex *jobDoneChangedMutex)
: jobDone(jobDone),
jobDoneChangedCondition(jobDoneChangedCondition),
jobDoneChangedMutex(jobDoneChangedMutex) {
}
int jobId = -1;
std::vector<char> sendBuffer;
std::vector<char> recvBuffer;
int *jobDone = nullptr;
std::condition_variable *jobDoneChangedCondition = nullptr;
std::mutex *jobDoneChangedMutex = nullptr;
std::function<void(JobData*)> callbackJobFinished = nullptr;
};
#ifdef PARPE_ENABLE_MPI
class LoadBalancerMaster {
public:
LoadBalancerMaster() = default;
LoadBalancerMaster(LoadBalancerMaster& other) = delete;
LoadBalancerMaster& operator=(const LoadBalancerMaster& other) = delete;
LoadBalancerMaster(LoadBalancerMaster &&other) noexcept = delete;
LoadBalancerMaster const & operator=(LoadBalancerMaster &&fp) = delete;
~LoadBalancerMaster();
void run();
#ifndef QUEUE_MASTER_TEST
static void assertMpiActive();
#endif
void queueJob(JobData *data);
void terminate();
void sendTerminationSignalToAllWorkers();
bool isRunning() const;
int getNumQueuedJobs() const;
private:
void loadBalancerThreadRun();
void freeEmptiedSendBuffers();
int handleFinishedJobs();
int getNextFreeWorkerIndex();
JobData *getNextJob();
void sendToWorker(int workerIdx, JobData *data);
int handleReply(MPI_Status *mpiStatus);
bool sendQueuedJob(int freeWorkerIndex);
MPI_Comm mpiComm = MPI_COMM_WORLD;
MPI_Datatype mpiJobDataType = MPI_BYTE;
std::atomic_bool isRunning_ = false;
int numWorkers = 0;
std::queue<JobData *> queue;
int lastJobId = 0;
std::vector<bool> workerIsBusy;
std::vector<MPI_Request> sendRequests;
std::vector<JobData *> sentJobsData;
mutable std::mutex mutexQueue;
sem_t semQueue = {};
std::thread queueThread;
std::atomic_bool queue_thread_continue_ = true;
constexpr static int NO_FREE_WORKER = -1;
};
#endif
} // namespace parpe
#endif // LOADBALANCERMASTER_H