htrdr

Solving radiative transfer in heterogeneous media
git clone git://git.meso-star.fr/htrdr.git
Log | Files | Refs | README | LICENSE

htrdr_proc_work.c (8079B)


      1 /* Copyright (C) 2018-2019, 2022-2025 Centre National de la Recherche Scientifique
      2  * Copyright (C) 2020-2022 Institut Mines Télécom Albi-Carmaux
      3  * Copyright (C) 2022-2025 Institut Pierre-Simon Laplace
      4  * Copyright (C) 2022-2025 Institut de Physique du Globe de Paris
      5  * Copyright (C) 2018-2025 |Méso|Star> (contact@meso-star.com)
      6  * Copyright (C) 2022-2025 Observatoire de Paris
      7  * Copyright (C) 2022-2025 Université de Reims Champagne-Ardenne
      8  * Copyright (C) 2022-2025 Université de Versaille Saint-Quentin
      9  * Copyright (C) 2018-2019, 2022-2025 Université Paul Sabatier
     10  *
     11  * This program is free software: you can redistribute it and/or modify
     12  * it under the terms of the GNU General Public License as published by
     13  * the Free Software Foundation, either version 3 of the License, or
     14  * (at your option) any later version.
     15  *
     16  * This program is distributed in the hope that it will be useful,
     17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     19  * GNU General Public License for more details.
     20  *
     21  * You should have received a copy of the GNU General Public License
     22  * along with this program. If not, see <http://www.gnu.org/licenses/>. */
     23 
     24 #define _POSIX_C_SOURCE 200112L /* nanosleep */
     25 
     26 #include "core/htrdr_c.h"
     27 #include "core/htrdr_proc_work.h"
     28 
     29 #include <star/ssp.h>
     30 
     31 #include <rsys/mutex.h>
     32 
     33 #include <time.h>
     34 
     35 /*******************************************************************************
     36  * Helper functions
     37  ******************************************************************************/
     38 /* Return the rank of a working process */
     39 static int
     40 sample_working_process(struct htrdr* htrdr, struct ssp_rng* rng)
     41 {
     42   int iproc, i;
     43   int dst_rank;
     44   ASSERT(htrdr && rng && htrdr->mpi_nworking_procs);
     45 
     46   /* Sample the index of the 1st active process */
     47   iproc = (int)(ssp_rng_canonical(rng) * (double)htrdr->mpi_nworking_procs);
     48 
     49   /* Find the rank of the sampled active process. Use a simple linear search
     50    * since the overall number of processes should be quite low; at most few
     51    * dozens.  */
     52   i = 0;
     53   FOR_EACH(dst_rank, 0, htrdr->mpi_nprocs) {
     54     if(htrdr->mpi_working_procs[dst_rank] == 0) continue; /* Inactive process */
     55     if(i == iproc) break; /* The rank of the sampled process is found */
     56     ++i;
     57   }
     58   ASSERT(dst_rank < htrdr->mpi_nprocs);
     59   return dst_rank;
     60 }
     61 
     62 /*******************************************************************************
     63  * Local functions
     64  ******************************************************************************/
     65 void
     66 proc_work_init(struct mem_allocator* allocator, struct proc_work* work)
     67 {
     68   ASSERT(work);
     69   darray_u64_init(allocator, &work->chunks);
     70   work->index = 0;
     71   CHK(work->mutex = mutex_create());
     72 }
     73 
     74 void
     75 proc_work_release(struct proc_work* work)
     76 {
     77   darray_u64_release(&work->chunks);
     78   mutex_destroy(work->mutex);
     79 }
     80 
     81 void
     82 proc_work_reset(struct proc_work* work)
     83 {
     84   ASSERT(work);
     85   mutex_lock(work->mutex);
     86   darray_u64_clear(&work->chunks);
     87   work->index = 0;
     88   mutex_unlock(work->mutex);
     89 }
     90 
     91 void
     92 proc_work_add_chunk(struct proc_work* work, const uint64_t ichunk)
     93 {
     94   mutex_lock(work->mutex);
     95   CHK(darray_u64_push_back(&work->chunks, &ichunk) == RES_OK);
     96   mutex_unlock(work->mutex);
     97 }
     98 
     99 uint64_t
    100 proc_work_get_chunk(struct proc_work* work)
    101 {
    102   uint64_t ichunk = CHUNK_ID_NULL;
    103   ASSERT(work);
    104 
    105   mutex_lock(work->mutex);
    106   if(work->index >= darray_u64_size_get(&work->chunks)) {
    107     ichunk = CHUNK_ID_NULL;
    108   } else {
    109     ichunk = darray_u64_cdata_get(&work->chunks)[work->index];
    110     ++work->index;
    111   }
    112   mutex_unlock(work->mutex);
    113   return ichunk;
    114 }
    115 
    116 size_t
    117 proc_work_get_nchunks(struct proc_work* work)
    118 {
    119   size_t sz = 0;
    120   ASSERT(work);
    121 
    122   mutex_lock(work->mutex);
    123   sz = darray_u64_size_get(&work->chunks);
    124   mutex_unlock(work->mutex);
    125   return sz;
    126 }
    127 
    128 void
    129 mpi_wait_for_request(struct htrdr* htrdr, MPI_Request* req)
    130 {
    131   ASSERT(htrdr && req);
    132 
    133   /* Wait for process synchronisation */
    134   for(;;) {
    135     struct timespec t;
    136     int complete;
    137     t.tv_sec = 0;
    138     t.tv_nsec = 10000000; /* 10ms */
    139 
    140     mutex_lock(htrdr->mpi_mutex);
    141     MPI(Test(req, &complete, MPI_STATUS_IGNORE));
    142     mutex_unlock(htrdr->mpi_mutex);
    143     if(complete) break;
    144 
    145     nanosleep(&t, NULL);
    146   }
    147 }
    148 
    149 void
    150 mpi_probe_thieves
    151   (struct htrdr* htrdr,
    152    struct proc_work* work,
    153    ATOMIC* probe_thieves)
    154 {
    155   uint64_t chunks[UINT8_MAX];
    156   struct timespec t;
    157   ASSERT(htrdr && work && probe_thieves);
    158 
    159   if(htrdr->mpi_nprocs == 1) /* The process is alone. No thief is possible */
    160     return;
    161 
    162   t.tv_sec = 0;
    163 
    164   /* Protect MPI calls of multiple invocations from concurrent threads */
    165   #define P_MPI(Func) {                                                        \
    166     mutex_lock(htrdr->mpi_mutex);                                              \
    167     MPI(Func);                                                                 \
    168     mutex_unlock(htrdr->mpi_mutex);                                            \
    169   } (void)0
    170 
    171   while(ATOMIC_GET(probe_thieves)) {
    172     MPI_Status status;
    173     uint8_t i = 0;
    174     int msg = 0;
    175 
    176     /* Probe if a steal request was submitted by any processes */
    177     P_MPI(Iprobe(MPI_ANY_SOURCE, HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &msg,
    178       &status));
    179 
    180     if(msg) { /* A steal request was posted */
    181       MPI_Request req;
    182       uint8_t nchunks_to_steal;
    183 
    184       /* Asynchronously receive the steal request */
    185       P_MPI(Irecv(&nchunks_to_steal, 1, MPI_UINT8_T, status.MPI_SOURCE,
    186         HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD, &req));
    187 
    188       /* Wait for the completion of the steal request */
    189       mpi_wait_for_request(htrdr, &req);
    190 
    191       /* Thief some chunks */
    192       FOR_EACH(i, 0, nchunks_to_steal) {
    193         chunks[i] = proc_work_get_chunk(work);
    194       }
    195       P_MPI(Send(&chunks, nchunks_to_steal, MPI_UINT64_T, status.MPI_SOURCE,
    196         HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD));
    197     }
    198 
    199     /* Don't constantly check for thieves */
    200     t.tv_nsec = 10000000; /* 10ms */
    201     nanosleep(&t, NULL);
    202   }
    203   #undef P_MPI
    204 }
    205 
    206 /* Return the number of stolen tiles */
    207 size_t
    208 mpi_steal_work
    209   (struct htrdr* htrdr,
    210    struct ssp_rng* rng,
    211    struct proc_work* work)
    212 {
    213   MPI_Request req;
    214   size_t nthieves = 0;
    215   uint64_t chunks[UINT8_MAX]; /* Index of the stolen chunks */
    216   int proc_to_steal; /* Rank of the process to steal */
    217 
    218   /* Empircally set the number of chunks to steal */
    219   const uint8_t nchunks_to_steal = MMIN((uint8_t)(htrdr->nthreads*4), 32);
    220   uint8_t i = 0;
    221 
    222   ASSERT(htrdr && rng && work && htrdr->nthreads < UINT8_MAX);
    223 
    224   /* Protect MPI calls of multiple invocations from concurrent threads */
    225   #define P_MPI(Func) {                                                        \
    226     mutex_lock(htrdr->mpi_mutex);                                              \
    227     MPI(Func);                                                                 \
    228     mutex_unlock(htrdr->mpi_mutex);                                            \
    229   } (void)0
    230 
    231   /* No more working process => nothing to steal */
    232   if(!htrdr->mpi_nworking_procs) return 0;
    233 
    234   /* Sample a process to steal */
    235   proc_to_steal = sample_working_process(htrdr, rng);
    236 
    237   /* Send a steal request to the sampled process and wait for a response */
    238   P_MPI(Send(&nchunks_to_steal, 1, MPI_UINT8_T, proc_to_steal,
    239     HTRDR_MPI_STEAL_REQUEST, MPI_COMM_WORLD));
    240 
    241   /* Receive the stolen chunks from the sampled process */
    242   P_MPI(Irecv(chunks, nchunks_to_steal, MPI_UINT64_T, proc_to_steal,
    243     HTRDR_MPI_WORK_STEALING, MPI_COMM_WORLD, &req));
    244 
    245   mpi_wait_for_request(htrdr, &req);
    246 
    247   FOR_EACH(i, 0, nchunks_to_steal) {
    248 
    249     if(chunks[i] != CHUNK_ID_NULL) {
    250       /* Save stolen chunk in job list */
    251       proc_work_add_chunk(work, chunks[i]);
    252       ++nthieves;
    253 
    254     } else {
    255       /* The process has returned at least one invalid chunk,
    256        * i.e. it has nothing further to do.
    257        * Remove it from the working process */
    258       ASSERT(htrdr->mpi_working_procs[proc_to_steal] != 0);
    259       htrdr->mpi_working_procs[proc_to_steal] = 0;
    260       htrdr->mpi_nworking_procs--;
    261 
    262       break; /* No more to steal */
    263     }
    264   }
    265   #undef P_MPI
    266   return nthieves;
    267 }