stardis-solver

Solve coupled heat transfers
git clone git://git.meso-star.fr/stardis-solver.git
Log | Files | Refs | README | LICENSE

sdis_mpi.c (3977B)


      1 /* Copyright (C) 2016-2025 |Méso|Star> (contact@meso-star.com)
      2  *
      3  * This program is free software: you can redistribute it and/or modify
      4  * it under the terms of the GNU General Public License as published by
      5  * the Free Software Foundation, either version 3 of the License, or
      6  * (at your option) any later version.
      7  *
      8  * This program is distributed in the hope that it will be useful,
      9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
     10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     11  * GNU General Public License for more details.
     12  *
     13  * You should have received a copy of the GNU General Public License
     14  * along with this program. If not, see <http://www.gnu.org/licenses/>. */
     15 
     16 #define _POSIX_C_SOURCE 199309L /* nanosleep */
     17 
     18 #include "sdis_c.h"
     19 #include "sdis_device_c.h"
     20 #include "sdis_mpi.h"
     21 
     22 #include <rsys/mutex.h>
     23 
     24 #include <time.h> /* nanosleep */
     25 #include <sys/time.h> /* struct timespec */
     26 
     27 /*******************************************************************************
     28  * Local functions
     29  ******************************************************************************/
     30 void
     31 mpi_send_progress(struct sdis_device* dev, const int32_t progress)
     32 {
     33   ASSERT(dev && dev->use_mpi);
     34   (void)dev;
     35 
     36   mutex_lock(dev->mpi_mutex);
     37   MPI(Send(&progress, 1/*#data*/, MPI_INT32_T, 0/*dst rank*/,
     38     MPI_SDIS_MSG_PROGRESS, MPI_COMM_WORLD));
     39   mutex_unlock(dev->mpi_mutex);
     40 }
     41 
     42 void
     43 mpi_fetch_progress(struct sdis_device* dev, int32_t progress[])
     44 {
     45   int iproc;
     46   ASSERT(dev && dev->use_mpi && dev->mpi_rank == 0);
     47 
     48   FOR_EACH(iproc, 1, dev->mpi_nprocs) {
     49     /* Flush all progress messages sent by the process `iproc' */
     50     for(;;) {
     51       MPI_Request req;
     52       int flag;
     53 
     54       /* Query for a progress message */
     55       mutex_lock(dev->mpi_mutex);                                   
     56       MPI(Iprobe(iproc, MPI_SDIS_MSG_PROGRESS, MPI_COMM_WORLD, &flag,
     57         MPI_STATUS_IGNORE));
     58       mutex_unlock(dev->mpi_mutex);
     59 
     60       if(flag == 0) break; /* No more progress status to receive */
     61 
     62       /* Asynchronously receive the progress status */
     63       mutex_lock(dev->mpi_mutex);                                   
     64       MPI(Irecv(&progress[iproc], 1/*count*/, MPI_INT32_T, iproc,
     65         MPI_SDIS_MSG_PROGRESS, MPI_COMM_WORLD, &req));
     66       mutex_unlock(dev->mpi_mutex);
     67 
     68       /* Actively wait for the completion of the receive procedure */
     69       mpi_waiting_for_request(dev, &req);
     70     }
     71   }
     72 }
     73 
     74 void
     75 mpi_waiting_for_request(struct sdis_device* dev, MPI_Request* req)
     76 {
     77   struct timespec t;
     78   ASSERT(dev && dev->use_mpi && req);
     79 
     80   /* Setup the suspend time of the process while waiting for a request */
     81   t.tv_sec = 0;
     82   t.tv_nsec = 10000000; /* 10ms */
     83 
     84   /* Wait for process synchronisation */
     85   for(;;) {
     86     int complete;
     87     
     88     mutex_lock(dev->mpi_mutex);
     89     MPI(Test(req, &complete, MPI_STATUS_IGNORE));
     90     mutex_unlock(dev->mpi_mutex);
     91 
     92     if(complete) break;
     93     nanosleep(&t, NULL);
     94   }
     95 }
     96 
     97 void
     98 mpi_waiting_for_message
     99   (struct sdis_device* dev,
    100    const int iproc,
    101    const enum mpi_sdis_message msg,
    102    MPI_Status* status)
    103 {
    104   struct timespec t;
    105   ASSERT(dev && dev->use_mpi && status);
    106 
    107   /* Setup the suspend time of the process while waiting for a message */
    108   t.tv_sec = 0;
    109   t.tv_nsec = 10000000; /* 10ms */
    110 
    111   /* Wait for process synchronisation */
    112   for(;;) {
    113     int flag;
    114 
    115     /* Asynchronously probe for green function data */
    116     mutex_lock(dev->mpi_mutex);
    117     MPI(Iprobe(iproc, msg, MPI_COMM_WORLD, &flag, status));
    118     mutex_unlock(dev->mpi_mutex);
    119 
    120     if(flag) break;
    121     nanosleep(&t, NULL);
    122   }
    123 }
    124 
    125 void
    126 mpi_barrier(struct sdis_device* dev)
    127 {
    128   MPI_Request req;
    129   ASSERT(dev && dev->use_mpi);
    130 
    131   /* Asynchronously wait for process completion. Use an asynchronous barrier to
    132    * avoid a dead lock if another thread on the same process queries the
    133    * mpi_mutex */
    134 
    135   mutex_lock(dev->mpi_mutex);
    136   MPI(Ibarrier(MPI_COMM_WORLD, &req));
    137   mutex_unlock(dev->mpi_mutex);
    138 
    139   mpi_waiting_for_request(dev, &req);
    140 }