commit 73ad03ca1d06f87965af76bf97eff3a946647d1b
parent 6e0bfd185bd1452256a177312a35032160d14cff
Author: Vincent Forest <vincent.forest@meso-star.com>
Date: Mon, 3 Jan 2022 15:44:23 +0100
Add MPI support to the function sdis_compute_power
Diffstat:
2 files changed, 140 insertions(+), 79 deletions(-)
diff --git a/src/sdis_c.h b/src/sdis_c.h
@@ -26,6 +26,7 @@ enum mpi_sdis_message {
MPI_SDIS_MSG_ACCUM_FLUX_IMPOSED, /* Imposed flux accumulator */
MPI_SDIS_MSG_ACCUM_FLUX_RADIATIVE, /* Radiative flux accumulator */
MPI_SDIS_MSG_ACCUM_FLUX_TOTAL, /* Total flux accumulator */
+ MPI_SDIS_MSG_ACCUM_MEAN_POWER, /* Mean power accumulator */
MPI_SDIS_MSG_GREEN_FUNCTION, /* Serialized green function */
MPI_SDIS_MSG_PROGRESS, /* Progress status */
MPI_SDIS_MSG_RES_T, /* Result status */
diff --git a/src/sdis_solve_medium_Xd.h b/src/sdis_solve_medium_Xd.h
@@ -165,6 +165,31 @@ check_solve_medium_args(const struct sdis_solve_medium_args* args)
return RES_OK;
}
+static INLINE res_T
+check_compute_power_args(const struct sdis_compute_power_args* args)
+{
+ if(!args) return RES_BAD_ARG;
+
+ /* Check the medium */
+ if(!args->medium) return RES_BAD_ARG;
+
+ /* Check #realisations. The count is capped to INT64_MAX since the Monte
+ * Carlo loop below iterates with a signed 64-bit index */
+ if(!args->nrealisations || args->nrealisations > INT64_MAX) {
+ return RES_BAD_ARG;
+ }
+
+ /* Check time range */
+ if(args->time_range[0] < 0 || args->time_range[1] < args->time_range[0]) {
+ return RES_BAD_ARG;
+ }
+ /* An infinite upper bound is only valid when the lower bound is infinite
+ * too, i.e. for a steady-state computation */
+ if(args->time_range[1] > DBL_MAX
+ && args->time_range[0] != args->time_range[1]) {
+ return RES_BAD_ARG;
+ }
+
+ return RES_OK;
+}
+
#endif /* !SDIS_SOLVE_MEDIUM_XD_H */
/*******************************************************************************
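/* For illustration, a sketch (not part of this patch) of the time ranges the
 * new check accepts; the SDIS_COMPUTE_POWER_ARGS_NULL initializer is an
 * assumption of this example. */
#include <math.h> /* INFINITY */

static void
illustrate_time_ranges(void)
{
  struct sdis_compute_power_args args = SDIS_COMPUTE_POWER_ARGS_NULL;

  /* Accepted: transient estimation over [0, 10] seconds */
  args.time_range[0] = 0;
  args.time_range[1] = 10;

  /* Accepted: steady state, i.e. both bounds are infinite */
  args.time_range[0] = INFINITY;
  args.time_range[1] = INFINITY;

  /* Rejected: a finite lower bound with an infinite upper bound */
  args.time_range[0] = 0;
  args.time_range[1] = INFINITY;

  (void)args;
}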
@@ -532,39 +557,40 @@ XD(compute_power)
const struct sdis_compute_power_args* args,
struct sdis_estimator** out_estimator)
{
- struct darray_enclosure_cumul cumul;
+ /* Computation timing */
+ struct time time0, time1;
+ char buf[128]; /* Temporary buffer used to store formatted time */
+
+ /* Device variables */
+ struct mem_allocator* allocator = NULL;
+ size_t nthreads = 0;
+
+ /* Stardis variables */
struct sdis_estimator* estimator = NULL;
+
+ /* Random number generator */
struct ssp_rng_proxy* rng_proxy = NULL;
- struct ssp_rng** rngs = NULL;
- struct accum* acc_mpows = NULL;
- struct accum* acc_times = NULL;
+ struct ssp_rng** per_thread_rng = NULL;
+
+ /* Miscellaneous */
+ struct darray_enclosure_cumul cumul;
+ struct accum* per_thread_acc_mpow = NULL;
+ struct accum* per_thread_acc_time = NULL;
double spread = 0;
- size_t i = 0;
size_t nrealisations = 0;
int64_t irealisation = 0;
+ int32_t* progress = NULL; /* Per-process progress status */
int cumul_is_init = 0;
- int progress = 0;
+ int is_master_process = 1;
ATOMIC nsolved_realisations = 0;
ATOMIC res = RES_OK;
- if(!scn
- || !args
- || !out_estimator
- || !args->medium
- || !args->nrealisations
- || args->nrealisations > INT64_MAX
- || args->time_range[0] < 0
- || args->time_range[0] > args->time_range[1]
- || ( args->time_range[1] > DBL_MAX
- && args->time_range[0] != args->time_range[1])) {
- res = RES_BAD_ARG;
- goto error;
- }
-
- if(scene_is_2d(scn) != (SDIS_XD_DIMENSION==2)) {
- res = RES_BAD_ARG;
- goto error;
- }
+ if(!scn) { res = RES_BAD_ARG; goto error; }
+ if(!out_estimator) { res = RES_BAD_ARG; goto error; }
+ res = check_compute_power_args(args);
+ if(res != RES_OK) goto error;
+ res = XD(scene_check_dimensionality)(scn);
+ if(res != RES_OK) goto error;
if(sdis_medium_get_type(args->medium) != SDIS_SOLID) {
log_err(scn->dev, "Could not compute mean power on a non-solid medium.\n");
@@ -572,32 +598,27 @@ XD(compute_power)
goto error;
}
- /* Create the RNG proxy */
- if(args->rng_state) {
- res = ssp_rng_proxy_create_from_rng(scn->dev->allocator, args->rng_state,
- scn->dev->nthreads, &rng_proxy);
- if(res != RES_OK) goto error;
- } else {
- res = ssp_rng_proxy_create(scn->dev->allocator, SSP_RNG_MT19937_64,
- scn->dev->nthreads, &rng_proxy);
- if(res != RES_OK) goto error;
- }
+#ifdef SDIS_ENABLE_MPI
+ is_master_process = !scn->dev->use_mpi || scn->dev->mpi_rank == 0;
+#endif
- /* Create the per thread RNG */
- rngs = MEM_CALLOC(scn->dev->allocator, scn->dev->nthreads, sizeof(*rngs));
- if(!rngs) { res = RES_MEM_ERR; goto error; }
- FOR_EACH(i, 0, scn->dev->nthreads) {
- res = ssp_rng_proxy_create_rng(rng_proxy, i, rngs+i);
- if(res != RES_OK) goto error;
- }
+ nthreads = scn->dev->nthreads;
+ allocator = scn->dev->allocator;
+
+ /* Create the per-thread RNGs (see the sketch after this hunk) */
+ res = create_per_thread_rng
+ (scn->dev, args->rng_state, &rng_proxy, &per_thread_rng);
+ if(res != RES_OK) goto error;
+
+ /* Allocate the per-process progress status */
+ res = alloc_process_progress(scn->dev, &progress);
+ if(res != RES_OK) goto error;
/* Create the per thread accumulators */
- acc_mpows = MEM_CALLOC
- (scn->dev->allocator, scn->dev->nthreads, sizeof(*acc_mpows));
- if(!acc_mpows) { res = RES_MEM_ERR; goto error; }
- acc_times = MEM_CALLOC
- (scn->dev->allocator, scn->dev->nthreads, sizeof(*acc_times));
- if(!acc_times) { res = RES_MEM_ERR; goto error; }
+ per_thread_acc_mpow = MEM_CALLOC(allocator, nthreads, sizeof(struct accum));
+ per_thread_acc_time = MEM_CALLOC(allocator, nthreads, sizeof(struct accum));
+ if(!per_thread_acc_mpow) { res = RES_MEM_ERR; goto error; }
+ if(!per_thread_acc_time) { res = RES_MEM_ERR; goto error; }
/* Compute the cumulative of the spreads of the enclosures surrounding the
* submitted medium */
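/* The create_per_thread_rng helper used above is defined elsewhere in the
 * tree; based on the inlined code it replaces (deleted in this hunk), a
 * plausible sketch is: */
static res_T
sketch_create_per_thread_rng
  (struct sdis_device* dev,
   struct ssp_rng* rng_state,
   struct ssp_rng_proxy** out_proxy,
   struct ssp_rng*** out_rngs)
{
  struct ssp_rng_proxy* proxy = NULL;
  struct ssp_rng** rngs = NULL;
  size_t i = 0;
  res_T res = RES_OK;

  /* Create the RNG proxy, either from a submitted state or from scratch */
  if(rng_state) {
    res = ssp_rng_proxy_create_from_rng
      (dev->allocator, rng_state, dev->nthreads, &proxy);
  } else {
    res = ssp_rng_proxy_create
      (dev->allocator, SSP_RNG_MT19937_64, dev->nthreads, &proxy);
  }
  if(res != RES_OK) goto error;

  /* Spawn one RNG stream per thread from the proxy */
  rngs = MEM_CALLOC(dev->allocator, dev->nthreads, sizeof(*rngs));
  if(!rngs) { res = RES_MEM_ERR; goto error; }
  FOR_EACH(i, 0, dev->nthreads) {
    res = ssp_rng_proxy_create_rng(proxy, i, rngs+i);
    if(res != RES_OK) goto error;
  }

exit:
  *out_proxy = proxy;
  *out_rngs = rngs;
  return res;
error:
  /* Cleanup of partially created resources elided in this sketch */
  goto exit;
}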
@@ -610,20 +631,33 @@ XD(compute_power)
spread = darray_enclosure_cumul_cdata_get(&cumul)
[darray_enclosure_cumul_size_get(&cumul)-1].cumul;
- /* Create the estimator */
- res = estimator_create(scn->dev, SDIS_ESTIMATOR_POWER, &estimator);
- if(res != RES_OK) goto error;
+ /* Create the estimator on the master process only; no estimator is needed
+ * for the non-master processes */
+ if(is_master_process) {
+ res = estimator_create(scn->dev, SDIS_ESTIMATOR_POWER, &estimator);
+ if(res != RES_OK) goto error;
+ }
+
+ /* Synchronise the processes */
+ process_barrier(scn->dev);
- nrealisations = args->nrealisations;
+ #define PROGRESS_MSG "Computing mean power: "
+ print_progress(scn->dev, progress, PROGRESS_MSG);
+
+ /* Start timing the computation */
+ time_current(&time0);
+
+ /* Here we go! Launch the Monte Carlo estimation */
+ nrealisations =
+ compute_process_realisations_count(scn->dev, args->nrealisations);
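/* The compute_process_realisations_count helper is defined elsewhere in the
 * tree. A plausible sketch: split the submitted count evenly among the MPI
 * processes, with the master taking the remainder; the mpi_nprocs field is
 * an assumption of this sketch. */
static size_t
sketch_process_realisations_count
  (const struct sdis_device* dev, const size_t nrealisations)
{
#ifdef SDIS_ENABLE_MPI
  if(dev->use_mpi) {
    const size_t nprocs = (size_t)dev->mpi_nprocs; /* Assumed field */
    size_t n = nrealisations / nprocs;
    if(dev->mpi_rank == 0) n += nrealisations % nprocs;
    return n;
  }
#endif
  (void)dev;
  return nrealisations;
}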
omp_set_num_threads((int)scn->dev->nthreads);
#pragma omp parallel for schedule(static)
for(irealisation = 0; irealisation < (int64_t)nrealisations; ++irealisation) {
struct time t0, t1;
struct sdis_rwalk_vertex vtx = SDIS_RWALK_VERTEX_NULL;
const int ithread = omp_get_thread_num();
- struct ssp_rng* rng = rngs[ithread];
- struct accum* acc_mpow = &acc_mpows[ithread];
- struct accum* acc_time = &acc_times[ithread];
+ struct ssp_rng* rng = per_thread_rng[ithread];
+ struct accum* acc_mpow = &per_thread_acc_mpow[ithread];
+ struct accum* acc_time = &per_thread_acc_time[ithread];
const struct enclosure* enc = NULL;
double power = 0;
double usec = 0;
@@ -664,52 +698,78 @@ XD(compute_power)
n = (size_t)ATOMIC_INCR(&nsolved_realisations);
pcent = (int)((double)n * 100.0 / (double)nrealisations + 0.5/*round*/);
#pragma omp critical
- if(pcent > progress) {
- progress = pcent;
- log_info(scn->dev, "Computing mean power: %3d%%\r", progress);
+ if(pcent > progress[0]) {
+ progress[0] = pcent;
+ print_progress_update(scn->dev, progress, PROGRESS_MSG);
}
exit_it:
continue;
error_it:
goto exit_it;
}
+ /* Synchronise the processes */
+ process_barrier(scn->dev);
+
+ res = gather_res_T(scn->dev, (res_T)res);
if(res != RES_OK) goto error;
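/* gather_res_T is defined elsewhere in the tree. A sketch of one plausible
 * implementation, assuming integer error codes with RES_OK == 0 and <mpi.h>
 * available under SDIS_ENABLE_MPI: */
static res_T
sketch_gather_res_T(struct sdis_device* dev, const res_T res)
{
  int worst = (int)res;
#ifdef SDIS_ENABLE_MPI
  if(dev->use_mpi) {
    const int code = (int)res;
    /* Make every process agree on the "worst" status so that they all take
     * the same exit path */
    MPI_Allreduce(&code, &worst, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);
  }
#endif
  (void)dev;
  return (res_T)worst;
}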
- /* Add a new line after the progress status */
- log_info(scn->dev, "Computing mean power: %3d%%\n", progress);
+ print_progress_update(scn->dev, progress, PROGRESS_MSG);
+ log_info(scn->dev, "\n");
+ #undef PROGRESS_MSG
+
+ /* Report computation time */
+ time_sub(&time0, time_current(&time1), &time0);
+ time_dump(&time0, TIME_ALL, NULL, buf, sizeof(buf));
+ log_info(scn->dev, "Mean power computed in in %s.\n", buf);
+
+ /* Gather the RNG proxy sequence IDs and ensure that the RNG proxy state of
+ * the master process is greater than the RNG proxy state of all other
+ * processes */
+ res = gather_rng_proxy_sequence_id(scn->dev, rng_proxy);
+ if(res != RES_OK) goto error;
/* Setup the estimated mean power */
{
struct accum acc_mpow;
struct accum acc_time;
- sum_accums(acc_mpows, scn->dev->nthreads, &acc_mpow);
- sum_accums(acc_times, scn->dev->nthreads, &acc_time);
- ASSERT(acc_mpow.count == acc_time.count);
-
- estimator_setup_realisations_count(estimator, nrealisations, acc_mpow.count);
- estimator_setup_realisation_time(estimator, acc_time.sum, acc_time.sum2);
- estimator_setup_power
- (estimator, acc_mpow.sum, acc_mpow.sum2, spread, args->time_range);
- res = estimator_save_rng_state(estimator, rng_proxy);
- if(res != RES_OK) goto error;
+
+ time_current(&time0);
+
+ #define GATHER_ACCUMS(Msg, Acc) { \
+ res = gather_accumulators(scn->dev, Msg, per_thread_##Acc, &Acc); \
+ if(res != RES_OK) goto error; \
+ } (void)0
+ GATHER_ACCUMS(MPI_SDIS_MSG_ACCUM_MEAN_POWER, acc_mpow);
+ GATHER_ACCUMS(MPI_SDIS_MSG_ACCUM_TIME, acc_time);
+ #undef GATHER_ACCUMS
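/* gather_accumulators is defined elsewhere in the tree. Conceptually it
 * extends the plain sum_accums calls removed above with an MPI reduction
 * toward the master process; a sketch under that assumption: */
static res_T
sketch_gather_accumulators
  (struct sdis_device* dev,
   const enum mpi_sdis_message msg,
   const struct accum* per_thread_accums,
   struct accum* out_accum)
{
  struct accum acc;

  /* Local reduction over the threads of this process, as before */
  sum_accums(per_thread_accums, dev->nthreads, &acc);

#ifdef SDIS_ENABLE_MPI
  if(dev->use_mpi) {
    double sums[2] = {acc.sum, acc.sum2};
    double total[2] = {0, 0};
    unsigned long count = (unsigned long)acc.count;
    unsigned long total_count = 0;

    /* Fold the per-process partial sums into the master process (rank 0).
     * The real helper presumably tags its exchanges with `msg`; a plain
     * reduction does not need the tag */
    MPI_Reduce(sums, total, 2, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    MPI_Reduce
      (&count, &total_count, 1, MPI_UNSIGNED_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
    acc.sum = total[0];
    acc.sum2 = total[1];
    acc.count = (size_t)total_count;
  }
#endif
  (void)msg;
  *out_accum = acc;
  return RES_OK;
}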
+
+ time_sub(&time0, time_current(&time1), &time0);
+ time_dump(&time0, TIME_ALL, NULL, buf, sizeof(buf));
+ log_info(scn->dev, "Accumulators gathered in %s.\n", buf);
+
+ /* Return an estimator only on master process */
+ if(is_master_process) {
+ ASSERT(acc_mpow.count == acc_time.count);
+ estimator_setup_realisations_count
+ (estimator, args->nrealisations, acc_mpow.count);
+ estimator_setup_realisation_time(estimator, acc_time.sum, acc_time.sum2);
+ estimator_setup_power
+ (estimator, acc_mpow.sum, acc_mpow.sum2, spread, args->time_range);
+ res = estimator_save_rng_state(estimator, rng_proxy);
+ if(res != RES_OK) goto error;
+ }
}
exit:
- if(rngs) {
- FOR_EACH(i, 0, scn->dev->nthreads) {if(rngs[i]) SSP(rng_ref_put(rngs[i]));}
- MEM_RM(scn->dev->allocator, rngs);
- }
- if(acc_mpows) MEM_RM(scn->dev->allocator, acc_mpows);
- if(acc_times) MEM_RM(scn->dev->allocator, acc_times);
+ if(per_thread_rng) release_per_thread_rng(scn->dev, per_thread_rng);
+ if(progress) free_process_progress(scn->dev, progress);
+ if(per_thread_acc_mpow) MEM_RM(scn->dev->allocator, per_thread_acc_mpow);
+ if(per_thread_acc_time) MEM_RM(scn->dev->allocator, per_thread_acc_time);
if(cumul_is_init) darray_enclosure_cumul_release(&cumul);
if(rng_proxy) SSP(rng_proxy_ref_put(rng_proxy));
if(out_estimator) *out_estimator = estimator;
return (res_T)res;
error:
- if(estimator) {
- SDIS(estimator_ref_put(estimator));
- estimator = NULL;
- }
+ if(estimator) { SDIS(estimator_ref_put(estimator)); estimator = NULL; }
goto exit;
}
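/* With this change the realisations are shared among the MPI processes. A
 * minimal caller sketch; the entry point name comes from the commit subject,
 * while the SDIS_COMPUTE_POWER_ARGS_NULL initializer and the `scn`/`solid`
 * variables are assumptions of this example. Launch with e.g.
 * `mpirun -np 4 ./my_app`. */
struct sdis_estimator* estimator = NULL;
struct sdis_compute_power_args args = SDIS_COMPUTE_POWER_ARGS_NULL;
res_T res = RES_OK;

args.medium = solid; /* Must be a SDIS_SOLID medium */
args.nrealisations = 100000; /* Split across the MPI processes */
args.time_range[0] = INFINITY; /* Steady state */
args.time_range[1] = INFINITY;

res = sdis_compute_power(scn, &args, &estimator);
if(res != RES_OK) { /* Handle the error */ }

/* When MPI is enabled, only the master process (rank 0) receives a non-NULL
 * estimator; the other ranks get NULL */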