commit 3790b13ee38cfce6f8f6aa25f59c80c66124cdd5
parent 6d0f65e757e3840588b0de5175400993ccc857f2
Author: Vincent Forest <vincent.forest@meso-star.com>
Date: Tue, 1 Dec 2020 14:59:23 +0100
Upd how RNG states are managed by a proxy RNG
Previously, RNG states are saved in a temporary file that can increase
infinitely. The size of these files can be huge leading to excessive
disk space consumption and issues on systems where file offsets are
encoded on 32 bits.
This commit fully rewrite the caching mechanism. RNG states are now
saved in a file in a circular manner, i.e. when the file size reaches a
given hard-coded limit, the states are written at the beginning of the
file, excepted if it overlaps an non-read state. In this situation, the
states are no more saved from the RNG proxy to a stream but are computed
on the fly by the bucket RNG.
Diffstat:
| M | src/ssp_rng_proxy.c | | | 200 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------- |
1 file changed, 167 insertions(+), 33 deletions(-)
diff --git a/src/ssp_rng_proxy.c b/src/ssp_rng_proxy.c
@@ -31,6 +31,7 @@
#include "ssp_rng_c.h"
+#include <rsys/dynamic_array_char.h>
#include <rsys/mutex.h>
#include <rsys/ref_count.h>
#include <rsys/signal.h>
@@ -38,12 +39,19 @@
#include <limits.h>
-#define BUCKET_SIZE_DEFAULT 1000000
+#define BUCKET_SIZE_DEFAULT 1000000 /* #RNs per bucket */
+#define STATE_CACHE_HINT_MAX_SIZE (32*(1024*1024)) /* 32 MB */
-/* FIFO list of RNG states */
+/* Cache of RNG states */
struct rng_state_cache {
- FILE* stream; /* Stream in which the RNG states are stored */
- long head, tail; /* Position onto the head/tail `states' stream */
+ struct darray_char state; /* Save the next RNG state with 'no_wstream' */
+ struct darray_char state_scratch; /* Scracth state buffer */
+ FILE* stream; /* Stream into which the RNG states are stored */
+ size_t state_pitch; /* #RNs between 2 cached states */
+ size_t nstates; /* #cached states */
+ long read, write; /* Offset into the stream where to read/write RNG states */
+ int no_wstream; /* Define if the RNG states are no more written to a stream */
+ int no_rstream; /* Define if the RNG states are no more read from a stream */
};
CLBK(rng_proxy_cb_T, ARG1(const struct ssp_rng_proxy*));
@@ -88,7 +96,7 @@ struct ssp_rng_proxy {
};
/* Return a RNG with a pool of `bucket_size' indenpendant random numbers. Each
- * pool are ensure to be independant per `bucket_id' in [0, N) and per function
+ * pool are ensured to be independant per `bucket_id' in [0, N) and per function
* call, i.e. calling this function X times with the same bucket_id will
* provide X different random pools.
*
@@ -107,16 +115,58 @@ rng_proxy_next_ran_pool
(struct ssp_rng_proxy* proxy,
const size_t bucket_id);
+/* Write the RNG state into buf. State data are terminated by a null char */
+static res_T
+rng_write_cstr
+ (const struct ssp_rng* rng,
+ struct darray_char* buf,
+ size_t* out_len) /* May be NULL. String length without the null char */
+{
+ size_t len;
+ res_T res = RES_OK;
+ ASSERT(rng && buf);
+
+ /* Write the RNG state into a temporary buffer */
+ res = ssp_rng_write_cstr
+ (rng, darray_char_data_get(buf), darray_char_size_get(buf), &len);
+ if(res != RES_OK) goto error;
+
+ /* Not sufficient space to store the state */
+ if(len >= darray_char_size_get(buf)) {
+ res = darray_char_resize(buf, len + 1/*null char*/);
+ if(res != RES_OK) goto error;
+
+ res = ssp_rng_write_cstr
+ (rng, darray_char_data_get(buf), darray_char_size_get(buf), &len);
+ if(res != RES_OK) goto error;
+ ASSERT(len + 1/*null char*/ == darray_char_size_get(buf));
+ }
+
+ if(out_len) *out_len = len;
+
+exit:
+ return res;
+error:
+ goto exit;
+}
+
/*******************************************************************************
* Cache of RNG states
******************************************************************************/
static res_T
-rng_state_cache_init(struct rng_state_cache* cache)
+rng_state_cache_init
+ (struct mem_allocator* allocator,
+ const size_t state_pitch, /* #RNs between cached states */
+ struct rng_state_cache* cache)
{
ASSERT(cache);
+ memset(cache, 0, sizeof(*cache));
+ darray_char_init(allocator, &cache->state);
+ darray_char_init(allocator, &cache->state_scratch);
cache->stream = tmpfile();
if(!cache->stream) return RES_IO_ERR;
- cache->head = cache->tail = ftell(cache->stream);
+ cache->read = cache->write = ftell(cache->stream);
+ cache->state_pitch = state_pitch;
return RES_OK;
}
@@ -125,48 +175,126 @@ rng_state_cache_release(struct rng_state_cache* cache)
{
ASSERT(cache);
if(cache->stream) fclose(cache->stream);
+ darray_char_release(&cache->state);
+ darray_char_release(&cache->state_scratch);
}
static char
rng_state_cache_is_empty(struct rng_state_cache* cache)
{
ASSERT(cache);
- return cache->head == cache->tail;
+ return cache->nstates == 0;
}
static res_T
rng_state_cache_read(struct rng_state_cache* cache, struct ssp_rng* rng)
{
- res_T res;
+ res_T res = RES_OK;
ASSERT(cache && rng && !rng_state_cache_is_empty(cache));
- /* Read the rng state from the stream */
- fseek(cache->stream, cache->head, SEEK_SET);
- res = ssp_rng_read(rng, cache->stream);
- if(res != RES_OK) return res;
- cache->head = ftell(cache->stream);
-
- /* Flush the state cache stream if there is no more RNG state */
- if(rng_state_cache_is_empty(cache)) {
+ if(!cache->no_rstream
+ && cache->read == cache->write
+ && cache->nstates == 1/* A state is saved in 'cache->state' */) {
+ /* There is not more data cached into the stream. Close the stream and do
+ * not rely anymore on the proxy RNG to generate the RNG states */
fclose(cache->stream);
- cache->stream = tmpfile();
- if(!cache->stream) return RES_IO_ERR;
- cache->head = cache->tail = ftell(cache->stream);
+ cache->stream = NULL;
+ cache->no_rstream = 1;
}
- return RES_OK;
+ /* Read the cached RNG state from the stream */
+ if(!cache->no_rstream) {
+ fseek(cache->stream, cache->read, SEEK_SET);
+ res = ssp_rng_read(rng, cache->stream);
+ if(res != RES_OK) goto error;
+ cache->read = ftell(cache->stream);
+
+ /* The fp reaches the end of the cached data */
+ if(cache->read >= STATE_CACHE_HINT_MAX_SIZE) {
+ cache->read = 0;
+ }
+
+ /* Remove one cached states */
+ cache->nstates -= 1;
+
+ /* Generate the next RNG state and load the cached one */
+ } else {
+ /* Copy the cached RNG state */
+ res = darray_char_copy(&cache->state_scratch, &cache->state);
+ if(res != RES_OK) goto error;
+
+ /* Load the cached RNG state */
+ res = ssp_rng_read_cstr(rng, darray_char_cdata_get(&cache->state));
+ if(res != RES_OK) goto error;
+
+ /* Setup the next RNG state */
+ res = ssp_rng_discard(rng, cache->state_pitch);
+ if(res != RES_OK) goto error;
+
+ /* Save the next RNG state */
+ res = rng_write_cstr(rng, &cache->state, NULL);
+ if(res != RES_OK) goto error;
+
+ /* Setup the current RNG state */
+ res = ssp_rng_read_cstr(rng, darray_char_cdata_get(&cache->state_scratch));
+ if(res != RES_OK) goto error;
+ }
+
+exit:
+ return res;
+error:
+ goto exit;
}
static res_T
rng_state_cache_write(struct rng_state_cache* cache, struct ssp_rng* rng)
{
- res_T res;
+ res_T res = RES_OK;
ASSERT(cache && rng);
- fseek(cache->stream, cache->tail, SEEK_SET);
- res = ssp_rng_write(rng, cache->stream);
- if(res != RES_OK) return res;
- cache->tail = ftell(cache->stream);
- return RES_OK;
+
+ if(cache->no_wstream) goto exit; /* Do not cache the submitted state */
+
+ fseek(cache->stream, cache->write, SEEK_SET);
+ if(rng_state_cache_is_empty(cache) || cache->write > cache->read) {
+ /* Directly write the RNG state into the cache stream */
+ res = ssp_rng_write(rng, cache->stream);
+ if(res != RES_OK) goto error;
+ cache->write = ftell(cache->stream);
+
+ /* The fp exceed the amount of cached data */
+ if(cache->write >= STATE_CACHE_HINT_MAX_SIZE) {
+ cache->write = 0;
+ }
+
+ } else {
+ size_t len;
+ res = rng_write_cstr(rng, &cache->state, &len);
+ if(res != RES_OK) goto error;
+
+ if(len > (size_t)(cache->read - cache->write)) {
+ /* No sufficient space into the cache stream to save the RNG state */
+ cache->no_wstream = 1;
+ } else {
+ /* Write the RNG state into the cached stream */
+ size_t sz;
+ sz = fwrite(darray_char_cdata_get(&cache->state), 1, len, cache->stream);
+ if(sz != len) { res = RES_IO_ERR; goto error; }
+ cache->write = ftell(cache->stream);
+
+ /* The fp exceed the amount of cached data */
+ if(cache->write >= STATE_CACHE_HINT_MAX_SIZE) {
+ cache->write = 0;
+ }
+ }
+ }
+
+ /* Update the number of cached states */
+ cache->nstates += 1;
+
+exit:
+ return res;
+error:
+ goto exit;
}
/*******************************************************************************
@@ -361,12 +489,15 @@ rng_proxy_clear(struct ssp_rng_proxy* proxy)
}
static res_T
-rng_proxy_setup(struct ssp_rng_proxy* proxy, const size_t nbuckets)
+rng_proxy_setup
+ (struct ssp_rng_proxy* proxy,
+ const size_t sequence_pitch, /* #RNs between 2 consecutive sequences */
+ const size_t nbuckets)
{
size_t ibucket;
res_T res = RES_OK;
- ASSERT(proxy && nbuckets);
+ ASSERT(proxy && sequence_pitch && nbuckets);
rng_proxy_clear(proxy);
sa_add(proxy->states, nbuckets);
@@ -374,7 +505,8 @@ rng_proxy_setup(struct ssp_rng_proxy* proxy, const size_t nbuckets)
sa_add(proxy->buckets, nbuckets);
FOR_EACH(ibucket, 0, nbuckets) {
- res = rng_state_cache_init(proxy->states+ibucket);
+ res = rng_state_cache_init
+ (proxy->allocator, sequence_pitch, proxy->states+ibucket);
if(res != RES_OK) goto error;
res = ssp_rng_create(proxy->allocator, &proxy->type, proxy->pools+ibucket);
if(res != RES_OK) goto error;
@@ -468,7 +600,7 @@ ssp_rng_proxy_create2
SIG_INIT(proxy->signals + i);
}
- res = rng_proxy_setup(proxy, nbuckets);
+ res = rng_proxy_setup(proxy, sequence_pitch, nbuckets);
if(res != RES_OK) goto error;
exit:
@@ -493,6 +625,7 @@ ssp_rng_proxy_create_from_rng
struct ssp_rng_proxy* proxy = NULL;
struct ssp_rng_type type;
FILE* stream = NULL;
+ size_t sequence_pitch;
size_t i;
res_T res = RES_OK;
@@ -511,7 +644,8 @@ ssp_rng_proxy_create_from_rng
proxy->allocator = allocator;
ref_init(&proxy->ref);
proxy->bucket_size = BUCKET_SIZE_DEFAULT;
- proxy->sequence_bias = 1;
+ proxy->sequence_bias = 0;
+ sequence_pitch = proxy->bucket_size * nbuckets;
res = ssp_rng_get_type(rng, &type);
if(res != RES_OK) goto error;
@@ -543,7 +677,7 @@ ssp_rng_proxy_create_from_rng
SIG_INIT(proxy->signals + i);
}
- res = rng_proxy_setup(proxy, nbuckets);
+ res = rng_proxy_setup(proxy, sequence_pitch, nbuckets);
if(res != RES_OK) goto error;
exit: