Store stream metadata in stream.json
Place all stream files in its own stream directory.
This commit is contained in:
		
							parent
							
								
									85859a488d
								
							
						
					
					
						commit
						9a8dc382a2
					
				
							
								
								
									
										106
									
								
								src/emu/stream.c
									
									
									
									
									
								
							
							
						
						
									
										106
									
								
								src/emu/stream.c
									
									
									
									
									
								
							| @ -71,41 +71,22 @@ load_stream_fd(struct stream *stream, int fd) | |||||||
| 	return 0; | 	return 0; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| int | static int | ||||||
| stream_load(struct stream *stream, const char *tracedir, const char *relpath) | load_obs(struct stream *stream, const char *path) | ||||||
| { | { | ||||||
| 	memset(stream, 0, sizeof(struct stream)); |  | ||||||
| 
 |  | ||||||
| 	if (snprintf(stream->path, PATH_MAX, "%s/%s", tracedir, relpath) >= PATH_MAX) { |  | ||||||
| 		err("path too long: %s/%s", tracedir, relpath); |  | ||||||
| 		return -1; |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	/* Allow loading a trace with empty relpath */ |  | ||||||
| 	path_remove_trailing(stream->path); |  | ||||||
| 
 |  | ||||||
| 	if (snprintf(stream->relpath, PATH_MAX, "%s", relpath) >= PATH_MAX) { |  | ||||||
| 		err("path too long: %s", relpath); |  | ||||||
| 		return -1; |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	dbg("loading %s", stream->relpath); |  | ||||||
| 
 |  | ||||||
| 	int fd; | 	int fd; | ||||||
| 	if ((fd = open(stream->path, O_RDWR)) == -1) { | 	if ((fd = open(path, O_RDWR)) == -1) { | ||||||
| 		err("open %s failed:", stream->path); | 		err("open %s failed:", path); | ||||||
| 		return -1; | 		return -1; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if (load_stream_fd(stream, fd) != 0) { | 	if (load_stream_fd(stream, fd) != 0) { | ||||||
| 		err("load_stream_fd failed for stream '%s'", | 		err("load_stream_fd failed for: %s", path); | ||||||
| 				stream->path); |  | ||||||
| 		return -1; | 		return -1; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	if (check_stream_header(stream) != 0) { | 	if (check_stream_header(stream) != 0) { | ||||||
| 		err("stream '%s' has bad header", | 		err("stream has bad header: %s", path); | ||||||
| 				stream->path); |  | ||||||
| 		return -1; | 		return -1; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -132,6 +113,75 @@ stream_load(struct stream *stream, const char *tracedir, const char *relpath) | |||||||
| 	return 0; | 	return 0; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | static JSON_Object * | ||||||
|  | load_json(const char *path) | ||||||
|  | { | ||||||
|  | 	JSON_Value *vmeta = json_parse_file_with_comments(path); | ||||||
|  | 	if (vmeta == NULL) { | ||||||
|  | 		err("json_parse_file_with_comments() failed"); | ||||||
|  | 		return NULL; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	JSON_Object *meta = json_value_get_object(vmeta); | ||||||
|  | 	if (meta == NULL) { | ||||||
|  | 		err("json_value_get_object() failed"); | ||||||
|  | 		return NULL; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return meta; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /** Loads a stream from disk.
 | ||||||
|  |  * | ||||||
|  |  * The relpath must be pointing to a directory with the stream.json and | ||||||
|  |  * stream.obs files. | ||||||
|  |  */ | ||||||
|  | int | ||||||
|  | stream_load(struct stream *stream, const char *tracedir, const char *relpath) | ||||||
|  | { | ||||||
|  | 	memset(stream, 0, sizeof(struct stream)); | ||||||
|  | 
 | ||||||
|  | 	if (snprintf(stream->path, PATH_MAX, "%s/%s", tracedir, relpath) >= PATH_MAX) { | ||||||
|  | 		err("path too long: %s/%s", tracedir, relpath); | ||||||
|  | 		return -1; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	/* Allow loading a trace with empty relpath */ | ||||||
|  | 	path_remove_trailing(stream->path); | ||||||
|  | 
 | ||||||
|  | 	if (snprintf(stream->relpath, PATH_MAX, "%s", relpath) >= PATH_MAX) { | ||||||
|  | 		err("path too long: %s", relpath); | ||||||
|  | 		return -1; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	dbg("loading %s", stream->relpath); | ||||||
|  | 
 | ||||||
|  | 	char path_json[PATH_MAX]; | ||||||
|  | 	char path_obs[PATH_MAX]; | ||||||
|  | 
 | ||||||
|  | 	if (path_append(path_json, stream->path, "stream.json") != 0) { | ||||||
|  | 		err("path_append failed"); | ||||||
|  | 		return -1; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if ((stream->meta = load_json(path_json)) == NULL) { | ||||||
|  | 		err("load_json failed for: %s", path_json); | ||||||
|  | 		return -1; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if (path_append(path_obs, stream->path, "stream.obs") != 0) { | ||||||
|  | 		err("path_append failed"); | ||||||
|  | 		return -1; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	if (load_obs(stream, path_obs) != 0) { | ||||||
|  | 		err("load_obs failed"); | ||||||
|  | 		return -1; | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return 0; | ||||||
|  | } | ||||||
|  | 
 | ||||||
| void | void | ||||||
| stream_data_set(struct stream *stream, void *data) | stream_data_set(struct stream *stream, void *data) | ||||||
| { | { | ||||||
| @ -144,6 +194,12 @@ stream_data_get(struct stream *stream) | |||||||
| 	return stream->data; | 	return stream->data; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | JSON_Object * | ||||||
|  | stream_metadata(struct stream *stream) | ||||||
|  | { | ||||||
|  | 	return stream->meta; | ||||||
|  | } | ||||||
|  | 
 | ||||||
| int | int | ||||||
| stream_clkoff_set(struct stream *stream, int64_t clkoff) | stream_clkoff_set(struct stream *stream, int64_t clkoff) | ||||||
| { | { | ||||||
|  | |||||||
| @ -1,4 +1,4 @@ | |||||||
| /* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
 | /* Copyright (c) 2021-2024 Barcelona Supercomputing Center (BSC)
 | ||||||
|  * SPDX-License-Identifier: GPL-3.0-or-later */ |  * SPDX-License-Identifier: GPL-3.0-or-later */ | ||||||
| 
 | 
 | ||||||
| #ifndef STREAM_H | #ifndef STREAM_H | ||||||
| @ -8,6 +8,7 @@ | |||||||
| #include <stdint.h> | #include <stdint.h> | ||||||
| #include "common.h" | #include "common.h" | ||||||
| #include "heap.h" | #include "heap.h" | ||||||
|  | #include "parson.h" | ||||||
| struct ovni_ev; | struct ovni_ev; | ||||||
| 
 | 
 | ||||||
| struct stream { | struct stream { | ||||||
| @ -34,6 +35,8 @@ struct stream { | |||||||
| 	int64_t offset; | 	int64_t offset; | ||||||
| 
 | 
 | ||||||
| 	double progress; | 	double progress; | ||||||
|  | 
 | ||||||
|  | 	JSON_Object *meta; | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| USE_RET int stream_load(struct stream *stream, const char *tracedir, const char *relpath); | USE_RET int stream_load(struct stream *stream, const char *tracedir, const char *relpath); | ||||||
| @ -46,5 +49,6 @@ USE_RET int64_t stream_lastclock(struct stream *stream); | |||||||
|         void stream_allow_unsorted(struct stream *stream); |         void stream_allow_unsorted(struct stream *stream); | ||||||
|         void stream_data_set(struct stream *stream, void *data); |         void stream_data_set(struct stream *stream, void *data); | ||||||
| USE_RET void *stream_data_get(struct stream *stream); | USE_RET void *stream_data_get(struct stream *stream); | ||||||
|  | USE_RET JSON_Object *stream_metadata(struct stream *stream); | ||||||
| 
 | 
 | ||||||
| #endif /* STREAM_H */ | #endif /* STREAM_H */ | ||||||
|  | |||||||
| @ -27,7 +27,7 @@ add_stream(struct trace *trace, struct stream *stream) | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| static int | static int | ||||||
| load_stream(struct trace *trace, const char *path) | load_stream(struct trace *trace, const char *json_path) | ||||||
| { | { | ||||||
| 	struct stream *stream = calloc(1, sizeof(struct stream)); | 	struct stream *stream = calloc(1, sizeof(struct stream)); | ||||||
| 
 | 
 | ||||||
| @ -36,6 +36,14 @@ load_stream(struct trace *trace, const char *path) | |||||||
| 		return -1; | 		return -1; | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	/* The json_path must end in .../stream.json, so remove it */ | ||||||
|  | 	char path[PATH_MAX]; | ||||||
|  | 	if (path_copy(path, json_path) != 0) { | ||||||
|  | 		err("path_copy failed"); | ||||||
|  | 		return -1; | ||||||
|  | 	} | ||||||
|  | 	path_dirname(path); | ||||||
|  | 
 | ||||||
| 	int offset = (int) strlen(trace->tracedir); | 	int offset = (int) strlen(trace->tracedir); | ||||||
| 	const char *relpath = path + offset; | 	const char *relpath = path + offset; | ||||||
| 
 | 
 | ||||||
| @ -53,46 +61,16 @@ load_stream(struct trace *trace, const char *path) | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| static int | static int | ||||||
| has_suffix(const char *str, const char *suffix) | is_stream(const char *fpath) | ||||||
| { | { | ||||||
| 	if (!str || !suffix) | 	const char *filename = path_filename(fpath); | ||||||
| 		return 0; |  | ||||||
| 
 | 
 | ||||||
| 	int lenstr = (int) strlen(str); | 	if (strcmp(filename, "stream.json") == 0) | ||||||
| 	int lensuffix = (int) strlen(suffix); |  | ||||||
| 
 |  | ||||||
| 	if (lensuffix > lenstr) |  | ||||||
| 		return 0; |  | ||||||
| 
 |  | ||||||
| 	const char *p = str + lenstr - lensuffix; |  | ||||||
| 	if (strncmp(p, suffix, (size_t) lensuffix) == 0) |  | ||||||
| 		return 1; | 		return 1; | ||||||
| 
 | 
 | ||||||
| 	return 0; | 	return 0; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| static int |  | ||||||
| is_stream(const char *fpath) |  | ||||||
| { |  | ||||||
| 	if (has_suffix(fpath, OVNI_STREAM_EXT)) |  | ||||||
| 		return 1; |  | ||||||
| 
 |  | ||||||
| 	/* For compatibility load the old streams too */ |  | ||||||
| 	const char *filename = path_filename(fpath); |  | ||||||
| 
 |  | ||||||
| 	const char prefix[] = "thread."; |  | ||||||
| 	if (!path_has_prefix(filename, prefix)) |  | ||||||
| 		return 0; |  | ||||||
| 
 |  | ||||||
| 	const char *tid = filename + strlen(prefix); |  | ||||||
| 	for (int i = 0; tid[i]; i++) { |  | ||||||
| 		if (tid[i] < '0' || tid[i] > '9') |  | ||||||
| 			return 0; |  | ||||||
| 	} |  | ||||||
| 
 |  | ||||||
| 	return 1; |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| static int | static int | ||||||
| cb_nftw(const char *fpath, const struct stat *sb, | cb_nftw(const char *fpath, const struct stat *sb, | ||||||
| 		int typeflag, struct FTW *ftwbuf) | 		int typeflag, struct FTW *ftwbuf) | ||||||
|  | |||||||
| @ -104,17 +104,34 @@ void ovni_version_check_str(const char *version) | |||||||
| 	/* Ignore the patch number */ | 	/* Ignore the patch number */ | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | static void | ||||||
|  | create_thread_dir(void) | ||||||
|  | { | ||||||
|  | 	char path[PATH_MAX]; | ||||||
|  | 
 | ||||||
|  | 	int written = snprintf(path, PATH_MAX, "%s/thread.%d", | ||||||
|  | 			rproc.procdir, rthread.tid); | ||||||
|  | 
 | ||||||
|  | 	if (written >= PATH_MAX) | ||||||
|  | 		die("path too long: %s/thread.%d", rproc.procdir, rthread.tid); | ||||||
|  | 
 | ||||||
|  | 	/* The procdir must have been created earlier */ | ||||||
|  | 	if (mkdir(path, 0755) != 0) | ||||||
|  | 		die("mkdir(%s) failed:", path); | ||||||
|  | } | ||||||
|  | 
 | ||||||
| static void | static void | ||||||
| create_trace_stream(void) | create_trace_stream(void) | ||||||
| { | { | ||||||
| 	char path[PATH_MAX]; | 	char path[PATH_MAX]; | ||||||
| 
 | 
 | ||||||
| 	int written = snprintf(path, PATH_MAX, "%s/thread.%d%s", | 	int written = snprintf(path, PATH_MAX, "%s/thread.%d/stream.obs", | ||||||
| 			rproc.procdir, rthread.tid, OVNI_STREAM_EXT); | 			rproc.procdir, rthread.tid); | ||||||
| 
 | 
 | ||||||
| 	if (written >= PATH_MAX) | 	if (written >= PATH_MAX) { | ||||||
| 		die("thread trace path too long: %s/thread.%d%s", | 		die("path too long: %s/thread.%d/stream.obs", | ||||||
| 				rproc.procdir, rthread.tid, OVNI_STREAM_EXT); | 				rproc.procdir, rthread.tid); | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
| 	rthread.streamfd = open(path, O_WRONLY | O_CREAT, 0644); | 	rthread.streamfd = open(path, O_WRONLY | O_CREAT, 0644); | ||||||
| 
 | 
 | ||||||
| @ -465,11 +482,11 @@ static void | |||||||
| thread_metadata_store(void) | thread_metadata_store(void) | ||||||
| { | { | ||||||
| 	char path[PATH_MAX]; | 	char path[PATH_MAX]; | ||||||
| 	int written = snprintf(path, PATH_MAX, "%s/thread.%d.json", | 	int written = snprintf(path, PATH_MAX, "%s/thread.%d/stream.json", | ||||||
| 			rproc.procdir, rthread.tid); | 			rproc.procdir, rthread.tid); | ||||||
| 
 | 
 | ||||||
| 	if (written >= PATH_MAX) | 	if (written >= PATH_MAX) | ||||||
| 		die("thread trace path too long: %s/thread.%d.json", | 		die("thread trace path too long: %s/thread.%d/stream.json", | ||||||
| 				rproc.procdir, rthread.tid); | 				rproc.procdir, rthread.tid); | ||||||
| 
 | 
 | ||||||
| 	if (json_serialize_to_file_pretty(rthread.meta, path) != JSONSuccess) | 	if (json_serialize_to_file_pretty(rthread.meta, path) != JSONSuccess) | ||||||
| @ -578,6 +595,7 @@ ovni_thread_init(pid_t tid) | |||||||
| 	if (rthread.evbuf == NULL) | 	if (rthread.evbuf == NULL) | ||||||
| 		die("malloc failed:"); | 		die("malloc failed:"); | ||||||
| 
 | 
 | ||||||
|  | 	create_thread_dir(); | ||||||
| 	create_trace_stream(); | 	create_trace_stream(); | ||||||
| 	write_stream_header(); | 	write_stream_header(); | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user