commit 2b4c0853c79eb03365fbea98123dd70efd6f78d9
parent 846f79fb128d87d3ed1ab0ce609901546f330a85
Author: Vincent Forest <vincent.forest@meso-star.com>
Date: Tue, 1 Mar 2022 19:06:02 +0100
Fix several critical issues in the proxy RNG
A cache size smaller than the size of one RNG state led to several
bugs. In addition, the sequence id could be wrong when the caching
mechanism was disabled.
Diffstat:
| M | src/ssp_rng_proxy.c | | | 131 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------- |
1 file changed, 96 insertions(+), 35 deletions(-)
diff --git a/src/ssp_rng_proxy.c b/src/ssp_rng_proxy.c
@@ -40,7 +40,11 @@
#include <limits.h>
#define BUCKET_SIZE_DEFAULT 1000000 /* #RNs per bucket */
-#define STATE_CACHE_HINT_MAX_SIZE (32*(1024*1024)) /* 32 MB */
+#if 1
+ #define STATE_CACHE_HINT_MAX_SIZE (32*(1024*1024)) /* 32 MB */
+#else
+ #define STATE_CACHE_HINT_MAX_SIZE 0 /* Disable the cache */
+#endif
/* Cache of RNG states */
struct rng_state_cache {
@@ -89,14 +93,21 @@ struct ssp_rng_proxy {
struct ssp_rng** pools; /* `type' RNGs wrapped by bucket RNGs */
struct rng_state_cache* states; /* Cache of `type' RNG states */
- /* Index of the last sequence queried by proxy-managed RNGs. This index is
- * independent of the original seed used by the proxy and is designed to to
- * identify the status of the proxy relative to its original seed. When the
- * proxy is created, the sequence index is SSP_SEQUENCE_ID_NONE, that is,
- * no sequence has been queried. At the first query of a random number, the
- * first sequence is consumed and this sequence index is then 0. It is then
- * incremented by one each time a new sequence is required. */
- size_t sequence_id;
+ /* Index of the last queried sequence. This index is independent of the
+ * original seed used by the proxy and is designed to identify the status of
+ * the proxy relative to its original seed. When the proxy is created, the
+ * sequence index is SSP_SEQUENCE_ID_NONE, that is, no sequence has been
+ * queried. At the first query of a random number, the first sequence is
+ * consumed and this sequence index is then 0. It is then incremented by one
+ * each time a new sequence is required.
+ *
+ * Each bucket registers the sequence id of its local RNG. The sequence id of
+ * the proxy is the max between these per bucket sequence ids. Note that we
+ * also keep track of the sequence_id of the proxy RNG (main_sequence_id)
+ * that is equal to the sequence id of the proxy only when the cache
+ * mechanism is still in use. */
+ size_t* per_bucket_sequence_id;
+ size_t main_sequence_id;
signal_T signals[RNG_PROXY_SIGS_COUNT__];
@@ -189,15 +200,20 @@ rng_state_cache_release(struct rng_state_cache* cache)
darray_char_release(&cache->state_scratch);
}
-static void
+static res_T
rng_state_cache_clear(struct rng_state_cache* cache)
{
- ASSERT(cache->stream);
- rewind(cache->stream);
- cache->read = cache->write = ftell(cache->stream);
+ if(!cache->stream) {
+ cache->stream = tmpfile();
+ if(!cache->stream) return RES_IO_ERR;
+ } else {
+ rewind(cache->stream);
+ cache->read = cache->write = ftell(cache->stream);
+ }
cache->nstates = 0;
cache->no_wstream = 0;
cache->no_rstream = 0;
+ return RES_OK;
}
static char
@@ -214,6 +230,7 @@ rng_state_cache_read(struct rng_state_cache* cache, struct ssp_rng* rng)
ASSERT(cache && rng && !rng_state_cache_is_empty(cache));
if(!cache->no_rstream
+ && cache->no_wstream
&& cache->read == cache->write
&& cache->nstates == 1/* A state is saved in 'cache->state' */) {
/* There is no more data cached into the stream. Close the stream and do
@@ -276,7 +293,8 @@ rng_state_cache_write(struct rng_state_cache* cache, struct ssp_rng* rng)
if(cache->no_wstream) goto exit; /* Do not cache the submitted state */
fseek(cache->stream, cache->write, SEEK_SET);
- if(rng_state_cache_is_empty(cache) || cache->write > cache->read) {
+ if(STATE_CACHE_HINT_MAX_SIZE > 0
+ && (rng_state_cache_is_empty(cache) || cache->write > cache->read)) {
/* Directly write the RNG state into the cache stream */
res = ssp_rng_write(rng, cache->stream);
if(res != RES_OK) goto error;
@@ -467,7 +485,9 @@ rng_proxy_next_ran_pool
const size_t bucket_name)
{
res_T res = RES_OK;
- ASSERT(proxy && bucket_name <= sa_size(proxy->buckets));
+ ASSERT(proxy);
+ ASSERT(bucket_name <= sa_size(proxy->buckets));
+ ASSERT(bucket_name <= sa_size(proxy->per_bucket_sequence_id));
mutex_lock(proxy->mutex);
if(rng_state_cache_is_empty(proxy->states + bucket_name)) {
@@ -481,14 +501,18 @@ rng_proxy_next_ran_pool
/* Discard RNs to reach the next sequence */
ssp_rng_discard(proxy->rng, proxy->sequence_bias);
- /* Increment the index of the sequence */
- proxy->sequence_id += 1;
+ /* Increment the sequence id of the main RNG */
+ proxy->main_sequence_id += 1;
}
/* Read the RNG pool state of `bucket_name' */
res = rng_state_cache_read
(proxy->states + bucket_name, proxy->pools[bucket_name]);
if(res != RES_OK) FATAL("RNG proxy: cannot read from state cache\n");
+
+ /* Update the sequence of the bucket RNG */
+ proxy->per_bucket_sequence_id[bucket_name] += 1;
+
mutex_unlock(proxy->mutex);
return proxy->pools[bucket_name];
@@ -527,6 +551,7 @@ rng_proxy_setup
sa_add(proxy->states, nbuckets);
sa_add(proxy->pools, nbuckets);
sa_add(proxy->buckets, nbuckets);
+ sa_add(proxy->per_bucket_sequence_id, nbuckets);
FOR_EACH(ibucket, 0, nbuckets) {
res = rng_state_cache_init
@@ -535,6 +560,11 @@ rng_proxy_setup
res = ssp_rng_create(proxy->allocator, proxy->type, proxy->pools+ibucket);
if(res != RES_OK) goto error;
proxy->buckets[ibucket] = 0;
+
+ /* Set the sequence index to SIZE_MAX because no sequence is active until a
+ * random number query is made. On the first query, the index will be
+ * incremented to 0 */
+ proxy->per_bucket_sequence_id[ibucket] = SSP_SEQUENCE_ID_NONE/*<=> SIZE_MAX*/;
}
exit:
@@ -544,17 +574,20 @@ error:
goto exit;
}
-static void
+static res_T
rng_proxy_clear_caches(struct ssp_rng_proxy* proxy)
{
size_t ibucket;
+ res_T res = RES_OK;
ASSERT(proxy);
mutex_lock(proxy->mutex);
FOR_EACH(ibucket, 0, sa_size(proxy->pools)) {
- rng_state_cache_clear(proxy->states+ibucket);
+ res = rng_state_cache_clear(proxy->states+ibucket);
+ if(res != RES_OK) break;
}
mutex_unlock(proxy->mutex);
+ return res;
}
void
@@ -567,6 +600,7 @@ rng_proxy_release(ref_T* ref)
sa_release(proxy->states);
sa_release(proxy->pools);
sa_release(proxy->buckets);
+ sa_release(proxy->per_bucket_sequence_id);
if(proxy->rng) SSP(rng_ref_put(proxy->rng));
if(proxy->mutex) mutex_destroy(proxy->mutex);
MEM_RM(proxy->allocator, proxy);
@@ -647,10 +681,7 @@ ssp_rng_proxy_create2
proxy->sequence_bias =
args->sequence_pitch - (proxy->bucket_size * args->nbuckets);
- /* Set the sequence index to SIZE_MAX because no sequence is active until a
- * random number query is made. On the first query, the index will be
- * incremented to 0 */
- proxy->sequence_id = SSP_SEQUENCE_ID_NONE/* <=> SIZE_MAX*/;
+ proxy->main_sequence_id = SSP_SEQUENCE_ID_NONE;
/* Create the proxy RNG in its default state */
if(!args->rng) {
@@ -727,7 +758,8 @@ ssp_rng_proxy_read(struct ssp_rng_proxy* proxy, FILE* stream)
if(res != RES_OK) return res;
/* Discard the cached RNG states */
- rng_proxy_clear_caches(proxy);
+ res = rng_proxy_clear_caches(proxy);
+ if(res != RES_OK) return res;
/* Notify to bucket RNGs that the proxy RNG state was updated */
SIG_BROADCAST
@@ -828,10 +860,23 @@ ssp_rng_proxy_get_type
}
res_T
-ssp_rng_proxy_get_sequence_id(const struct ssp_rng_proxy* proxy, size_t* id)
+ssp_rng_proxy_get_sequence_id(const struct ssp_rng_proxy* proxy, size_t* out_id)
{
- if(!proxy || !id) return RES_BAD_ARG;
- *id = proxy->sequence_id;
+ size_t id = SSP_SEQUENCE_ID_NONE;
+ size_t i;
+
+ if(!proxy || !out_id) return RES_BAD_ARG;
+
+ mutex_lock(proxy->mutex);
+ FOR_EACH(i, 0, sa_size(proxy->per_bucket_sequence_id)) {
+ if(proxy->per_bucket_sequence_id[i] == SSP_SEQUENCE_ID_NONE) continue;
+ id = id == SSP_SEQUENCE_ID_NONE
+ ? proxy->per_bucket_sequence_id[i]
+ : MMAX(id, proxy->per_bucket_sequence_id[i]);
+ }
+ mutex_unlock(proxy->mutex);
+
+ *out_id = id;
return RES_OK;
}
@@ -840,6 +885,9 @@ ssp_rng_proxy_flush_sequences
(struct ssp_rng_proxy* proxy,
const size_t nseqs)
{
+ size_t nseqs_proxy = 0;
+ size_t iseq;
+ size_t i;
res_T res = RES_OK;
if(!proxy) {
@@ -850,22 +898,35 @@ ssp_rng_proxy_flush_sequences
/* Nothing to discard */
if(nseqs == 0) goto exit;
- /* Flush the random numbers of the 'nseqs' sequences. Note that the status of
- * the proxy RNG is set to the state of the 1st random number of the next
- * sequence. To clear the random numbers of the last sequence queried by the
- * proxy-managed RNGs, simply clear their cache and reset them. So we only
- * reject '(nseqs - 1)*sequence_size' random numbers and not
- * 'nseqs*sequence_size' */
+ res = ssp_rng_proxy_get_sequence_id(proxy, &iseq);
+ if(res != RES_OK) goto error;
+
+ /* Compute the number of sequences to flush for the _main_ RNG, i.e. the one
+ * used when the cache is in use. One wants to flush the random numbers of
+ * 'nseqs' sequences from the current sequence id of the proxy. That's said,
+ * since the status of the RNGs are set to the state of the 1st random number
+ * of the _next_ sequence it is only necessary to discard (nseqs-1) sequences
+ * and to clear the cache. Anyway, note that the main sequence id can be
+ * behind the overall sequence id. This occurs when the cache mechanism is no
+ * more in use. In this case we have to flush additionnal sequences from the
+ * main sequence id to the current sequence id of the proxy. */
+ nseqs_proxy = (nseqs - 1) + (iseq - proxy->main_sequence_id);
+
mutex_lock(proxy->mutex);
- res = ssp_rng_discard(proxy->rng, proxy->sequence_size * (nseqs-1));
+ res = ssp_rng_discard(proxy->rng, proxy->sequence_size * nseqs_proxy);
mutex_unlock(proxy->mutex);
if(res != RES_OK) goto error;
- proxy->sequence_id += (nseqs-1);
+ proxy->main_sequence_id += nseqs_proxy;
/* Discard the cached RNG states */
rng_proxy_clear_caches(proxy);
+ /* Reset the RNGs sequence id */
+ FOR_EACH(i, 0, sa_size(proxy->per_bucket_sequence_id)) {
+ proxy->per_bucket_sequence_id[i] = proxy->main_sequence_id;
+ }
+
/* Notify to bucket RNGs that the proxy RNG state was updated */
SIG_BROADCAST
(proxy->signals+RNG_PROXY_SIG_SET_STATE, rng_proxy_cb_T, ARG1(proxy));