htrdr_solve_buffer.c (17165B)
/* Copyright (C) 2018-2019, 2022-2025 Centre National de la Recherche Scientifique
 * Copyright (C) 2020-2022 Institut Mines Télécom Albi-Carmaux
 * Copyright (C) 2022-2025 Institut Pierre-Simon Laplace
 * Copyright (C) 2022-2025 Institut de Physique du Globe de Paris
 * Copyright (C) 2018-2025 |Méso|Star> (contact@meso-star.com)
 * Copyright (C) 2022-2025 Observatoire de Paris
 * Copyright (C) 2022-2025 Université de Reims Champagne-Ardenne
 * Copyright (C) 2022-2025 Université de Versaille Saint-Quentin
 * Copyright (C) 2018-2019, 2022-2025 Université Paul Sabatier
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>. */

#include "core/htrdr.h"
#include "core/htrdr_c.h"
#include "core/htrdr_buffer.h"
#include "core/htrdr_proc_work.h"
#include "core/htrdr_log.h"
#include "core/htrdr_solve_buffer.h"

#include <rsys/clock_time.h>
#include <rsys/cstr.h>
#include <rsys/list.h>
#include <rsys/math.h>
#include <rsys/mutex.h>
#include <rsys/ref_count.h>

#include <star/ssp.h>

#include <mpi.h>
#include <omp.h>

#define CHUNK_SIZE 32 /* Number of items in one chunk */

/* Collection of items */
struct chunk {
  struct list_node node;
  struct mem_allocator* allocator;
  ref_T ref;

  struct chunk_data {
    size_t item_sz; /* Size of an item */
    size_t item_al; /* Item alignment */
    uint64_t index; /* Chunk index in chunk space */
    /* Simulate the flexible array member of the C99 standard */
    char ALIGN(16) items[1/*Dummy*/];
  } data;
};
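
/* Note: chunk_create() below allocates a chunk as a single memory block made
 * of sizeof(struct chunk) - 1 header bytes (the dummy octet of `items' is
 * discounted) immediately followed by CHUNK_SIZE items of `item_sz' bytes
 * each. Since `items' is aligned on 16 bytes, item alignments up to 16 bytes
 * are supported */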

/*******************************************************************************
 * Helper functions
 ******************************************************************************/
static INLINE res_T
check_solve_buffer_args(const struct htrdr_solve_buffer_args* args)
{
  if(!args) return RES_BAD_ARG;

  /* A functor must be defined */
  if(!args->solve_item) return RES_BAD_ARG;

  /* The number of realisations cannot be zero */
  if(!args->nrealisations) return RES_BAD_ARG;

  /* The buffer must be one-dimensional */
  if(args->buffer_layout.height != 1) return RES_BAD_ARG;

  /* Check buffer layout consistency */
  return htrdr_buffer_layout_check(&args->buffer_layout);
}

static INLINE void
release_chunk(ref_T* ref)
{
  struct chunk* chunk = CONTAINER_OF(ref, struct chunk, ref);
  ASSERT(ref);
  MEM_RM(chunk->allocator, chunk);
}

static INLINE void
chunk_ref_get(struct chunk* chunk)
{
  ASSERT(chunk);
  ref_get(&chunk->ref);
}

static INLINE void
chunk_ref_put(struct chunk* chunk)
{
  ASSERT(chunk);
  ref_put(&chunk->ref, release_chunk);
}

static FINLINE struct chunk*
chunk_create
  (struct mem_allocator* allocator,
   const size_t item_sz, /* Size in bytes */
   const size_t item_al, /* Alignment in bytes */
   const uint64_t ichunk) /* Chunk index */
{
  struct chunk* chunk = NULL;
  const size_t header_sz = sizeof(*chunk) - 1/*dummy octet in flexible array*/;
  const size_t buf_sz = CHUNK_SIZE*item_sz;

  /* Pre conditions */
  ASSERT(allocator);
  ASSERT(IS_ALIGNED(item_sz, item_al));
  ASSERT(IS_POW2(item_al));

  chunk = MEM_ALLOC_ALIGNED(allocator, header_sz + buf_sz, 16);
  if(!chunk) goto error;
  ref_init(&chunk->ref);
  list_init(&chunk->node);
  chunk->allocator = allocator;
  chunk->data.item_sz = item_sz;
  chunk->data.item_al = item_al;
  chunk->data.index = ichunk;
  CHK(IS_ALIGNED(chunk->data.items, item_al));

exit:
  return chunk;
error:
  if(chunk) { chunk_ref_put(chunk); chunk = NULL; }
  goto exit;
}

static FINLINE void*
chunk_at(struct chunk* chunk, const size_t i)
{
  ASSERT(chunk && i < CHUNK_SIZE);
  return (void*)(chunk->data.items + i*chunk->data.item_sz);
}

static void
release_chunk_list(struct list_node* chunks)
{
  struct list_node* node = NULL;
  struct list_node* tmp = NULL;
  ASSERT(chunks);

  LIST_FOR_EACH_SAFE(node, tmp, chunks) {
    struct chunk* chunk = CONTAINER_OF(node, struct chunk, node);
    list_del(node);
    chunk_ref_put(chunk);
  }
}

static INLINE uint64_t
get_chunk
  (struct htrdr* htrdr,
   struct ssp_rng* rng,
   struct proc_work* work)
{
  uint64_t ichunk = CHUNK_ID_NULL;

  /* Make the function critical, as the entire process must be executed
   * atomically. Indeed, the first thread to query an invalid chunk steals work
   * from other processes. So, when the function exits, the other threads will
   * have valid chunks */
  #pragma omp critical
  {
    ichunk = proc_work_get_chunk(work);
    if(ichunk == CHUNK_ID_NULL) { /* No more work on this process */
      size_t nthieves = 0;

      proc_work_reset(work);
      nthieves = mpi_steal_work(htrdr, rng, work);
      if(nthieves != 0) {
        ichunk = proc_work_get_chunk(work);
      }
    }
  }

  return ichunk;
}

static INLINE void
status_update(struct htrdr* htrdr, const int32_t progress)
{
  ASSERT(htrdr);

  #pragma omp critical
  if(progress > htrdr->mpi_progress_render[0]) {
    htrdr->mpi_progress_render[0] = progress;

    /* Print update on master process */
    if(htrdr->mpi_rank == 0) {
      update_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING);

    /* Send the progress percentage to the master process */
    } else {
      send_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING, progress);
    }
  }
}
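
/* Create the RNG used to solve the chunk `ichunk' (the thread index is only
 * used to select a per-thread allocator). The proxy parameters below give
 * each chunk its own slice of the global random stream, i.e. an offset of
 * ichunk*RNG_SEQUENCE_SIZE and a stride of nchunks*RNG_SEQUENCE_SIZE between
 * consecutive blocks of RNG_SEQUENCE_SIZE numbers; the intent is that two
 * chunks never draw the same random numbers, whichever process or thread
 * ends up solving them */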
static struct ssp_rng*
rng_create
  (struct htrdr* htrdr,
   const size_t ithread,
   const size_t nchunks,
   const uint64_t ichunk)
{
  struct ssp_rng_proxy_create2_args args = SSP_RNG_PROXY_CREATE2_ARGS_NULL;
  struct ssp_rng_proxy* proxy = NULL;
  struct ssp_rng* rng = NULL;
  struct mem_allocator* allocator = NULL;
  ASSERT(htrdr);

  allocator = htrdr_get_thread_allocator(htrdr, (size_t)ithread);

  args.type = SSP_RNG_THREEFRY;
  args.sequence_offset = RNG_SEQUENCE_SIZE * (size_t)ichunk;
  args.sequence_size = RNG_SEQUENCE_SIZE;
  args.sequence_pitch = RNG_SEQUENCE_SIZE * nchunks;
  args.nbuckets = 1;
  SSP(rng_proxy_create2(allocator, &args, &proxy));
  SSP(rng_proxy_create_rng(proxy, 0, &rng));
  SSP(rng_proxy_ref_put(proxy));

  return rng;
}

static void
write_chunk_data
  (struct htrdr* htrdr,
   struct htrdr_buffer* buf,
   const struct chunk_data* chunk_data)
{
  struct htrdr_buffer_layout layout = HTRDR_BUFFER_LAYOUT_NULL;
  char* mem = NULL;
  size_t iitem = 0;
  size_t nitems = 0;

  ASSERT(htrdr && buf && chunk_data);
  ASSERT(chunk_data->index != CHUNK_ID_NULL);
  (void)htrdr;

  htrdr_buffer_get_layout(buf, &layout);
  ASSERT(layout.height == 1);
  ASSERT(layout.elmt_size == chunk_data->item_sz);

  /* Calculate the index of the first item to write */
  iitem = chunk_data->index * CHUNK_SIZE;

  /* Define the number of items to write into the buffer; the last chunk may
   * be only partially used */
  nitems = MMIN(iitem + CHUNK_SIZE, layout.width) - iitem;

  /* Calculate the destination address for writing the chunk data */
  mem = htrdr_buffer_get_data(buf);
  mem = mem + iitem*layout.elmt_size;

  /* Write the chunk items into the buffer */
  memcpy(mem, chunk_data->items, nitems*layout.elmt_size);
}

static res_T
mpi_gather_chunks
  (struct htrdr* htrdr,
   const struct htrdr_buffer_layout* layout,
   struct htrdr_buffer* buf, /* Can be NULL for non master processes */
   const size_t nchunks,
   struct list_node* chunks)
{
  size_t msg_sz = 0;
  struct list_node* node = NULL;
  struct chunk* chunk = NULL; /* Temporary chunk */
  res_T res = RES_OK;

  /* Pre conditions */
  ASSERT(htrdr && layout && chunks);
  ASSERT(layout->height == 1);
  ASSERT(htrdr_buffer_layout_check(layout) == RES_OK);

  /* Compute the size of the chunk data */
  msg_sz = sizeof(struct chunk_data)
    + CHUNK_SIZE*layout->elmt_size
    - 1 /* Dummy octet of the flexible array */;
  ASSERT(msg_sz <= INT_MAX);

  /* Non master process: send the computed chunks to the master process */
  if(htrdr->mpi_rank != 0) {
    LIST_FOR_EACH(node, chunks) {
      struct chunk* c = CONTAINER_OF(node, struct chunk, node);
      MPI(Send(&c->data, (int)msg_sz, MPI_CHAR, 0/*Master process*/,
        HTRDR_MPI_CHUNK_DATA, MPI_COMM_WORLD));
    }

  /* Master process */
  } else {
    size_t ichunk = 0;
    ASSERT(buf);

#ifndef NDEBUG
    /* Check that the submitted layout matches the layout of the buffer */
    {
      struct htrdr_buffer_layout buf_layout = HTRDR_BUFFER_LAYOUT_NULL;
      htrdr_buffer_get_layout(buf, &buf_layout);
      ASSERT(htrdr_buffer_layout_eq(&buf_layout, layout));
    }
#endif

    /* Write data for the chunks resolved by the master process */
    LIST_FOR_EACH(node, chunks) {
      struct chunk* c = CONTAINER_OF(node, struct chunk, node);
      write_chunk_data(htrdr, buf, &c->data);
      ++ichunk;
    }

    /* There are chunks unresolved by the master process */
    if(ichunk != nchunks) {
      ASSERT(htrdr->mpi_nprocs > 1);

      /* Create a temporary chunk to receive the chunk data computed by the
       * non master processes */
      chunk = chunk_create
        (htrdr->allocator,
         layout->elmt_size,
         layout->alignment,
         CHUNK_ID_NULL); /* Dummy chunk id that is going to be overwritten */
      if(!chunk) { res = RES_MEM_ERR; goto error; }
    }

    /* Gather the chunks sent by the non master processes */
    FOR_EACH(ichunk, ichunk, nchunks) {
      MPI(Recv(&chunk->data, (int)msg_sz, MPI_CHAR, MPI_ANY_SOURCE,
        HTRDR_MPI_CHUNK_DATA, MPI_COMM_WORLD, MPI_STATUS_IGNORE));
      write_chunk_data(htrdr, buf, &chunk->data);
    }
  }

exit:
  if(chunk) chunk_ref_put(chunk);
  return res;
error:
  htrdr_log_err(htrdr, "Error while gathering results -- %s\n",
    res_to_cstr(res));
  goto exit;
}

static res_T
solve_chunk
  (struct htrdr* htrdr,
   const struct htrdr_solve_buffer_args* args,
   const size_t ithread,
   const uint64_t ichunk,
   struct ssp_rng* rng,
   struct chunk* chunk)
{
  struct htrdr_solve_item_args item_args = HTRDR_SOLVE_ITEM_ARGS_NULL;
  size_t i = 0;
  size_t nitems = 0;

  ASSERT(htrdr && args && rng && chunk);
  ASSERT(args->buffer_layout.height == 1);
  ASSERT(ichunk * CHUNK_SIZE < args->buffer_layout.width);

  nitems = args->buffer_layout.width;

  /* Setup the shared item arguments */
  item_args.rng = rng;
  item_args.nrealisations = args->nrealisations;
  item_args.ithread = ithread;
  item_args.context = args->context;

  FOR_EACH(i, 0, CHUNK_SIZE) {
    void* item = NULL;
    size_t item_id = ichunk*CHUNK_SIZE + i;

    if(item_id >= nitems) {
      /* The item is out of range,
       * i.e. we have reached the total number of items to solve */
      break;
    }

    /* Fetch the item */
    item = chunk_at(chunk, i);

    /* Setup the item index */
    item_args.item_id = item_id;

    /* Solve the item */
    args->solve_item(htrdr, &item_args, item);
  }
  return RES_OK;
}

static res_T
solve_buffer
  (struct htrdr* htrdr,
   const struct htrdr_solve_buffer_args* args,
   const size_t nchunks, /* Total #chunks distributed between processes */
   struct proc_work* work,
   struct list_node* chunks)
{
  struct ssp_rng* rng_proc = NULL; /* Used to sample a working process */
  size_t nchunks_proc = 0; /* #chunks of the process */
  size_t nthreads = 0; /* #threads to use */
  ATOMIC nchunks_solved = 0; /* #chunks solved by the process */
  ATOMIC res = RES_OK;

  /* Pre-conditions */
  ASSERT(htrdr && args && work && chunks);

  res = ssp_rng_create(htrdr->allocator, SSP_RNG_MT19937_64, &rng_proc);
  if(res != RES_OK) goto error;

  nchunks_proc = proc_work_get_nchunks(work);
  nthreads = MMIN(htrdr->nthreads, nchunks_proc);

  /* The process does not consider itself as a working process */
  htrdr->mpi_working_procs[htrdr->mpi_rank] = 0;
  --htrdr->mpi_nworking_procs;

  omp_set_num_threads((int)nthreads);
  #pragma omp parallel
  for(;;) {
    /* Chunk */
    uint64_t ichunk = get_chunk(htrdr, rng_proc, work);
    struct chunk* chunk = NULL;

    /* Miscellaneous */
    const size_t ithread = (size_t)omp_get_thread_num();
    struct ssp_rng* rng = NULL;
    size_t n = 0;
    int32_t pcent = 0;
    res_T res_local = RES_OK;

    if(ichunk == CHUNK_ID_NULL) break; /* No more work */

    chunk = chunk_create
      (htrdr->allocator,
       args->buffer_layout.elmt_size,
       args->buffer_layout.alignment,
       ichunk);
    if(!chunk) {
      ATOMIC_SET(&res, RES_MEM_ERR);
      break;
    }

    #pragma omp critical
    list_add_tail(chunks, &chunk->node); /* Register the chunk */

    rng = rng_create(htrdr, ithread, nchunks, ichunk);
    res_local = solve_chunk(htrdr, args, ithread, ichunk, rng, chunk);

    SSP(rng_ref_put(rng));
    if(res_local != RES_OK) {
      ATOMIC_SET(&res, res_local);
      break;
    }

    /* Status update */
    n = (size_t)ATOMIC_INCR(&nchunks_solved);
    pcent = (int32_t)((double)n * 100.0 / (double)nchunks_proc + 0.5/*round*/);
    status_update(htrdr, pcent);
  }

  if(ATOMIC_GET(&res) != RES_OK) goto error;

  /* Asynchronously wait for the completion of all processes. Use a
   * non-blocking barrier to avoid a deadlock with the `mpi_probe_thieves'
   * thread, which also requires the `mpi_mutex' */
  {
    MPI_Request req;

    mutex_lock(htrdr->mpi_mutex);
    MPI(Ibarrier(MPI_COMM_WORLD, &req));
    mutex_unlock(htrdr->mpi_mutex);

    mpi_wait_for_request(htrdr, &req);
  }

exit:
  if(rng_proc) SSP(rng_ref_put(rng_proc));
  return (res_T)res;
error:
  htrdr_log_err(htrdr, "Error while solving buffer -- %s\n",
    res_to_cstr((res_T)res));
  goto exit;
}

/*******************************************************************************
 * Local functions
 ******************************************************************************/
res_T
htrdr_solve_buffer
  (struct htrdr* htrdr,
   const struct htrdr_solve_buffer_args* args,
   struct htrdr_buffer* buf)
{
  /* Time registration */
  char strbuf[128];
  struct time t0, t1;

  /* Chunks */
  struct list_node chunks; /* List of solved chunks */
  size_t nchunks = 0; /* Overall number of chunks */
  size_t nchunks_proc = 0; /* #chunks for the current proc */
  size_t nchunks_remain = 0; /* Remaining #chunks to distribute between procs */

  /* Miscellaneous */
  struct proc_work work;
  size_t nitems = 0; /* Number of Monte Carlo estimations */
  size_t i = 0;
  ATOMIC probe_thieves = 1; /* Boolean that controls thieves' polling */

  res_T res = RES_OK;
  ASSERT(htrdr && check_solve_buffer_args(args) == RES_OK);
  ASSERT(htrdr->mpi_rank != 0 || buf);

  list_init(&chunks);
  proc_work_init(htrdr->allocator, &work);

  nitems = args->buffer_layout.width;
  nchunks = (nitems + (CHUNK_SIZE-1)/*ceil*/) / CHUNK_SIZE;
  nchunks_proc = nchunks / (size_t)htrdr->mpi_nprocs;
  nchunks_remain = nchunks - nchunks_proc*(size_t)htrdr->mpi_nprocs;

  /* Distribute the remaining chunks among the processes. Each process whose
   * rank is lower than the number of remaining chunks takes an additional
   * chunk */
  if((size_t)htrdr->mpi_rank < nchunks_remain) {
    ++nchunks_proc;
  }

  /* Register the list of chunks to be processed by the current process */
  FOR_EACH(i, 0, nchunks_proc) {
    size_t ichunk = i * (size_t)htrdr->mpi_nprocs + (size_t)htrdr->mpi_rank;
    proc_work_add_chunk(&work, (uint64_t)ichunk);
  }
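
  /* For example, with nitems = 100 and CHUNK_SIZE = 32, nchunks = 4: on 3
   * processes, nchunks_proc = 1 and nchunks_remain = 1, so rank 0 takes the
   * extra chunk and registers the chunks {0, 3} while the ranks 1 and 2
   * register the chunks {1} and {2}, respectively */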

  /* On the master process, fetch and print the progress report now that the
   * other processes have been able to start the calculation */
  if(htrdr->mpi_rank == 0) {
    fetch_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING);
    print_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING);
  }

  /* Start of calculation time recording */
  time_current(&t0);

  /* Enable nested parallelism so that the solve function can itself run in
   * parallel */
  omp_set_nested(1);

  #pragma omp parallel sections num_threads(2)
  {
    /* Polling of steal queries */
    #pragma omp section
    mpi_probe_thieves(htrdr, &work, &probe_thieves);

    #pragma omp section
    {
      solve_buffer(htrdr, args, nchunks, &work, &chunks);
      /* The process has no more work to do: stop polling for thieves */
      ATOMIC_SET(&probe_thieves, 0);
    }
  }

  if(htrdr->mpi_rank == 0) {
    update_mpi_progress(htrdr, HTRDR_MPI_PROGRESS_RENDERING);
    htrdr_log(htrdr, "\n"); /* Add a new line after the progress statuses */
  }

  /* Stop time recording */
  time_sub(&t0, time_current(&t1), &t0);
  time_dump(&t0, TIME_ALL, NULL, strbuf, sizeof(strbuf));
  htrdr_log(htrdr, "Calculation time: %s\n", strbuf);

  /* Gather the chunks on the master process */
  time_current(&t0);
  res = mpi_gather_chunks(htrdr, &args->buffer_layout, buf, nchunks, &chunks);
  if(res != RES_OK) goto error;
  time_sub(&t0, time_current(&t1), &t0);
  time_dump(&t0, TIME_ALL, NULL, strbuf, sizeof(strbuf));
  htrdr_log(htrdr, "Time to gather results: %s\n", strbuf);

exit:
  release_chunk_list(&chunks);
  proc_work_release(&work);
  return res;
error:
  goto exit;
}
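
For orientation, a minimal sketch of the caller side, i.e. what a solver built on htrdr_solve_buffer() would provide. It only relies on fields and calls that appear in this file; the exact prototype of the `solve_item' callback (its res_T return type and const qualifier), the zero initialisation of the argument structure, and the pre-existing `htrdr' handle and one-dimensional buffer `buf' of doubles are assumptions, not part of this file.

/* Hypothetical Monte Carlo kernel: estimate one value per buffer item */
static res_T
dummy_solve_item
  (struct htrdr* htrdr,
   const struct htrdr_solve_item_args* item_args,
   void* item)
{
  double acc = 0;
  size_t irealisation = 0;
  (void)htrdr;

  /* Accumulate one (dummy) Monte Carlo weight per realisation */
  FOR_EACH(irealisation, 0, item_args->nrealisations) {
    acc += ssp_rng_canonical(item_args->rng);
  }

  /* Store the estimate of the item `item_id' into the chunk storage */
  *(double*)item = acc / (double)item_args->nrealisations;
  return RES_OK;
}

/* In the caller, with `htrdr' and a matching 1D buffer `buf' of doubles
 * created beforehand */
struct htrdr_solve_buffer_args args = {0}; /* Or the library's _NULL initializer */
args.solve_item = dummy_solve_item;
args.nrealisations = 10000;
args.buffer_layout.width = 1024; /* #items, i.e. #Monte Carlo estimates */
args.buffer_layout.height = 1; /* Required by check_solve_buffer_args() */
args.buffer_layout.elmt_size = sizeof(double);
args.buffer_layout.alignment = sizeof(double); /* Power of 2 dividing elmt_size */
args.context = NULL;
res = htrdr_solve_buffer(htrdr, &args, buf); /* `buf' may be NULL on non master ranks */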