diff --git a/src/rt/ovni.c b/src/rt/ovni.c index 6157548..1bac96c 100644 --- a/src/rt/ovni.c +++ b/src/rt/ovni.c @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -19,6 +21,13 @@ #include "version.h" #include "utlist.h" +enum { + ST_UNINIT = 0, + ST_INIT, + ST_READY, + ST_GONE, +}; + struct ovni_rcpu { int index; int phyid; @@ -73,8 +82,7 @@ struct ovni_rproc { int rank; int nranks; - int ready; - int finished; + atomic_int st; JSON_Value *meta; }; @@ -171,8 +179,8 @@ ovni_add_cpu(int index, int phyid) if (phyid < 0) die("cannot use negative CPU id %d", phyid); - if (!rproc.ready) - die("process not yet initialized"); + if (rproc.st != ST_READY) + die("process not ready"); if (!rthread.ready) die("thread not yet initialized"); @@ -190,8 +198,8 @@ ovni_add_cpu(int index, int phyid) void ovni_proc_set_rank(int rank, int nranks) { - if (!rproc.ready) - die("process not yet initialized"); + if (rproc.st != ST_READY) + die("process not ready"); rproc.rank_set = 1; rproc.rank = rank; @@ -237,13 +245,19 @@ create_proc_dir(const char *loom, int pid) void ovni_proc_init(int app, const char *loom, int pid) { - if (rproc.ready) - die("pid %d already initialized", pid); + /* Protect against two threads calling at the same time */ + int st = ST_UNINIT; + bool was_uninit = atomic_compare_exchange_strong(&rproc.st, + &st, ST_INIT); - if (rproc.finished) - die("pid %d has finished, cannot init again", pid); - - memset(&rproc, 0, sizeof(rproc)); + if (!was_uninit) { + if (st == ST_INIT) + die("pid %d already being initialized", pid); + else if (st == ST_READY) + die("pid %d already initialized", pid); + else if (st == ST_GONE) + die("pid %d has finished, cannot init again", pid); + } if (strlen(loom) >= OVNI_MAX_HOSTNAME) die("loom name too long: %s", loom); @@ -255,7 +269,7 @@ ovni_proc_init(int app, const char *loom, int pid) create_proc_dir(loom, pid); - rproc.ready = 1; + rproc.st = ST_READY; } static int @@ -353,18 +367,19 @@ try_clean_dir(const char *dir) void ovni_proc_fini(void) { - if (!rproc.ready) - die("process not initialized"); + /* Protect against two threads calling at the same time */ + int st = ST_READY; + bool was_ready = atomic_compare_exchange_strong(&rproc.st, + &st, ST_GONE); + + if (!was_ready) + die("process not ready"); if (rproc.move_to_final) { try_clean_dir(rproc.procdir); try_clean_dir(rproc.loomdir); try_clean_dir(rproc.tmpdir); } - - /* Mark the process no longer ready */ - rproc.finished = 1; - rproc.ready = 0; } static void @@ -516,8 +531,8 @@ ovni_thread_init(pid_t tid) if (tid == 0) die("cannot use tid=%d", tid); - if (!rproc.ready) - die("process not yet initialized"); + if (rproc.st != ST_READY) + die("process not ready"); memset(&rthread, 0, sizeof(rthread)); @@ -751,8 +766,8 @@ ovni_flush(void) if (!rthread.ready) die("thread is not initialized"); - if (!rproc.ready) - die("process is not initialized"); + if (rproc.st != ST_READY) + die("process not ready"); ovni_ev_set_clock(&pre, ovni_clock_now()); ovni_ev_set_mcv(&pre, "OF[");