Store stream metadata in stream.json
Place all stream files in its own stream directory.
This commit is contained in:
parent
add2c5638a
commit
f31e73003f
106
src/emu/stream.c
106
src/emu/stream.c
@ -71,41 +71,22 @@ load_stream_fd(struct stream *stream, int fd)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
static int
|
||||||
stream_load(struct stream *stream, const char *tracedir, const char *relpath)
|
load_obs(struct stream *stream, const char *path)
|
||||||
{
|
{
|
||||||
memset(stream, 0, sizeof(struct stream));
|
|
||||||
|
|
||||||
if (snprintf(stream->path, PATH_MAX, "%s/%s", tracedir, relpath) >= PATH_MAX) {
|
|
||||||
err("path too long: %s/%s", tracedir, relpath);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Allow loading a trace with empty relpath */
|
|
||||||
path_remove_trailing(stream->path);
|
|
||||||
|
|
||||||
if (snprintf(stream->relpath, PATH_MAX, "%s", relpath) >= PATH_MAX) {
|
|
||||||
err("path too long: %s", relpath);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dbg("loading %s", stream->relpath);
|
|
||||||
|
|
||||||
int fd;
|
int fd;
|
||||||
if ((fd = open(stream->path, O_RDWR)) == -1) {
|
if ((fd = open(path, O_RDWR)) == -1) {
|
||||||
err("open %s failed:", stream->path);
|
err("open %s failed:", path);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (load_stream_fd(stream, fd) != 0) {
|
if (load_stream_fd(stream, fd) != 0) {
|
||||||
err("load_stream_fd failed for stream '%s'",
|
err("load_stream_fd failed for: %s", path);
|
||||||
stream->path);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (check_stream_header(stream) != 0) {
|
if (check_stream_header(stream) != 0) {
|
||||||
err("stream '%s' has bad header",
|
err("stream has bad header: %s", path);
|
||||||
stream->path);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,6 +113,75 @@ stream_load(struct stream *stream, const char *tracedir, const char *relpath)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static JSON_Object *
|
||||||
|
load_json(const char *path)
|
||||||
|
{
|
||||||
|
JSON_Value *vmeta = json_parse_file_with_comments(path);
|
||||||
|
if (vmeta == NULL) {
|
||||||
|
err("json_parse_file_with_comments() failed");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
JSON_Object *meta = json_value_get_object(vmeta);
|
||||||
|
if (meta == NULL) {
|
||||||
|
err("json_value_get_object() failed");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return meta;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Loads a stream from disk.
|
||||||
|
*
|
||||||
|
* The relpath must be pointing to a directory with the stream.json and
|
||||||
|
* stream.obs files.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
stream_load(struct stream *stream, const char *tracedir, const char *relpath)
|
||||||
|
{
|
||||||
|
memset(stream, 0, sizeof(struct stream));
|
||||||
|
|
||||||
|
if (snprintf(stream->path, PATH_MAX, "%s/%s", tracedir, relpath) >= PATH_MAX) {
|
||||||
|
err("path too long: %s/%s", tracedir, relpath);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Allow loading a trace with empty relpath */
|
||||||
|
path_remove_trailing(stream->path);
|
||||||
|
|
||||||
|
if (snprintf(stream->relpath, PATH_MAX, "%s", relpath) >= PATH_MAX) {
|
||||||
|
err("path too long: %s", relpath);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dbg("loading %s", stream->relpath);
|
||||||
|
|
||||||
|
char path_json[PATH_MAX];
|
||||||
|
char path_obs[PATH_MAX];
|
||||||
|
|
||||||
|
if (path_append(path_json, stream->path, "stream.json") != 0) {
|
||||||
|
err("path_append failed");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((stream->meta = load_json(path_json)) == NULL) {
|
||||||
|
err("load_json failed for: %s", path_json);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (path_append(path_obs, stream->path, "stream.obs") != 0) {
|
||||||
|
err("path_append failed");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (load_obs(stream, path_obs) != 0) {
|
||||||
|
err("load_obs failed");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
stream_data_set(struct stream *stream, void *data)
|
stream_data_set(struct stream *stream, void *data)
|
||||||
{
|
{
|
||||||
@ -144,6 +194,12 @@ stream_data_get(struct stream *stream)
|
|||||||
return stream->data;
|
return stream->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
JSON_Object *
|
||||||
|
stream_metadata(struct stream *stream)
|
||||||
|
{
|
||||||
|
return stream->meta;
|
||||||
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
stream_clkoff_set(struct stream *stream, int64_t clkoff)
|
stream_clkoff_set(struct stream *stream, int64_t clkoff)
|
||||||
{
|
{
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
|
/* Copyright (c) 2021-2024 Barcelona Supercomputing Center (BSC)
|
||||||
* SPDX-License-Identifier: GPL-3.0-or-later */
|
* SPDX-License-Identifier: GPL-3.0-or-later */
|
||||||
|
|
||||||
#ifndef STREAM_H
|
#ifndef STREAM_H
|
||||||
@ -8,6 +8,7 @@
|
|||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include "heap.h"
|
#include "heap.h"
|
||||||
|
#include "parson.h"
|
||||||
struct ovni_ev;
|
struct ovni_ev;
|
||||||
|
|
||||||
struct stream {
|
struct stream {
|
||||||
@ -34,6 +35,8 @@ struct stream {
|
|||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
|
||||||
double progress;
|
double progress;
|
||||||
|
|
||||||
|
JSON_Object *meta;
|
||||||
};
|
};
|
||||||
|
|
||||||
USE_RET int stream_load(struct stream *stream, const char *tracedir, const char *relpath);
|
USE_RET int stream_load(struct stream *stream, const char *tracedir, const char *relpath);
|
||||||
@ -46,5 +49,6 @@ USE_RET int64_t stream_lastclock(struct stream *stream);
|
|||||||
void stream_allow_unsorted(struct stream *stream);
|
void stream_allow_unsorted(struct stream *stream);
|
||||||
void stream_data_set(struct stream *stream, void *data);
|
void stream_data_set(struct stream *stream, void *data);
|
||||||
USE_RET void *stream_data_get(struct stream *stream);
|
USE_RET void *stream_data_get(struct stream *stream);
|
||||||
|
USE_RET JSON_Object *stream_metadata(struct stream *stream);
|
||||||
|
|
||||||
#endif /* STREAM_H */
|
#endif /* STREAM_H */
|
||||||
|
@ -27,7 +27,7 @@ add_stream(struct trace *trace, struct stream *stream)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
load_stream(struct trace *trace, const char *path)
|
load_stream(struct trace *trace, const char *json_path)
|
||||||
{
|
{
|
||||||
struct stream *stream = calloc(1, sizeof(struct stream));
|
struct stream *stream = calloc(1, sizeof(struct stream));
|
||||||
|
|
||||||
@ -36,6 +36,14 @@ load_stream(struct trace *trace, const char *path)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* The json_path must end in .../stream.json, so remove it */
|
||||||
|
char path[PATH_MAX];
|
||||||
|
if (path_copy(path, json_path) != 0) {
|
||||||
|
err("path_copy failed");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
path_dirname(path);
|
||||||
|
|
||||||
int offset = (int) strlen(trace->tracedir);
|
int offset = (int) strlen(trace->tracedir);
|
||||||
const char *relpath = path + offset;
|
const char *relpath = path + offset;
|
||||||
|
|
||||||
@ -53,46 +61,16 @@ load_stream(struct trace *trace, const char *path)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
has_suffix(const char *str, const char *suffix)
|
is_stream(const char *fpath)
|
||||||
{
|
{
|
||||||
if (!str || !suffix)
|
const char *filename = path_filename(fpath);
|
||||||
return 0;
|
|
||||||
|
|
||||||
int lenstr = (int) strlen(str);
|
if (strcmp(filename, "stream.json") == 0)
|
||||||
int lensuffix = (int) strlen(suffix);
|
|
||||||
|
|
||||||
if (lensuffix > lenstr)
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
const char *p = str + lenstr - lensuffix;
|
|
||||||
if (strncmp(p, suffix, (size_t) lensuffix) == 0)
|
|
||||||
return 1;
|
return 1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
|
||||||
is_stream(const char *fpath)
|
|
||||||
{
|
|
||||||
if (has_suffix(fpath, OVNI_STREAM_EXT))
|
|
||||||
return 1;
|
|
||||||
|
|
||||||
/* For compatibility load the old streams too */
|
|
||||||
const char *filename = path_filename(fpath);
|
|
||||||
|
|
||||||
const char prefix[] = "thread.";
|
|
||||||
if (!path_has_prefix(filename, prefix))
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
const char *tid = filename + strlen(prefix);
|
|
||||||
for (int i = 0; tid[i]; i++) {
|
|
||||||
if (tid[i] < '0' || tid[i] > '9')
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
static int
|
||||||
cb_nftw(const char *fpath, const struct stat *sb,
|
cb_nftw(const char *fpath, const struct stat *sb,
|
||||||
int typeflag, struct FTW *ftwbuf)
|
int typeflag, struct FTW *ftwbuf)
|
||||||
|
@ -104,17 +104,34 @@ void ovni_version_check_str(const char *version)
|
|||||||
/* Ignore the patch number */
|
/* Ignore the patch number */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
create_thread_dir(void)
|
||||||
|
{
|
||||||
|
char path[PATH_MAX];
|
||||||
|
|
||||||
|
int written = snprintf(path, PATH_MAX, "%s/thread.%d",
|
||||||
|
rproc.procdir, rthread.tid);
|
||||||
|
|
||||||
|
if (written >= PATH_MAX)
|
||||||
|
die("path too long: %s/thread.%d", rproc.procdir, rthread.tid);
|
||||||
|
|
||||||
|
/* The procdir must have been created earlier */
|
||||||
|
if (mkdir(path, 0755) != 0)
|
||||||
|
die("mkdir(%s) failed:", path);
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
create_trace_stream(void)
|
create_trace_stream(void)
|
||||||
{
|
{
|
||||||
char path[PATH_MAX];
|
char path[PATH_MAX];
|
||||||
|
|
||||||
int written = snprintf(path, PATH_MAX, "%s/thread.%d%s",
|
int written = snprintf(path, PATH_MAX, "%s/thread.%d/stream.obs",
|
||||||
rproc.procdir, rthread.tid, OVNI_STREAM_EXT);
|
rproc.procdir, rthread.tid);
|
||||||
|
|
||||||
if (written >= PATH_MAX)
|
if (written >= PATH_MAX) {
|
||||||
die("thread trace path too long: %s/thread.%d%s",
|
die("path too long: %s/thread.%d/stream.obs",
|
||||||
rproc.procdir, rthread.tid, OVNI_STREAM_EXT);
|
rproc.procdir, rthread.tid);
|
||||||
|
}
|
||||||
|
|
||||||
rthread.streamfd = open(path, O_WRONLY | O_CREAT, 0644);
|
rthread.streamfd = open(path, O_WRONLY | O_CREAT, 0644);
|
||||||
|
|
||||||
@ -465,11 +482,11 @@ static void
|
|||||||
thread_metadata_store(void)
|
thread_metadata_store(void)
|
||||||
{
|
{
|
||||||
char path[PATH_MAX];
|
char path[PATH_MAX];
|
||||||
int written = snprintf(path, PATH_MAX, "%s/thread.%d.json",
|
int written = snprintf(path, PATH_MAX, "%s/thread.%d/stream.json",
|
||||||
rproc.procdir, rthread.tid);
|
rproc.procdir, rthread.tid);
|
||||||
|
|
||||||
if (written >= PATH_MAX)
|
if (written >= PATH_MAX)
|
||||||
die("thread trace path too long: %s/thread.%d.json",
|
die("thread trace path too long: %s/thread.%d/stream.json",
|
||||||
rproc.procdir, rthread.tid);
|
rproc.procdir, rthread.tid);
|
||||||
|
|
||||||
if (json_serialize_to_file_pretty(rthread.meta, path) != JSONSuccess)
|
if (json_serialize_to_file_pretty(rthread.meta, path) != JSONSuccess)
|
||||||
@ -578,6 +595,7 @@ ovni_thread_init(pid_t tid)
|
|||||||
if (rthread.evbuf == NULL)
|
if (rthread.evbuf == NULL)
|
||||||
die("malloc failed:");
|
die("malloc failed:");
|
||||||
|
|
||||||
|
create_thread_dir();
|
||||||
create_trace_stream();
|
create_trace_stream();
|
||||||
write_stream_header();
|
write_stream_header();
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user