Allow a temporal directory to flush the buffers

The directory can be specified using the OVNI_TMPDIR environment
variable.
This commit is contained in:
Rodrigo Arias 2022-01-12 16:57:52 +01:00
parent 66036fe59e
commit 544c67330c
2 changed files with 164 additions and 34 deletions

187
ovni.c
View File

@ -24,6 +24,7 @@
#define _GNU_SOURCE #define _GNU_SOURCE
#include <dirent.h>
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <fcntl.h> #include <fcntl.h>
@ -45,39 +46,17 @@ struct ovni_rproc rproc = {0};
/* Data per thread */ /* Data per thread */
_Thread_local struct ovni_rthread rthread = {0}; _Thread_local struct ovni_rthread rthread = {0};
static void
create_trace_dirs(char *tracedir, const char *loom, int pid)
{
char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s", tracedir);
/* May fail if another loom created the directory already */
mkdir(path, 0755);
snprintf(path, PATH_MAX, "%s/loom.%s", tracedir, loom);
/* Also may fail */
mkdir(path, 0755);
snprintf(rproc.dir, PATH_MAX, "%s/loom.%s/proc.%d", tracedir, loom, pid);
/* But this one shall not fail */
if(mkdir(rproc.dir, 0755))
die("mkdir %s failed: %s\n", rproc.dir, strerror(errno));
}
static void static void
create_trace_stream(void) create_trace_stream(void)
{ {
char path[PATH_MAX]; char path[PATH_MAX];
int written = snprintf(path, PATH_MAX, "%s/thread.%d", int written = snprintf(path, PATH_MAX, "%s/thread.%d",
rproc.dir, rthread.tid); rproc.procdir, rthread.tid);
if(written >= PATH_MAX) if(written >= PATH_MAX)
die("thread trace path too long: %s/thread.%d\n", die("thread trace path too long: %s/thread.%d\n",
rproc.dir, rthread.tid); rproc.procdir, rthread.tid);
rthread.streamfd = open(path, O_WRONLY | O_CREAT, 0644); rthread.streamfd = open(path, O_WRONLY | O_CREAT, 0644);
@ -95,17 +74,18 @@ proc_metadata_init(struct ovni_rproc *proc)
} }
static void static void
proc_metadata_store(struct ovni_rproc *proc) proc_metadata_store(JSON_Value *meta, const char *procdir)
{ {
char path[PATH_MAX]; char path[PATH_MAX];
if(proc->meta == NULL) if(meta == NULL)
die("process metadata not initialized\n"); die("process metadata not initialized\n");
if(snprintf(path, PATH_MAX, "%s/metadata.json", proc->dir) >= PATH_MAX) if(snprintf(path, PATH_MAX, "%s/metadata.json", procdir) >= PATH_MAX)
die("metadata path too long: %s/metadata.json\n", proc->dir); die("metadata path too long: %s/metadata.json\n",
procdir);
if(json_serialize_to_file_pretty(proc->meta, path) != JSONSuccess) if(json_serialize_to_file_pretty(meta, path) != JSONSuccess)
die("failed to write process metadata\n"); die("failed to write process metadata\n");
} }
@ -205,6 +185,45 @@ ovni_proc_set_rank(int rank, int nranks)
die("json_object_set_number for nranks failed\n"); die("json_object_set_number for nranks failed\n");
} }
/* Create $tracedir/loom.$loom/proc.$pid and return it in path. */
static void
mkdir_proc(char *path, const char *tracedir, const char *loom, int pid)
{
snprintf(path, PATH_MAX, "%s", tracedir);
/* May fail if another loom created the directory already */
mkdir(path, 0755);
snprintf(path, PATH_MAX, "%s/loom.%s", tracedir, loom);
/* Also may fail */
mkdir(path, 0755);
snprintf(path, PATH_MAX, "%s/loom.%s/proc.%d", tracedir, loom, pid);
/* But this one shall not fail */
if(mkdir(path, 0755))
die("mkdir %s failed: %s\n", path, strerror(errno));
}
static void
create_proc_dir(const char *loom, int pid)
{
char *tmpdir = getenv("OVNI_TMPDIR");
if(tmpdir != NULL)
{
rproc.move_to_final = 1;
mkdir_proc(rproc.procdir, tmpdir, loom, pid);
mkdir_proc(rproc.procdir_final, OVNI_TRACEDIR, loom, pid);
}
else
{
rproc.move_to_final = 0;
mkdir_proc(rproc.procdir, OVNI_TRACEDIR, loom, pid);
}
}
void void
ovni_proc_init(int app, const char *loom, int pid) ovni_proc_init(int app, const char *loom, int pid)
{ {
@ -221,7 +240,7 @@ ovni_proc_init(int app, const char *loom, int pid)
rproc.app = app; rproc.app = app;
rproc.clockid = CLOCK_MONOTONIC; rproc.clockid = CLOCK_MONOTONIC;
create_trace_dirs(OVNI_TRACEDIR, loom, pid); create_proc_dir(loom, pid);
proc_metadata_init(&rproc); proc_metadata_init(&rproc);
@ -230,13 +249,119 @@ ovni_proc_init(int app, const char *loom, int pid)
proc_set_app(app); proc_set_app(app);
} }
static int
move_thread_to_final(const char *src, const char *dst)
{
char buffer[1024];
size_t bytes;
FILE *infile = fopen(src, "r");
if(infile == NULL)
{
err("fopen(%s) failed: %s\n", src, strerror(errno));
return -1;
}
FILE *outfile = fopen(dst, "w");
if(outfile == NULL)
{
err("fopen(%s) failed: %s\n", src, strerror(errno));
return -1;
}
while((bytes = fread(buffer, 1, sizeof(buffer), infile)) > 0)
fwrite(buffer, 1, bytes, outfile);
fclose(outfile);
fclose(infile);
if(remove(src) != 0)
{
err("remove(%s) failed: %s\n", src, strerror(errno));
return -1;
}
return 0;
}
static void
move_procdir_to_final(const char *procdir, const char *procdir_final)
{
struct dirent *dirent;
DIR *dir;
char thread[PATH_MAX];
char thread_final[PATH_MAX];
int err = 0;
if((dir = opendir(procdir)) == NULL)
{
err("opendir %s failed: %s\n", procdir, strerror(errno));
return;
}
const char *prefix = "thread.";
while((dirent = readdir(dir)) != NULL)
{
/* It should only contain thread.* directories, skip others */
if(strncmp(dirent->d_name, prefix, strlen(prefix)) != 0)
continue;
if(snprintf(thread, PATH_MAX, "%s/%s", procdir,
dirent->d_name) >= PATH_MAX)
{
err("snprintf: path too large: %s/%s\n", procdir,
dirent->d_name);
err = 1;
continue;
}
if(snprintf(thread_final, PATH_MAX, "%s/%s", procdir_final,
dirent->d_name) >= PATH_MAX)
{
err("snprintf: path too large: %s/%s\n", procdir_final,
dirent->d_name);
err = 1;
continue;
}
if(move_thread_to_final(thread, thread_final) != 0)
err = 1;
}
closedir(dir);
if(rmdir(procdir) != 0)
{
err("rmdir(%s) failed: %s\n", procdir, strerror(errno));
err = 1;
}
/* Warn the user, but we cannot do much at this point */
if(err)
err("errors occurred when moving the trace to %s\n", procdir_final);
}
void void
ovni_proc_fini(void) ovni_proc_fini(void)
{ {
if(!rproc.ready) if(!rproc.ready)
die("ovni_proc_fini: process not initialized\n"); die("ovni_proc_fini: process not initialized\n");
proc_metadata_store(&rproc); /* Mark the process no longer ready */
rproc.ready = 0;
if(rproc.move_to_final)
{
proc_metadata_store(rproc.meta, rproc.procdir_final);
move_procdir_to_final(rproc.procdir, rproc.procdir_final);
}
else
{
proc_metadata_store(rproc.meta, rproc.procdir);
}
} }
void void

11
ovni.h
View File

@ -111,15 +111,20 @@ struct ovni_rthread {
/* State of each process on runtime */ /* State of each process on runtime */
struct ovni_rproc { struct ovni_rproc {
/* Path of the process tracedir */ /* Where the process trace is finally copied */
char dir[PATH_MAX]; char procdir_final[PATH_MAX];
/* Where the process trace is flushed */
char procdir[PATH_MAX];
/* If needs to be moved at the end */
int move_to_final;
int app; int app;
int pid; int pid;
char loom[OVNI_MAX_HOSTNAME]; char loom[OVNI_MAX_HOSTNAME];
int ncpus; int ncpus;
clockid_t clockid; clockid_t clockid;
char procdir[PATH_MAX];
int ready; int ready;