# Overview
This kernel code implements dynamic load balancing among neighbouring MPI ranks.
The kernel starts with a uniform load on all ranks and keeps adding load to rank 0 after each iteration of the computation. The resulting imbalance is handled by shifting work packages from ranks that have more work to do to ranks with less.
The kernel assumes that the work per item is equal for all items and all ranks, which is probably not true for real-world applications. Load, i.e. work items, is transferred among neighbouring ranks only, whereas in a real application load could be transferred among arbitrary ranks.
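Each load-balancing step gathers the per-rank item counts, computes the global mean, and determines how many items to shift across each rank boundary so that every rank ends up close to the mean. Below is a minimal stand-alone sketch of that computation; the `compute_transfers` helper is illustrative, but the variable names and the logic follow the kernel source.
```
#include <iostream>
#include <numeric>
#include <vector>

// ntransfer[i] > 0: rank i sends items to rank i+1
// ntransfer[i] < 0: rank i receives items from rank i+1
std::vector<int> compute_transfers(const std::vector<int> &nwork) {
    int nranks = nwork.size();
    int mean = std::accumulate(nwork.begin(), nwork.end(), 0) / nranks;
    std::vector<int> ntransfer(nranks - 1);
    ntransfer[0] = nwork[0] - mean;
    for (int irank = 1; irank < nranks - 1; irank++) {
        // items rank irank holds after settling its lower boundary
        int n_new = nwork[irank] + ntransfer[irank - 1];
        ntransfer[irank] = n_new - mean;
    }
    return ntransfer;
}

int main() {
    // three ranks holding 120, 100 and 80 items: both boundaries shift
    // 20 items towards the higher rank, leaving every rank with 100 items
    for (int t : compute_transfers({120, 100, 80})) std::cout << t << " ";
    std::cout << std::endl;
}
```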
# Getting started
## Prerequisites
To build and run this kernel you will need
* an MPI library
* a C++ compiler
## Building and running the kernel
The kernel can be compiled with the provided Makefile via
```
make
```
The resulting executable `rankdlb` can be run using mpirun. It takes three arguments:
1. total number of compute cycles
2. initial number of work items per MPI rank
3. maximum number of work items per MPI rank
To run the kernel execute
```
mpirun -np <num processes> rankdlb <cycles> <items_per_rank_initial> <max_items_per_rank>
```
For example: `mpirun -np 16 rankdlb 1000 100000 150000`
The kernel continuously reports the rate at which it processes work items. A higher value indicates better load balance. The rate drops while rank 0 creates new work items and increases again once load balancing has been performed.
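Every 20 cycles rank 0 prints the cycle number, the number of items processed by all ranks since the last report, and the resulting rate in items per second; load-balancing events are reported on lines starting with `#`. Illustrative output (the numbers are made up and depend on the machine and the chosen parameters):
```
580 32000000 3.1e+08 #throughput
600 32050000 3.2e+08 #throughput
# performing load balancing 0.12
620 32100000 3.4e+08 #throughput
```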
/*
* POP2 HLRS kernel-2: dynamic load distribution in MPI
*
* The program demonstrates dynamic load balancing among neighbouring MPI ranks.
* In this demonstration, a work item is a long integer; processing the work items
* consists of adding up all integers within an MPI rank.
* The output of the program is the achieved performance, measured in processed items per second.
*
* Copyright(c) 2020 High Performance Computing Center Stuttgart (HLRS)
*
* This code is published under the BSD license.
*
*/
#include <math.h>
#include <mpi.h>
#include <stdlib.h>
#include <sys/time.h>
#include <iostream>
using namespace ::std;
long process_data(long *pdata, int n) {
long result_l = 0;
for (int i = 0; i < n; i++) {
result_l += pdata[i];
}
return result_l;
}
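// grow the work array so that it can hold at least ndata items
// (new capacity: 2 * ndata), preserving the existing contents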
void extend_array(long **data, int ndata, int *ndata_max) {
int ndata_max_new = ndata * 2;
long *data_new = new long[ndata_max_new];
for (int i = 0; i < *ndata_max; i++) {
data_new[i] = (*data)[i];
}
delete[] *data;
*data = data_new;
*ndata_max = ndata_max_new;
}
int main(int argc, char *argv[]) {
int rank, nranks;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
// get block size from command line
if (argc != 4) {
if (rank == 0) {
cout << "usage example: mpirun -l -n 2 ./rankdlb ncycles ndata "
"ndata_limit"
<< endl;
}
return 0;
}
int ncycles = atoi(argv[1]);
int ndata = atoi(argv[2]);
int ndata_limit = atoi(argv[3]);
int ndata_max = 2 * ndata; // size of array data
int cycle_lb = ncycles * 0.85; // cycle after which one final load balancing step is forced
if (rank == 0) {
cout << "# hi, this is rankdlb-2020" << endl;
cout << "# using " << nranks << " ranks..." << endl;
cout << "# number of cycles: " << ncycles << endl;
cout << "# initial number of work items per rank: " << ndata << endl;
cout << "# max number of work items per rank: " << ndata_limit << endl;
cout << "# load balancing stops after cycle: " << cycle_lb << endl;
}
// the values below are updated after every cycle; they introduce the
// coupling between neighbouring domains. In the real application these
// values represent data that needs to be transferred before the next work
// package can be processed
double seed_up = 0;
double seed_down = 0;
long *data = new long[ndata_max];
// init data
for (int i = 0; i < ndata; i++) {
data[i] = i;
}
long nprocessed = 0;
struct timeval t_last, t_now;
gettimeofday(&t_last, NULL);
bool last_lb_done = false;
// compute cycles
for (int icycle = 0; icycle < ncycles; icycle++) {
// change amount of work - create imbalance
if ((rank == 0) && (icycle > 100) && (icycle < 8000)) {
int ndata_new = ndata * 1.001;
if (ndata_new < ndata_limit) {
// extend array if necessary
if (ndata_new > ndata_max) {
ndata_max = ndata_new * 2;
long *data_new = new long[ndata_max];
for (int i = 0; i < ndata; i++) {
data_new[i] = data[i];
}
delete[] data;
data = data_new;
}
for (int i = ndata; i < ndata_new; i++) {
data[i] = i;
}
ndata = ndata_new;
}
}
// process data
long result = seed_up + seed_down;
result += process_data(data, ndata);
nprocessed += ndata;
// exchange data at boundaries - here communicate result to neighbours
long send_up = (result % 1000) + 1;
long send_down = (result % 1000) - 1;
// receive from previous rank
long rec_up = 0;
long rec_down = 0;
MPI_Request reqs[4];
MPI_Status stats[4];
int nwait = 0;
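// post non-blocking receives and sends to both neighbours and complete them
// with a single MPI_Waitall below, so the exchange cannot deadlock when both
// neighbours communicate at the same time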
if (rank > 0) {
// receive from previous rank
MPI_Irecv(&rec_down, 1, MPI_LONG, rank - 1, icycle, MPI_COMM_WORLD,
&reqs[nwait]);
nwait++;
// send to previous rank
MPI_Isend(&send_down, 1, MPI_LONG, rank - 1, icycle, MPI_COMM_WORLD,
&reqs[nwait]);
nwait++;
}
if (rank < nranks - 1) {
// receive from next rank
MPI_Irecv(&rec_up, 1, MPI_LONG, rank + 1, icycle, MPI_COMM_WORLD,
&reqs[nwait]);
nwait++;
// send to next rank
MPI_Isend(&send_up, 1, MPI_LONG, rank + 1, icycle, MPI_COMM_WORLD,
&reqs[nwait]);
nwait++;
}
MPI_Waitall(nwait, reqs, stats);
// apply received values
seed_up = rec_up;
seed_down = rec_down;
// print performance data
if (icycle % 20 == 0) {
long sum_processed = 0;
MPI_Reduce(&nprocessed, &sum_processed, 1, MPI_LONG, MPI_SUM, 0,
MPI_COMM_WORLD);
nprocessed = 0;
gettimeofday(&t_now, NULL); // measure time after reception of data
// in order to include wait time
if (rank == 0) {
long t = (t_now.tv_sec - t_last.tv_sec) * 1000000 +
(t_now.tv_usec - t_last.tv_usec);
t_last = t_now;
if (icycle > 50) {
double dt = (double)t / 1000000.0;
cout << icycle << " " << sum_processed << " "
<< sum_processed / dt << " #throughput" << endl;
}
}
}
// perform load balancing
if (icycle % 50 == 0) {
int *nwork = new int[nranks];
MPI_Allgather(&ndata, 1, MPI_INT, nwork, 1, MPI_INT,
MPI_COMM_WORLD);
// load balancing is only performed when there is enough imbalance
int mean = 0;
int sum = 0;
for (int i = 0; i < nranks; i++) {
sum += nwork[i];
}
mean = sum / nranks;
bool do_load_balance = false;
double max_imbalance = 0;
for (int i = 0; i < nranks; i++) {
int diff = abs(nwork[i] - mean);
// cout << diff << " " <<
// (double)diff/(double)mean << endl;
double imbalance = (double)diff / (double)mean;
if (imbalance > 0.1) do_load_balance = true;
if (imbalance > max_imbalance) max_imbalance = imbalance;
}
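// example: with a mean of 100000 items, a rank holding more than 110000 or
// fewer than 90000 items (|diff| / mean > 0.1) triggers a balancing step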
if (icycle > cycle_lb) {
if (last_lb_done == false) {
last_lb_done = true;
do_load_balance = true;
}
}
if (do_load_balance) {
// balance load so that every rank contains the same number of
// work items here data is transfered between neighbours only
// each rank can transfer work items up and down, but it can not
// transfer and receive in the same direction at the same time
cout << "# performing load balancing " << max_imbalance << endl;
// compute the number of items to transfer between rank i and
// i+1, positive values mean transfer from i to i+1, negative
// values mean transfer i+1 to i
int *ntransfer = new int[nranks - 1];
ntransfer[0] = nwork[0] - mean;
for (int irank = 1; irank < nranks - 1; irank++) {
int n_new = nwork[irank] + ntransfer[irank - 1];
ntransfer[irank] = n_new - mean;
}
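// illustrative example: with nwork = {120, 100, 80} and mean = 100,
// ntransfer[0] = 120 - 100 = 20 (rank 0 passes 20 items to rank 1) and
// ntransfer[1] = (100 + 20) - 100 = 20 (rank 1 passes them on to rank 2),
// leaving every rank with 100 items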
// every rank now knows how many items it has to
// exchange with its neighbours (send / receive)
int x_up = 0;
int x_down = 0;
if (rank > 0) x_down = ntransfer[rank - 1];
if (rank < nranks - 1) x_up = ntransfer[rank];
MPI_Request reqs[4];
MPI_Status stats[4];
int nwait = 0;
int data_send_up = 0;
int data_send_down = 0;
int data_rec_up = 0;
int data_rec_down = 0;
// receive from previous rank
if (x_down > 0) {
// receive from previous rank
MPI_Irecv(&data_rec_down, 1, MPI_INT, rank - 1, icycle,
MPI_COMM_WORLD, &reqs[nwait]);
nwait++;
} else if (x_down < 0) {
// send to previous rank - for simplicity only the number of
// items is sent, not the items themselves
data_send_down = abs(x_down);
MPI_Isend(&data_send_down, 1, MPI_INT, rank - 1, icycle,
MPI_COMM_WORLD, &reqs[nwait]);
nwait++;
// remove data that was sent
ndata -= data_send_down;
}
if (x_up > 0) {
// send to next rank - for simplicity only the number of
// items is sent, not the items themselves
data_send_up = abs(x_up);
MPI_Isend(&data_send_up, 1, MPI_INT, rank + 1, icycle,
MPI_COMM_WORLD, &reqs[nwait]);
nwait++;
// remove data that was sent
ndata -= data_send_up;
} else if (x_up < 0) {
// receive from next rank
MPI_Irecv(&data_rec_up, 1, MPI_INT, rank + 1, icycle,
MPI_COMM_WORLD, &reqs[nwait]);
nwait++;
}
MPI_Waitall(nwait, reqs, stats);
// add received data
if (data_rec_down > 0) {
int ndata_new = ndata + data_rec_down;
if (ndata_new > ndata_max)
extend_array(&data, ndata_new, &ndata_max);
ndata = ndata_new;
}
if (data_rec_up > 0) {
int ndata_new = ndata + data_rec_up;
if (ndata_new > ndata_max)
extend_array(&data, ndata_new, &ndata_max);
ndata = ndata_new;
}
if (ndata > ndata_max) cout << "# error: ndata exceeds ndata_max" << endl;
delete[] ntransfer;
} else {
if (rank == 0) {
cout << "# skipping load balancing: " << max_imbalance
<< endl;
}
}
delete[] nwork;
}
}
delete[] data;
MPI_Finalize();
}
rankdlb: main.cpp
	mpicxx -g -o rankdlb -Wextra -Wall main.cpp

clean:
	rm -f rankdlb *.o *~