From f31e73003fd45dd6eb395453ce561ecd9bb29076 Mon Sep 17 00:00:00 2001 From: Rodrigo Arias Date: Mon, 9 Sep 2024 16:41:41 +0200 Subject: [PATCH] Store stream metadata in stream.json Place all stream files in its own stream directory. --- src/emu/stream.c | 106 ++++++++++++++++++++++++++++++++++++----------- src/emu/stream.h | 6 ++- src/emu/trace.c | 46 ++++++-------------- src/rt/ovni.c | 32 ++++++++++---- 4 files changed, 123 insertions(+), 67 deletions(-) diff --git a/src/emu/stream.c b/src/emu/stream.c index ddfe2b6..52f0b59 100644 --- a/src/emu/stream.c +++ b/src/emu/stream.c @@ -71,41 +71,22 @@ load_stream_fd(struct stream *stream, int fd) return 0; } -int -stream_load(struct stream *stream, const char *tracedir, const char *relpath) +static int +load_obs(struct stream *stream, const char *path) { - memset(stream, 0, sizeof(struct stream)); - - if (snprintf(stream->path, PATH_MAX, "%s/%s", tracedir, relpath) >= PATH_MAX) { - err("path too long: %s/%s", tracedir, relpath); - return -1; - } - - /* Allow loading a trace with empty relpath */ - path_remove_trailing(stream->path); - - if (snprintf(stream->relpath, PATH_MAX, "%s", relpath) >= PATH_MAX) { - err("path too long: %s", relpath); - return -1; - } - - dbg("loading %s", stream->relpath); - int fd; - if ((fd = open(stream->path, O_RDWR)) == -1) { - err("open %s failed:", stream->path); + if ((fd = open(path, O_RDWR)) == -1) { + err("open %s failed:", path); return -1; } if (load_stream_fd(stream, fd) != 0) { - err("load_stream_fd failed for stream '%s'", - stream->path); + err("load_stream_fd failed for: %s", path); return -1; } if (check_stream_header(stream) != 0) { - err("stream '%s' has bad header", - stream->path); + err("stream has bad header: %s", path); return -1; } @@ -132,6 +113,75 @@ stream_load(struct stream *stream, const char *tracedir, const char *relpath) return 0; } +static JSON_Object * +load_json(const char *path) +{ + JSON_Value *vmeta = json_parse_file_with_comments(path); + if (vmeta == NULL) { + err("json_parse_file_with_comments() failed"); + return NULL; + } + + JSON_Object *meta = json_value_get_object(vmeta); + if (meta == NULL) { + err("json_value_get_object() failed"); + return NULL; + } + + return meta; +} + +/** Loads a stream from disk. + * + * The relpath must be pointing to a directory with the stream.json and + * stream.obs files. + */ +int +stream_load(struct stream *stream, const char *tracedir, const char *relpath) +{ + memset(stream, 0, sizeof(struct stream)); + + if (snprintf(stream->path, PATH_MAX, "%s/%s", tracedir, relpath) >= PATH_MAX) { + err("path too long: %s/%s", tracedir, relpath); + return -1; + } + + /* Allow loading a trace with empty relpath */ + path_remove_trailing(stream->path); + + if (snprintf(stream->relpath, PATH_MAX, "%s", relpath) >= PATH_MAX) { + err("path too long: %s", relpath); + return -1; + } + + dbg("loading %s", stream->relpath); + + char path_json[PATH_MAX]; + char path_obs[PATH_MAX]; + + if (path_append(path_json, stream->path, "stream.json") != 0) { + err("path_append failed"); + return -1; + } + + if ((stream->meta = load_json(path_json)) == NULL) { + err("load_json failed for: %s", path_json); + return -1; + } + + if (path_append(path_obs, stream->path, "stream.obs") != 0) { + err("path_append failed"); + return -1; + } + + if (load_obs(stream, path_obs) != 0) { + err("load_obs failed"); + return -1; + } + + return 0; +} + void stream_data_set(struct stream *stream, void *data) { @@ -144,6 +194,12 @@ stream_data_get(struct stream *stream) return stream->data; } +JSON_Object * +stream_metadata(struct stream *stream) +{ + return stream->meta; +} + int stream_clkoff_set(struct stream *stream, int64_t clkoff) { diff --git a/src/emu/stream.h b/src/emu/stream.h index 2bbfd13..8acee0c 100644 --- a/src/emu/stream.h +++ b/src/emu/stream.h @@ -1,4 +1,4 @@ -/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC) +/* Copyright (c) 2021-2024 Barcelona Supercomputing Center (BSC) * SPDX-License-Identifier: GPL-3.0-or-later */ #ifndef STREAM_H @@ -8,6 +8,7 @@ #include #include "common.h" #include "heap.h" +#include "parson.h" struct ovni_ev; struct stream { @@ -34,6 +35,8 @@ struct stream { int64_t offset; double progress; + + JSON_Object *meta; }; USE_RET int stream_load(struct stream *stream, const char *tracedir, const char *relpath); @@ -46,5 +49,6 @@ USE_RET int64_t stream_lastclock(struct stream *stream); void stream_allow_unsorted(struct stream *stream); void stream_data_set(struct stream *stream, void *data); USE_RET void *stream_data_get(struct stream *stream); +USE_RET JSON_Object *stream_metadata(struct stream *stream); #endif /* STREAM_H */ diff --git a/src/emu/trace.c b/src/emu/trace.c index 4b8bd49..96aa414 100644 --- a/src/emu/trace.c +++ b/src/emu/trace.c @@ -27,7 +27,7 @@ add_stream(struct trace *trace, struct stream *stream) } static int -load_stream(struct trace *trace, const char *path) +load_stream(struct trace *trace, const char *json_path) { struct stream *stream = calloc(1, sizeof(struct stream)); @@ -36,6 +36,14 @@ load_stream(struct trace *trace, const char *path) return -1; } + /* The json_path must end in .../stream.json, so remove it */ + char path[PATH_MAX]; + if (path_copy(path, json_path) != 0) { + err("path_copy failed"); + return -1; + } + path_dirname(path); + int offset = (int) strlen(trace->tracedir); const char *relpath = path + offset; @@ -53,46 +61,16 @@ load_stream(struct trace *trace, const char *path) } static int -has_suffix(const char *str, const char *suffix) +is_stream(const char *fpath) { - if (!str || !suffix) - return 0; + const char *filename = path_filename(fpath); - int lenstr = (int) strlen(str); - int lensuffix = (int) strlen(suffix); - - if (lensuffix > lenstr) - return 0; - - const char *p = str + lenstr - lensuffix; - if (strncmp(p, suffix, (size_t) lensuffix) == 0) + if (strcmp(filename, "stream.json") == 0) return 1; return 0; } -static int -is_stream(const char *fpath) -{ - if (has_suffix(fpath, OVNI_STREAM_EXT)) - return 1; - - /* For compatibility load the old streams too */ - const char *filename = path_filename(fpath); - - const char prefix[] = "thread."; - if (!path_has_prefix(filename, prefix)) - return 0; - - const char *tid = filename + strlen(prefix); - for (int i = 0; tid[i]; i++) { - if (tid[i] < '0' || tid[i] > '9') - return 0; - } - - return 1; -} - static int cb_nftw(const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf) diff --git a/src/rt/ovni.c b/src/rt/ovni.c index 0cec34d..523dfea 100644 --- a/src/rt/ovni.c +++ b/src/rt/ovni.c @@ -104,17 +104,34 @@ void ovni_version_check_str(const char *version) /* Ignore the patch number */ } +static void +create_thread_dir(void) +{ + char path[PATH_MAX]; + + int written = snprintf(path, PATH_MAX, "%s/thread.%d", + rproc.procdir, rthread.tid); + + if (written >= PATH_MAX) + die("path too long: %s/thread.%d", rproc.procdir, rthread.tid); + + /* The procdir must have been created earlier */ + if (mkdir(path, 0755) != 0) + die("mkdir(%s) failed:", path); +} + static void create_trace_stream(void) { char path[PATH_MAX]; - int written = snprintf(path, PATH_MAX, "%s/thread.%d%s", - rproc.procdir, rthread.tid, OVNI_STREAM_EXT); + int written = snprintf(path, PATH_MAX, "%s/thread.%d/stream.obs", + rproc.procdir, rthread.tid); - if (written >= PATH_MAX) - die("thread trace path too long: %s/thread.%d%s", - rproc.procdir, rthread.tid, OVNI_STREAM_EXT); + if (written >= PATH_MAX) { + die("path too long: %s/thread.%d/stream.obs", + rproc.procdir, rthread.tid); + } rthread.streamfd = open(path, O_WRONLY | O_CREAT, 0644); @@ -465,11 +482,11 @@ static void thread_metadata_store(void) { char path[PATH_MAX]; - int written = snprintf(path, PATH_MAX, "%s/thread.%d.json", + int written = snprintf(path, PATH_MAX, "%s/thread.%d/stream.json", rproc.procdir, rthread.tid); if (written >= PATH_MAX) - die("thread trace path too long: %s/thread.%d.json", + die("thread trace path too long: %s/thread.%d/stream.json", rproc.procdir, rthread.tid); if (json_serialize_to_file_pretty(rthread.meta, path) != JSONSuccess) @@ -578,6 +595,7 @@ ovni_thread_init(pid_t tid) if (rthread.evbuf == NULL) die("malloc failed:"); + create_thread_dir(); create_trace_stream(); write_stream_header();