htrdr.c (17308B)
/* Copyright (C) 2018-2019, 2022-2025 Centre National de la Recherche Scientifique
 * Copyright (C) 2020-2022 Institut Mines Télécom Albi-Carmaux
 * Copyright (C) 2022-2025 Institut Pierre-Simon Laplace
 * Copyright (C) 2022-2025 Institut de Physique du Globe de Paris
 * Copyright (C) 2018-2025 |Méso|Star> (contact@meso-star.com)
 * Copyright (C) 2022-2025 Observatoire de Paris
 * Copyright (C) 2022-2025 Université de Reims Champagne-Ardenne
 * Copyright (C) 2022-2025 Université de Versaille Saint-Quentin
 * Copyright (C) 2018-2019, 2022-2025 Université Paul Sabatier
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>. */

#define _POSIX_C_SOURCE 200809L /* stat.st_time support */

#include "core/htrdr.h"
#include "core/htrdr_c.h"
#include "core/htrdr_args.h"
#include "core/htrdr_log.h"
#include "core/htrdr_version.h"

#include <rsys/cstr.h>
#include <rsys/mem_allocator.h>
#include <rsys/str.h>

#include <star/s3d.h>
#include <star/ssf.h>

#include <errno.h>
#include <fcntl.h> /* open */
#include <libgen.h> /* basename */
#include <stdarg.h>
#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h> /* timespec */
#include <sys/stat.h> /* S_IRUSR & S_IWUSR */

#include <omp.h>
#include <mpi.h>

/*******************************************************************************
 * Helper functions
 ******************************************************************************/
static const char*
mpi_thread_support_string(const int val)
{
  switch(val) {
  case MPI_THREAD_SINGLE: return "MPI_THREAD_SINGLE";
  case MPI_THREAD_FUNNELED: return "MPI_THREAD_FUNNELED";
  case MPI_THREAD_SERIALIZED: return "MPI_THREAD_SERIALIZED";
  case MPI_THREAD_MULTIPLE: return "MPI_THREAD_MULTIPLE";
  default: FATAL("Unreachable code.\n"); break;
  }
}

static const char*
mpi_error_string(struct htrdr* htrdr, const int mpi_err)
{
  const int ithread = omp_get_thread_num();
  char* str;
  int strlen_err;
  int err;
  ASSERT(htrdr && (size_t)ithread < htrdr->nthreads);
  str = htrdr->mpi_err_str + ithread*MPI_MAX_ERROR_STRING;
  err = MPI_Error_string(mpi_err, str, &strlen_err);
  return err == MPI_SUCCESS ?
str : "Invalid MPI error"; 78 } 79 80 static void 81 release_mpi(struct htrdr* htrdr) 82 { 83 ASSERT(htrdr); 84 if(htrdr->mpi_working_procs) { 85 MEM_RM(htrdr->allocator, htrdr->mpi_working_procs); 86 htrdr->mpi_working_procs = NULL; 87 } 88 if(htrdr->mpi_progress_octree) { 89 MEM_RM(htrdr->allocator, htrdr->mpi_progress_octree); 90 htrdr->mpi_progress_octree = NULL; 91 } 92 if(htrdr->mpi_progress_render) { 93 MEM_RM(htrdr->allocator, htrdr->mpi_progress_render); 94 htrdr->mpi_progress_render = NULL; 95 } 96 if(htrdr->mpi_err_str) { 97 MEM_RM(htrdr->allocator, htrdr->mpi_err_str); 98 htrdr->mpi_err_str = NULL; 99 } 100 if(htrdr->mpi_mutex) { 101 mutex_destroy(htrdr->mpi_mutex); 102 htrdr->mpi_mutex = NULL; 103 } 104 } 105 106 static res_T 107 mpi_print_proc_info(struct htrdr* htrdr) 108 { 109 char proc_name[MPI_MAX_PROCESSOR_NAME]; 110 int proc_name_len; 111 char* proc_names = NULL; 112 uint32_t* proc_nthreads = NULL; 113 uint32_t nthreads = 0; 114 int iproc; 115 res_T res = RES_OK; 116 ASSERT(htrdr); 117 118 if(htrdr->mpi_rank == 0) { 119 proc_names = MEM_CALLOC(htrdr->allocator, (size_t)htrdr->mpi_nprocs, 120 MPI_MAX_PROCESSOR_NAME*sizeof(*proc_names)); 121 if(!proc_names) { 122 res = RES_MEM_ERR; 123 htrdr_log_err(htrdr, 124 "could not allocate the temporary memory for MPI process names -- " 125 "%s.\n", res_to_cstr(res)); 126 goto error; 127 } 128 129 proc_nthreads = MEM_CALLOC(htrdr->allocator, (size_t)htrdr->mpi_nprocs, 130 sizeof(*proc_nthreads)); 131 if(!proc_nthreads) { 132 res = RES_MEM_ERR; 133 htrdr_log_err(htrdr, 134 "could not allocate the temporary memory for the #threads of the MPI " 135 "processes -- %s.\n", res_to_cstr(res)); 136 goto error; 137 } 138 } 139 140 /* Gather process name */ 141 MPI(Get_processor_name(proc_name, &proc_name_len)); 142 MPI(Gather(proc_name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, proc_names, 143 MPI_MAX_PROCESSOR_NAME, MPI_CHAR, 0, MPI_COMM_WORLD)); 144 145 /* Gather process #threads */ 146 nthreads = (uint32_t)htrdr->nthreads; 147 MPI(Gather(&nthreads, 1, MPI_UINT32_T, proc_nthreads, 1, MPI_UINT32_T, 0, 148 MPI_COMM_WORLD)); 149 150 if(htrdr->mpi_rank == 0) { 151 FOR_EACH(iproc, 0, htrdr->mpi_nprocs) { 152 htrdr_log(htrdr, "Process %d -- %s; #threads: %u\n", 153 iproc, proc_names + iproc*MPI_MAX_PROCESSOR_NAME, proc_nthreads[iproc]); 154 } 155 } 156 157 exit: 158 if(proc_names) MEM_RM(htrdr->allocator, proc_names); 159 if(proc_nthreads) MEM_RM(htrdr->allocator, proc_nthreads); 160 return res; 161 error: 162 goto exit; 163 } 164 165 static res_T 166 init_mpi(struct htrdr* htrdr) 167 { 168 size_t n; 169 int err; 170 res_T res = RES_OK; 171 ASSERT(htrdr); 172 173 htrdr->mpi_err_str = MEM_CALLOC 174 (htrdr->allocator, htrdr->nthreads, MPI_MAX_ERROR_STRING); 175 if(!htrdr->mpi_err_str) { 176 res = RES_MEM_ERR; 177 htrdr_log_err(htrdr, 178 "could not allocate the MPI error strings -- %s.\n", 179 res_to_cstr(res)); 180 goto error; 181 } 182 183 err = MPI_Comm_rank(MPI_COMM_WORLD, &htrdr->mpi_rank); 184 if(err != MPI_SUCCESS) { 185 htrdr_log_err(htrdr, 186 "could not determine the MPI rank of the calling process -- %s.\n", 187 mpi_error_string(htrdr, err)); 188 res = RES_UNKNOWN_ERR; 189 goto error; 190 } 191 192 err = MPI_Comm_size(MPI_COMM_WORLD, &htrdr->mpi_nprocs); 193 if(err != MPI_SUCCESS) { 194 htrdr_log_err(htrdr, 195 "could retrieve the size of the MPI group -- %s.\n", 196 mpi_error_string(htrdr, err)); 197 res = RES_UNKNOWN_ERR; 198 goto error; 199 } 200 201 htrdr->mpi_working_procs = MEM_CALLOC(htrdr->allocator, 202 (size_t)htrdr->mpi_nprocs, 
    sizeof(*htrdr->mpi_working_procs));
  if(!htrdr->mpi_working_procs) {
    htrdr_log_err(htrdr,
      "could not allocate the list of working processes.\n");
    res = RES_MEM_ERR;
    goto error;
  }

  /* Initially, all the processes are working */
  htrdr->mpi_nworking_procs = (size_t)htrdr->mpi_nprocs;
  memset(htrdr->mpi_working_procs, 0xFF,
    htrdr->mpi_nworking_procs*sizeof(*htrdr->mpi_working_procs));

  /* Allocate #processes progress statuses on the master process and only 1
   * progress status on the other ones: the master process will gather the
   * status of the other processes to report their progress. */
  n = (size_t)(htrdr->mpi_rank == 0 ? htrdr->mpi_nprocs : 1);

  htrdr->mpi_progress_octree = MEM_CALLOC
    (htrdr->allocator, n, sizeof(*htrdr->mpi_progress_octree));
  if(!htrdr->mpi_progress_octree) {
    res = RES_MEM_ERR;
    htrdr_log_err(htrdr,
      "could not allocate the progress state of the octree building -- %s.\n",
      res_to_cstr(res));
    goto error;
  }

  htrdr->mpi_progress_render = MEM_CALLOC
    (htrdr->allocator, n, sizeof(*htrdr->mpi_progress_render));
  if(!htrdr->mpi_progress_render) {
    res = RES_MEM_ERR;
    htrdr_log_err(htrdr,
      "could not allocate the progress state of the scene rendering -- %s.\n",
      res_to_cstr(res));
    goto error;
  }

  htrdr->mpi_mutex = mutex_create();
  if(!htrdr->mpi_mutex) {
    res = RES_MEM_ERR;
    htrdr_log_err(htrdr,
      "could not create the mutex to protect MPI calls from concurrent "
      "threads -- %s.\n", res_to_cstr(res));
    goto error;
  }

  mpi_print_proc_info(htrdr);

exit:
  return res;
error:
  release_mpi(htrdr);
  goto exit;
}

static void
release_htrdr(ref_T* ref)
{
  struct htrdr* htrdr = CONTAINER_OF(ref, struct htrdr, ref);
  ASSERT(ref);

  if(htrdr->s3d) S3D(device_ref_put(htrdr->s3d));
  release_mpi(htrdr);
  if(htrdr->lifo_allocators) {
    size_t i;
    FOR_EACH(i, 0, htrdr->nthreads) {
      mem_shutdown_lifo_allocator(&htrdr->lifo_allocators[i]);
    }
    MEM_RM(htrdr->allocator, htrdr->lifo_allocators);
  }
  logger_release(&htrdr->logger);

  MEM_RM(htrdr->allocator, htrdr);
}

/*******************************************************************************
 * Exported functions
 ******************************************************************************/
res_T
htrdr_mpi_init(int argc, char** argv)
{
  char str[MPI_MAX_ERROR_STRING];
  int thread_support;
  int len;
  int err = 0;
  res_T res = RES_OK;

  err = MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &thread_support);
  if(err != MPI_SUCCESS) {
    MPI_Error_string(err, str, &len);
    fprintf(stderr, "Error initializing MPI -- %s.\n", str);
    res = RES_UNKNOWN_ERR;
    goto error;
  }

  if(thread_support != MPI_THREAD_SERIALIZED) {
    fprintf(stderr, "The provided MPI implementation does not support "
      "serialized API calls from multiple threads. "
      "Provided thread support: "
      "%s.\n", mpi_thread_support_string(thread_support));
    res = RES_BAD_OP;
    goto error;
  }

exit:
  return res;
error:
  goto exit;
}

void
htrdr_mpi_finalize(void)
{
  MPI_Finalize();
}

res_T
htrdr_create
  (struct mem_allocator* mem_allocator,
   const struct htrdr_args* args,
   struct htrdr** out_htrdr)
{
  struct mem_allocator* allocator = NULL;
  struct htrdr* htrdr = NULL;
  size_t ithread;
  int nthreads_max;
  res_T res = RES_OK;
  ASSERT(args && out_htrdr);

  allocator = mem_allocator ? mem_allocator : &mem_default_allocator;
  htrdr = MEM_CALLOC(allocator, 1, sizeof(*htrdr));
  if(!htrdr) {
    fprintf(stderr, HTRDR_LOG_ERROR_PREFIX
      "Could not allocate the htrdr data structure.\n");
    res = RES_MEM_ERR;
    goto error;
  }
  htrdr->allocator = allocator;
  ref_init(&htrdr->ref);
  nthreads_max = MMAX(omp_get_max_threads(), omp_get_num_procs());
  htrdr->verbose = args->verbose;
  htrdr->nthreads = MMIN(args->nthreads, (unsigned)nthreads_max);

  setup_logger(htrdr);

  res = init_mpi(htrdr);
  if(res != RES_OK) goto error;

  /* Disable the Star-3D verbosity since the Embree backend prints some messages
   * on stdout rather than stderr. This is annoying since stdout may be used by
   * htrdr to write output data */
  res = s3d_device_create(&htrdr->logger, htrdr->allocator, 0, &htrdr->s3d);
  if(res != RES_OK) {
    htrdr_log_err(htrdr,
      "%s: could not create the Star-3D device -- %s.\n",
      FUNC_NAME, res_to_cstr(res));
    goto error;
  }

  htrdr->lifo_allocators = MEM_CALLOC
    (htrdr->allocator, htrdr->nthreads, sizeof(*htrdr->lifo_allocators));
  if(!htrdr->lifo_allocators) {
    res = RES_MEM_ERR;
    htrdr_log_err(htrdr,
      "%s: could not allocate the list of per-thread LIFO allocators -- %s.\n",
      FUNC_NAME, res_to_cstr(res));
    goto error;
  }

  FOR_EACH(ithread, 0, htrdr->nthreads) {
    const size_t per_thread_size =
      4*1024*1024 /* 4 MB for the RNG cache */
      + 16*1024; /* 16 KB for remaining allocations */

    res = mem_init_lifo_allocator
      (&htrdr->lifo_allocators[ithread], htrdr->allocator, per_thread_size);
    if(res != RES_OK) {
      htrdr_log_err(htrdr,
        "%s: could not initialise the LIFO allocator of thread %lu -- %s.\n",
        FUNC_NAME, (unsigned long)ithread, res_to_cstr(res));
      goto error;
    }
  }

exit:
  *out_htrdr = htrdr;
  return res;
error:
  if(htrdr) {
    htrdr_ref_put(htrdr);
    htrdr = NULL;
  }
  goto exit;
}

void
htrdr_ref_get(struct htrdr* htrdr)
{
  ASSERT(htrdr);
  ref_get(&htrdr->ref);
}

void
htrdr_ref_put(struct htrdr* htrdr)
{
  ASSERT(htrdr);
  ref_put(&htrdr->ref, release_htrdr);
}

size_t
htrdr_get_threads_count(const struct htrdr* htrdr)
{
  ASSERT(htrdr);
  return htrdr->nthreads;
}

size_t
htrdr_get_procs_count(const struct htrdr* htrdr)
{
  ASSERT(htrdr);
  return (size_t)htrdr->mpi_nprocs;
}

int
htrdr_get_mpi_rank(const struct htrdr* htrdr)
{
  ASSERT(htrdr);
  return htrdr->mpi_rank;
}

struct mem_allocator*
htrdr_get_allocator(struct htrdr* htrdr)
{
  ASSERT(htrdr);
  return htrdr->allocator;
}

struct mem_allocator*
htrdr_get_thread_allocator(struct htrdr* htrdr, const size_t ithread)
{
  ASSERT(htrdr && ithread <
    htrdr_get_threads_count(htrdr));
  return htrdr->lifo_allocators + ithread;
}

struct logger*
htrdr_get_logger(struct htrdr* htrdr)
{
  ASSERT(htrdr);
  return &htrdr->logger;
}

int
htrdr_get_verbosity_level(const struct htrdr* htrdr)
{
  ASSERT(htrdr);
  return htrdr->verbose;
}

struct s3d_device*
htrdr_get_s3d(struct htrdr* htrdr)
{
  ASSERT(htrdr);
  return htrdr->s3d;
}

res_T
htrdr_open_output_stream
  (struct htrdr* htrdr,
   const char* filename,
   const int read,
   int force_overwrite,
   FILE** out_fp)
{
  FILE* fp = NULL;
  int fd = -1;
  const char* mode;
  res_T res = RES_OK;
  ASSERT(htrdr && filename && out_fp);

  mode = read ? "w+" : "w";

  if(force_overwrite) {
    fp = fopen(filename, mode);
    if(!fp) {
      htrdr_log_err(htrdr, "could not open the output file `%s'.\n", filename);
      goto error;
    }
  } else {
    const int access_flags = read ? O_RDWR : O_WRONLY;
    fd = open(filename, O_CREAT|O_EXCL|O_TRUNC|access_flags, S_IRUSR|S_IWUSR);
    if(fd >= 0) {
      fp = fdopen(fd, mode);
      if(fp == NULL) {
        htrdr_log_err(htrdr, "could not open the output file `%s'.\n", filename);
        goto error;
      }
    } else if(errno == EEXIST) {
      htrdr_log_err(htrdr, "the output file `%s' already exists.\n",
        filename);
      goto error;
    } else {
      htrdr_log_err(htrdr,
        "unexpected error while opening the output file `%s'.\n", filename);
      goto error;
    }
  }
exit:
  *out_fp = fp;
  return res;
error:
  res = RES_IO_ERR;
  if(fp) {
    CHK(fclose(fp) == 0);
    fp = NULL;
  } else if(fd >= 0) {
    CHK(close(fd) == 0);
  }
  goto exit;
}

/*******************************************************************************
 * Local functions
 ******************************************************************************/
void
send_mpi_progress
  (struct htrdr* htrdr, const enum htrdr_mpi_message msg, int32_t percent)
{
  ASSERT(htrdr);
  ASSERT(msg == HTRDR_MPI_PROGRESS_RENDERING);
  (void)htrdr;
  mutex_lock(htrdr->mpi_mutex);
  MPI(Send(&percent, 1, MPI_INT32_T, 0, msg, MPI_COMM_WORLD));
  mutex_unlock(htrdr->mpi_mutex);
}

void
fetch_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_message msg)
{
  struct timespec t;
  int32_t* progress = NULL;
  int iproc;
  ASSERT(htrdr && htrdr->mpi_rank == 0);

  t.tv_sec = 0;
  t.tv_nsec = 10000000; /* 10ms */

  switch(msg) {
  case HTRDR_MPI_PROGRESS_RENDERING:
    progress = htrdr->mpi_progress_render;
    break;
  default: FATAL("Unreachable code.\n"); break;
  }

  FOR_EACH(iproc, 1, htrdr->mpi_nprocs) {
    /* Flush the last sent percentage of the process `iproc' */
    for(;;) {
      MPI_Request req;
      int flag;
      int complete;

      mutex_lock(htrdr->mpi_mutex);
      MPI(Iprobe(iproc, msg, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE));
      mutex_unlock(htrdr->mpi_mutex);

      if(flag == 0) break; /* No more message */

      mutex_lock(htrdr->mpi_mutex);
      MPI(Irecv(&progress[iproc], 1, MPI_INT32_T, iproc, msg, MPI_COMM_WORLD, &req));
      mutex_unlock(htrdr->mpi_mutex);
      for(;;) {
        mutex_lock(htrdr->mpi_mutex);
        MPI(Test(&req, &complete, MPI_STATUS_IGNORE));
        mutex_unlock(htrdr->mpi_mutex);
        if(complete) break;
        nanosleep(&t, NULL);
      }
    }
  }
}

void
print_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_message msg)
{
  ASSERT(htrdr && htrdr->mpi_rank == 0);

  if(htrdr->mpi_nprocs == 1) {
    switch(msg) {
    case HTRDR_MPI_PROGRESS_RENDERING:
      htrdr_log(htrdr, "\33[2K\r"); /* Erase the line */
      htrdr_log(htrdr, "Rendering: %3d%%\r", htrdr->mpi_progress_render[0]);
      break;
    default: FATAL("Unreachable code.\n"); break;
    }
  } else {
    int iproc;
    FOR_EACH(iproc, 0, htrdr->mpi_nprocs) {
      switch(msg) {
      case HTRDR_MPI_PROGRESS_RENDERING:
        htrdr_log(htrdr, "Process %d -- rendering: %3d%%%c",
          iproc, htrdr->mpi_progress_render[iproc],
          iproc == htrdr->mpi_nprocs - 1 ? '\r' : '\n');
        break;
      default: FATAL("Unreachable code.\n"); break;
      }
    }
  }
}

void
clear_mpi_progress(struct htrdr* htrdr, const enum htrdr_mpi_message msg)
{
  ASSERT(htrdr);
  (void)msg;
  if(htrdr->mpi_nprocs > 1) {
    int iproc;
    FOR_EACH(iproc, 0, htrdr->mpi_nprocs) {
      htrdr_log(htrdr, "\33[2K\r"); /* Erase the line */
      if(iproc != htrdr->mpi_nprocs-1) {
        htrdr_log(htrdr, "\033[1A\r"); /* Move up */
      }
    }
  }
}

int
total_mpi_progress(const struct htrdr* htrdr, const enum htrdr_mpi_message msg)
{
  const int* progress = NULL;
  int total = 0;
  int iproc;
  ASSERT(htrdr && htrdr->mpi_rank == 0);

  switch(msg) {
  case HTRDR_MPI_PROGRESS_RENDERING:
    progress = htrdr->mpi_progress_render;
    break;
  default: FATAL("Unreachable code.\n"); break;
  }

  FOR_EACH(iproc, 0, htrdr->mpi_nprocs) {
    total += progress[iproc];
  }
  total = total / htrdr->mpi_nprocs;
  return total;
}
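
/* A minimal, hypothetical usage sketch (kept out of the build with `#if 0')
 * showing how the exported entry points above are meant to be chained:
 * htrdr_mpi_init before htrdr_create (MPI must be initialised before the MPI
 * rank/size queries in init_mpi), then htrdr_ref_put and htrdr_mpi_finalize.
 * Only the `nthreads' and `verbose' fields of struct htrdr_args are known
 * from this file; the rest is left zero-initialized here and would be filled
 * from the command line by a real caller. */
#if 0
#include "core/htrdr.h"
#include "core/htrdr_args.h"

int
main(int argc, char** argv)
{
  struct htrdr_args args = {0};
  struct htrdr* htrdr = NULL;
  res_T res = RES_OK;

  res = htrdr_mpi_init(argc, argv);
  if(res != RES_OK) return 1;

  args.nthreads = 4; /* Clamped against the OpenMP limits by htrdr_create */
  args.verbose = 1;

  res = htrdr_create(NULL/*use the default allocator*/, &args, &htrdr);
  if(res == RES_OK) {
    /* ... render and write outputs ... */
    htrdr_ref_put(htrdr);
  }

  htrdr_mpi_finalize();
  return res == RES_OK ? 0 : 1;
}
#endif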