sdis_mpi.c (3977B)
1 /* Copyright (C) 2016-2025 |Méso|Star> (contact@meso-star.com) 2 * 3 * This program is free software: you can redistribute it and/or modify 4 * it under the terms of the GNU General Public License as published by 5 * the Free Software Foundation, either version 3 of the License, or 6 * (at your option) any later version. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 * GNU General Public License for more details. 12 * 13 * You should have received a copy of the GNU General Public License 14 * along with this program. If not, see <http://www.gnu.org/licenses/>. */ 15 16 #define _POSIX_C_SOURCE 199309L /* nanosleep */ 17 18 #include "sdis_c.h" 19 #include "sdis_device_c.h" 20 #include "sdis_mpi.h" 21 22 #include <rsys/mutex.h> 23 24 #include <time.h> /* nanosleep */ 25 #include <sys/time.h> /* struct timespec */ 26 27 /******************************************************************************* 28 * Local functions 29 ******************************************************************************/ 30 void 31 mpi_send_progress(struct sdis_device* dev, const int32_t progress) 32 { 33 ASSERT(dev && dev->use_mpi); 34 (void)dev; 35 36 mutex_lock(dev->mpi_mutex); 37 MPI(Send(&progress, 1/*#data*/, MPI_INT32_T, 0/*dst rank*/, 38 MPI_SDIS_MSG_PROGRESS, MPI_COMM_WORLD)); 39 mutex_unlock(dev->mpi_mutex); 40 } 41 42 void 43 mpi_fetch_progress(struct sdis_device* dev, int32_t progress[]) 44 { 45 int iproc; 46 ASSERT(dev && dev->use_mpi && dev->mpi_rank == 0); 47 48 FOR_EACH(iproc, 1, dev->mpi_nprocs) { 49 /* Flush all progress messages sent by the process `iproc' */ 50 for(;;) { 51 MPI_Request req; 52 int flag; 53 54 /* Query for a progress message */ 55 mutex_lock(dev->mpi_mutex); 56 MPI(Iprobe(iproc, MPI_SDIS_MSG_PROGRESS, MPI_COMM_WORLD, &flag, 57 MPI_STATUS_IGNORE)); 58 mutex_unlock(dev->mpi_mutex); 59 60 if(flag == 0) break; /* No more progress status to receive */ 61 62 /* Asynchronously receive the progress status */ 63 mutex_lock(dev->mpi_mutex); 64 MPI(Irecv(&progress[iproc], 1/*count*/, MPI_INT32_T, iproc, 65 MPI_SDIS_MSG_PROGRESS, MPI_COMM_WORLD, &req)); 66 mutex_unlock(dev->mpi_mutex); 67 68 /* Actively wait for the completion of the receive procedure */ 69 mpi_waiting_for_request(dev, &req); 70 } 71 } 72 } 73 74 void 75 mpi_waiting_for_request(struct sdis_device* dev, MPI_Request* req) 76 { 77 struct timespec t; 78 ASSERT(dev && dev->use_mpi && req); 79 80 /* Setup the suspend time of the process while waiting for a request */ 81 t.tv_sec = 0; 82 t.tv_nsec = 10000000; /* 10ms */ 83 84 /* Wait for process synchronisation */ 85 for(;;) { 86 int complete; 87 88 mutex_lock(dev->mpi_mutex); 89 MPI(Test(req, &complete, MPI_STATUS_IGNORE)); 90 mutex_unlock(dev->mpi_mutex); 91 92 if(complete) break; 93 nanosleep(&t, NULL); 94 } 95 } 96 97 void 98 mpi_waiting_for_message 99 (struct sdis_device* dev, 100 const int iproc, 101 const enum mpi_sdis_message msg, 102 MPI_Status* status) 103 { 104 struct timespec t; 105 ASSERT(dev && dev->use_mpi && status); 106 107 /* Setup the suspend time of the process while waiting for a message */ 108 t.tv_sec = 0; 109 t.tv_nsec = 10000000; /* 10ms */ 110 111 /* Wait for process synchronisation */ 112 for(;;) { 113 int flag; 114 115 /* Asynchronously probe for green function data */ 116 mutex_lock(dev->mpi_mutex); 117 MPI(Iprobe(iproc, msg, MPI_COMM_WORLD, &flag, status)); 118 mutex_unlock(dev->mpi_mutex); 119 120 if(flag) break; 121 nanosleep(&t, NULL); 122 } 123 } 124 125 void 126 mpi_barrier(struct sdis_device* dev) 127 { 128 MPI_Request req; 129 ASSERT(dev && dev->use_mpi); 130 131 /* Asynchronously wait for process completion. Use an asynchronous barrier to 132 * avoid a dead lock if another thread on the same process queries the 133 * mpi_mutex */ 134 135 mutex_lock(dev->mpi_mutex); 136 MPI(Ibarrier(MPI_COMM_WORLD, &req)); 137 mutex_unlock(dev->mpi_mutex); 138 139 mpi_waiting_for_request(dev, &req); 140 }