commit 7fbc3d5b233ac4a9448ca1bbeee1b607bc063406
parent 58e780bb0548abafa9a6d2c11b33836362c42475
Author: Vincent Forest <vincent.forest@meso-star.com>
Date: Fri, 8 Dec 2023 14:46:51 +0100
Finalize implementation of the sdis_solve_probe_list function
This is a first draft, not debugged, commited here mainly for backup.
Mixed parallelism is handled, i.e. multithread and MPI are used to
distribute work.
Diffstat:
5 files changed, 360 insertions(+), 35 deletions(-)
diff --git a/src/sdis.c b/src/sdis.c
@@ -19,6 +19,7 @@
#include "sdis_c.h"
#include "sdis_device_c.h"
#include "sdis_estimator_c.h"
+#include "sdis_estimator_buffer_c.h"
#include "sdis_green.h"
#include "sdis_log.h"
#include "sdis_misc.h"
@@ -586,6 +587,114 @@ error:
#ifndef SDIS_ENABLE_MPI
res_T
+gather_accumulators_list
+ (struct sdis_device* dev,
+ const enum mpi_sdis_message msg,
+ const size_t nprobes, /* Total number of probes */
+ const size_t process_probes[2], /* Ids of the probes managed by the process */
+ struct accum* per_probe_acc) /* List of per probe accumulators */
+{
+ (void)dev, (void)msg, (void) nprobes;
+ (void)process_probes, (void)per_probe_acc;
+ return RES_OK;
+}
+#else
+res_T
+gather_accumulators_list
+ (struct sdis_device* dev,
+ const enum mpi_sdis_message msg,
+ const size_t nprobes, /* Total number of probes */
+ const size_t process_probes[2], /* Range of probes managed by the process */
+ struct accum* per_probe_acc) /* List of per probe accumulators */
+{
+ struct accum_list {
+ size_t size;
+ /* Simulate a C99 flexible array */
+ ALIGN(16) struct accum accums[1/*Dummy element*/];
+ }* accum_list = NULL;
+ size_t max_per_process_nprobes = 0; /* Maximum #probes per process */
+ size_t process_nprobes = 0; /* Number of process probes */
+ size_t msg_sz = 0; /* Size in bytes of the message to send */
+ res_T res = RES_OK;
+
+ /* Check pre-conditions */
+ ASSERT(dev);
+ ASSERT(process_nprobes == 0 || (process_probes && per_probe_acc));
+
+ /* Defines the maximum number of probes managed by a process. In fact, it's
+ * the number of probes divided by the number of processes, plus one to manage
+ * the remainder of the entire division: the remaining probes are distributed
+ * between the processes */
+ max_per_process_nprobes = nprobes/(size_t)dev->mpi_nprocs + 1;
+
+ /* Number of probes */
+ process_nprobes = process_probes[1] - process_probes[0];
+
+ /* Allocate the array into which the data to be collected is copied */
+ msg_sz =
+ sizeof(struct accum_list)
+ + sizeof(struct accum)*max_per_process_nprobes
+ - 1/*Dummy element */;
+ if(msg_sz > INT_MAX) {
+ log_err(dev, "%s: invalid MPI message size %lu.\n",
+ FUNC_NAME, (unsigned long)msg_sz);
+ res = RES_BAD_ARG;
+ goto error;
+ }
+
+ accum_list = MEM_CALLOC(dev->allocator, 1, msg_sz);
+ if(!accum_list) {
+ log_err(dev,
+ "%s: unable to allocate the temporary list of accumulators.\n",
+ FUNC_NAME);
+ res = RES_MEM_ERR;
+ goto error;
+ }
+
+ /* Non master process */
+ if(dev->mpi_rank != 0) {
+
+ /* Setup the message to be sent */
+ accum_list->size = process_nprobes;
+ memcpy(accum_list->accums, per_probe_acc,
+ sizeof(struct accum)*process_nprobes);
+
+ mutex_lock(dev->mpi_mutex);
+ MPI(Send(accum_list, (int)msg_sz, MPI_CHAR, 0/*Dst*/, msg, MPI_COMM_WORLD));
+ mutex_unlock(dev->mpi_mutex);
+
+ /* Master process */
+ } else {
+ size_t gathered_nprobes = process_nprobes;
+ int iproc;
+
+ FOR_EACH(iproc, 1, dev->mpi_nprocs) {
+ MPI_Request req;
+
+ /* Asynchronously receive the accumulator of `iproc' */
+ mutex_lock(dev->mpi_mutex);
+ MPI(Irecv
+ (accum_list, (int)msg_sz, MPI_CHAR, iproc, msg, MPI_COMM_WORLD, &req));
+ mutex_unlock(dev->mpi_mutex);
+
+ mpi_waiting_for_request(dev, &req);
+
+ memcpy(per_probe_acc+gathered_nprobes, accum_list->accums,
+ sizeof(struct accum)*accum_list->size);
+
+ gathered_nprobes += accum_list->size;
+ }
+ }
+
+exit:
+ return res;
+error:
+ goto exit;
+}
+#endif
+
+#ifndef SDIS_ENABLE_MPI
+res_T
gather_green_functions
(struct sdis_scene* scn,
struct ssp_rng_proxy* rng_proxy,
diff --git a/src/sdis.h b/src/sdis.h
@@ -466,6 +466,25 @@ struct sdis_solve_probe_args {
static const struct sdis_solve_probe_args SDIS_SOLVE_PROBE_ARGS_DEFAULT =
SDIS_SOLVE_PROBE_ARGS_DEFAULT__;
+struct sdis_solve_probe_list_args {
+ struct sdis_solve_probe_args* probes; /* List of probes to compute */
+ size_t nprobes; /* Total number of probes */
+
+ /* State/type of the RNG to use for the list of probes to calculate.
+ * If a probe defines its own state/type, it takes precedence over the
+ * following variables */
+ struct ssp_rng* rng_state; /* Initial RNG state. May be NULL */
+ enum ssp_rng_type rng_type; /* RNG type to use if `rng_state' is NULL */
+};
+#define SDIS_SOLVE_PROBE_LIST_ARGS_DEFAULT__ { \
+ NULL, /* List of probes */ \
+ 0, /* #probes */ \
+ NULL, /* RNG state */ \
+ SSP_RNG_THREEFRY /* RNG type */ \
+}
+static const struct sdis_solve_probe_list_args
+SDIS_SOLVE_PROBE_LIST_ARGS_DEFAULT = SDIS_SOLVE_PROBE_LIST_ARGS_DEFAULT__;
+
/* Arguments of a probe simulation */
struct sdis_solve_probe_boundary_args {
size_t nrealisations; /* #realisations */
@@ -1339,8 +1358,7 @@ sdis_solve_probe
SDIS_API res_T
sdis_solve_probe_list
(struct sdis_scene* scn,
- const struct sdis_solve_probe_args args[],
- const size_t nprobes,
+ const struct sdis_solve_probe_list_args* args,
struct sdis_estimator_buffer** buf);
SDIS_API res_T
diff --git a/src/sdis_c.h b/src/sdis_c.h
@@ -116,6 +116,21 @@ gather_accumulators
const struct accum* per_thread_acc,
struct accum* acc);
+/* Collect accumulators evaluated over multiple processes, with each accumulator
+ * storing a complete Monte Carlo calculation. Without MPI, nothing happens
+ * since the per_probe_acc variable already stores the entire list of
+ * accumulators. With MPI, non-master processes send their list of accumulators
+ * to the master process which saves them in the per_probe_acc, after its
+ * accumulators that it has managed, sorted against the identifiers of the
+ * probes listed in process_probes. */
+extern LOCAL_SYM res_T
+gather_accumulators_list
+ (struct sdis_device* dev,
+ const enum mpi_sdis_message msg,
+ const size_t nprobes, /* Total number of probes */
+ const size_t process_probes[2], /* Ids of the probes managed by the process */
+ struct accum* per_probe_acc); /* List of per probe accumulators */
+
/* Gather the green functions. With MPI, non master processes store in green
* the gathering of their per thread green functions and sent the result to the
* master process. The master process gathers both per thread green functions
diff --git a/src/sdis_solve.c b/src/sdis_solve.c
@@ -59,15 +59,14 @@ sdis_solve_probe
res_T
sdis_solve_probe_list
(struct sdis_scene* scn,
- const struct sdis_solve_probe_args args[],
- const size_t nprobes,
+ const struct sdis_solve_probe_list_args args[],
struct sdis_estimator_buffer** out_buf)
{
if(!scn) return RES_BAD_ARG;
if(scene_is_2d(scn)) {
- return solve_probe_list_2d(scn, args, nprobes, out_buf);
+ return solve_probe_list_2d(scn, args, out_buf);
} else {
- return solve_probe_list_3d(scn, args, nprobes, out_buf);
+ return solve_probe_list_3d(scn, args, out_buf);
}
}
diff --git a/src/sdis_solve_probe_Xd.h b/src/sdis_solve_probe_Xd.h
@@ -66,8 +66,93 @@ check_solve_probe_args(const struct sdis_solve_probe_args* args)
return RES_OK;
}
+static INLINE res_T
+check_solve_probe_list_args(const struct sdis_solve_probe_list_args* args)
+{
+ size_t iprobe = 0;
+ if(!args) return RES_BAD_ARG;
+
+ /* Check the list of probes */
+ if(!args->probes || !args->nprobes) {
+ return RES_BAD_ARG;
+ }
+
+ /* Check the RNG type */
+ if(!args->rng_state && args->rng_type >= SSP_RNG_TYPES_COUNT__) {
+ return RES_BAD_ARG;
+ }
+
+ FOR_EACH(iprobe, 0, args->nprobes) {
+ const res_T res = check_solve_probe_args(args->probes+iprobe);
+ if(res != RES_OK) return res;
+ }
+
+ return RES_OK;
+}
+
#endif /* SDIS_SOLVE_PROBE_XD_H */
+static res_T
+XD(solve_one_probe)
+ (struct sdis_scene* scn,
+ struct ssp_rng* rng,
+ const struct sdis_solve_probe_args* args,
+ struct accum* acc_temp,
+ struct accum* acc_time)
+{
+ struct sdis_medium* medium = NULL; /* Medium in which the probe lies */
+ size_t irealisation = 0;
+ res_T res = RES_OK;
+ ASSERT(scn && rng && check_solve_probe_args(args) == RES_OK);
+ ASSERT(acc_temp && acc_time);
+
+ *acc_temp = ACCUM_NULL;
+ *acc_time = ACCUM_NULL;
+
+ /* Retrieve the medium in which the submitted position lies */
+ res = scene_get_medium(scn, args->position, NULL, &medium);
+ if(res != RES_OK) goto error;
+
+ FOR_EACH(irealisation, 0, args->nrealisations) {
+ struct probe_realisation_args realis_args = PROBE_REALISATION_ARGS_NULL;
+ double w = NaN; /* MC weight */
+ double usec = 0; /* Time of a realisation */
+ double time = 0; /* Sampled observation time */
+ struct time t0, t1; /* Register the time spent solving a realisation */
+
+ /* Begin time registration of the realisation */
+ time_current(&t0);
+
+ /* Sample observation time */
+ time = sample_time(rng, args->time_range);
+
+ /* Run a realisation */
+ realis_args.rng = rng;
+ realis_args.medium = medium;
+ realis_args.time = time;
+ realis_args.picard_order = args->picard_order;
+ realis_args.irealisation = irealisation;
+ dX(set)(realis_args.position, args->position);
+ res = XD(probe_realisation)(scn, &realis_args, &w);
+ if(res != RES_OK) goto error;
+
+ /* Stop time registration */
+ time_sub(&t0, time_current(&t1), &t0);
+ usec = (double)time_val(&t0, TIME_NSEC) * 0.001;
+
+ /* Update MC weights */
+ acc_temp->sum += w;
+ acc_temp->sum2 += w*w;
+ acc_time->sum += usec;
+ acc_time->sum2 += usec*usec;
+ }
+
+exit:
+ return res;
+error:
+ goto exit;
+}
+
/*******************************************************************************
* Generic solve function
******************************************************************************/
@@ -372,37 +457,42 @@ error:
static res_T
XD(solve_probe_list)
(struct sdis_scene* scn,
- const struct sdis_solve_probe_args args[],
- const size_t nprobes,
- struct sdis_estimator_buffer** out_buf)
+ const struct sdis_solve_probe_list_args* args,
+ struct sdis_estimator_buffer** out_estim_buf)
{
/* Time registration */
struct time time0, time1;
+ char buf[128]; /* Temporary buffer used to store formated time */
/* Device variables */
struct mem_allocator* allocator = NULL;
- size_t nthreads = 0;
/* Stardis variables */
- struct sdis_estimator_buffer* buf = NULL;
+ struct sdis_estimator_buffer* estim_buf = NULL;
+
+ /* Random Number generator */
+ struct ssp_rng_proxy* rng_proxy = NULL;
+ struct ssp_rng** per_thread_rng = NULL;
/* Probe variables */
- size_t per_process_probes[2];
- size_t per_process_nprobes;
- int64_t iprobe = 0;
+ size_t process_probes[2] = {0, 0}; /* Probes range managed by the process */
+ size_t process_nprobes = 0; /* Number of probes managed by the process */
+ size_t nprobes = 0;
+ struct accum* per_probe_acc_temp = NULL;
+ struct accum* per_probe_acc_time = NULL;
/* Miscellaneous */
int32_t* progress = NULL; /* Per process progress bar */
int pcent_progress = 1; /* Percentage requiring progress update */
int is_master_process = 0;
- res_T res = RES_OK;
+ int64_t i = 0;
+ ATOMIC nsolved_probes = 0;
+ ATOMIC res = RES_OK;
/* Check input arguments */
- if(!scn || !args || !out_buf) { res = RES_BAD_ARG; goto error; }
- FOR_EACH(iprobe, 0, nprobes) {
- res = check_solve_probe_args(&args[iprobe]);
- if(res != RES_OK) goto error;
- }
+ if(!scn || !out_estim_buf) { res = RES_BAD_ARG; goto error; }
+ res = check_solve_probe_list_args(args);
+ if(res != RES_OK) goto error;
res = XD(scene_check_dimensionality)(scn);
if(res != RES_OK) goto error;
@@ -410,7 +500,6 @@ XD(solve_probe_list)
is_master_process = !scn->dev->use_mpi || scn->dev->mpi_rank == 0;
#endif
- nthreads = scn->dev->nthreads;
allocator = scn->dev->allocator;
/* Update the progress bar every percent if escape sequences are allowed in
@@ -418,6 +507,11 @@ XD(solve_probe_list)
* This reduces the number of lines of plain text printed */
pcent_progress = scn->dev->no_escape_sequence ? 10 : 1;
+ /* Create the per thread RNGs */
+ res = create_per_thread_rng
+ (scn->dev, args->rng_state, args->rng_type, &rng_proxy, &per_thread_rng);
+ if(res != RES_OK) goto error;
+
/* Allocate the per process progress status */
res = alloc_process_progress(scn->dev, &progress);
if(res != RES_OK) goto error;
@@ -425,33 +519,123 @@ XD(solve_probe_list)
/* Synchronise the processes */
process_barrier(scn->dev);
+ #define PROGRESS_MSG "Solving probes: "
+ print_progress(scn->dev, progress, PROGRESS_MSG);
+
/* Begin time registration of the computation */
time_current(&time0);
- /* Here we go! Calclation of probe list */
- per_process_nprobes = compute_process_index_range
- (scn->dev, nprobes, per_process_probes);
-
- /* Create the global estimator on the master process only */
- if(is_master_process) {
- res = estimator_buffer_create(scn->dev, nprobes, 1, &buf);
- if(res != RES_OK) goto error;
+ /* Define the range of probes to manage in this process */
+ process_nprobes = compute_process_index_range
+ (scn->dev, args->nprobes, process_probes);
+
+ /* Allocate the list of accumulators per probe. On the master process,
+ * allocate a complete list in which the accumulators of all processes will be
+ * stored. */
+ nprobes = is_master_process ? args->nprobes : process_nprobes;
+ per_probe_acc_temp = MEM_CALLOC(allocator, nprobes, sizeof(*per_probe_acc_temp));
+ per_probe_acc_time = MEM_CALLOC(allocator, nprobes, sizeof(*per_probe_acc_time));
+ if(!per_probe_acc_temp || !per_probe_acc_time) {
+ log_err(scn->dev, "Unable to allocate the list of accumulators per probe.\n");
+ res = RES_MEM_ERR;
+ goto error;
}
+ /* Here we go! Calculation of probe list */
omp_set_num_threads((int)scn->dev->nthreads);
#pragma omp parallel for schedule(static)
- for(iprobe = 0; iprobe < (int64_t)per_process_nprobes; ++iprobe) {
- /* TODO */
+ for(i = 0; i < (int64_t)process_nprobes; ++i) {
+ /* Thread */
+ const int ithread = omp_get_thread_num(); /* Thread ID */
+ struct ssp_rng* rng = per_thread_rng[ithread]; /* RNG of the thread */
+
+ /* Probe */
+ struct accum* probe_acc_temp = NULL;
+ struct accum* probe_acc_time = NULL;
+ const struct sdis_solve_probe_args* probe_args = NULL; /* Solve args */
+ const size_t iprobe = process_probes[i]; /* Probe ID */
+
+ /* Misc */
+ size_t n = 0; /* Number of solved probes */
+ int pcent = 0; /* Current progress */
+ res_T res_local = RES_OK;
+
+ if(ATOMIC_GET(&res) != RES_OK) continue; /* An error occured */
+
+ /* Retrieve the probe arguments */
+ probe_args = &args->probes[iprobe];
+
+ /* Retrieve the probe accumulators */
+ probe_acc_temp = &per_probe_acc_temp[i];
+ probe_acc_time = &per_probe_acc_time[i];
+
+ res_local = XD(solve_one_probe)
+ (scn, rng, probe_args, probe_acc_temp, probe_acc_time);
+ if(res_local != RES_OK) {
+ ATOMIC_SET(&res, res_local);
+ continue;
+ }
+
+ /* Update progress */
+ n = (size_t)ATOMIC_INCR(&nsolved_probes);
+ pcent = (int)((double)n * 100.0 / (double)nprobes + 0.5/*round*/);
+
+ #pragma omp critical
+ if(pcent/pcent_progress > progress[0]/pcent_progress) {
+ progress[0] = pcent;
+ print_progress_update(scn->dev, progress, PROGRESS_MSG);
+ }
}
+ /* Synchronise processes */
+ process_barrier(scn->dev);
+
+ res = gather_res_T(scn->dev, (res_T)res);
+ if(res != RES_OK) goto error;
+
+ print_progress_completion(scn->dev, progress, PROGRESS_MSG);
+ #undef PROGRESS_MSG
+
+ /* Report computation time */
+ time_sub(&time0, time_current(&time1), &time0);
+ time_dump(&time0, TIME_ALL, NULL, buf, sizeof(buf));
+ log_info(scn->dev, "%lu probes solved in %s.\n",
+ (unsigned long)args->nprobes, buf);
+
+ /* Gather the RNG proxy sequence IDs and ensure that the RNG proxy state of
+ * the master process is greater than the RNG proxy state of all other
+ * processes */
+ res = gather_rng_proxy_sequence_id(scn->dev, rng_proxy);
+ if(res != RES_OK) goto error;
+
+ time_current(&time0);
+
+ /* Gather the list of accumulators */
+ #define GATHER_ACCUMS_LIST(Msg, Acc) { \
+ res = gather_accumulators_list \
+ (scn->dev, Msg, args->nprobes, process_probes, per_probe_acc_##Acc); \
+ if(res != RES_OK) goto error; \
+ } (void)0
+ GATHER_ACCUMS_LIST(MPI_SDIS_MSG_ACCUM_TEMP, temp);
+ GATHER_ACCUMS_LIST(MPI_SDIS_MSG_ACCUM_TIME, time);
+ #undef GATHER_ACCUMS_LIST
+
+ time_sub(&time0, time_current(&time1), &time0);
+ time_dump(&time0, TIME_ALL, NULL, buf, sizeof(buf));
+ log_info(scn->dev, "Probes accumulator gathered in %s.\n", buf);
+
error:
+ if(per_probe_acc_temp) MEM_RM(allocator, per_probe_acc_temp);
+ if(per_probe_acc_time) MEM_RM(allocator, per_probe_acc_time);
if(progress) free_process_progress(scn->dev, progress);
- if(out_buf) *out_buf = buf;
- return res;
+ if(out_estim_buf) *out_estim_buf = estim_buf;
+ return (res_T)res;
exit:
- if(buf) { SDIS(estimator_buffer_ref_put(buf)); buf = NULL; }
+ if(estim_buf) {
+ SDIS(estimator_buffer_ref_put(estim_buf));
+ estim_buf = NULL;
+ }
goto exit;
}
#include "sdis_Xd_end.h"
-