Port ovnisort

This commit is contained in:
Rodrigo Arias 2023-02-15 19:32:34 +01:00 committed by Rodrigo Arias Mallo
parent 61b0efee34
commit 451d4be386
7 changed files with 74 additions and 55 deletions

View File

@ -58,10 +58,10 @@ target_link_libraries(ovniemu emu parson-static ovni-static)
add_executable(ovnidump ovnidump.c) add_executable(ovnidump ovnidump.c)
target_link_libraries(ovnidump emu parson-static ovni-static) target_link_libraries(ovnidump emu parson-static ovni-static)
#
#add_executable(ovnisort ovnisort.c) add_executable(ovnisort ovnisort.c)
#target_link_libraries(ovnisort emu trace) target_link_libraries(ovnisort emu parson-static ovni-static)
#
## Use <PackageName>_ROOT variables if available, commonly used by MPI ## Use <PackageName>_ROOT variables if available, commonly used by MPI
## installations ## installations
#if(POLICY CMP0074) #if(POLICY CMP0074)

View File

@ -423,6 +423,9 @@ process_ev(struct emu *emu)
return pre_cpu(emu); return pre_cpu(emu);
case 'F': case 'F':
return pre_flush(emu); return pre_flush(emu);
case 'U':
/* Ignore sorting events */
return 0;
default: default:
err("unknown ovni event category %c\n", err("unknown ovni event category %c\n",
emu->ev->c); emu->ev->c);

View File

@ -238,13 +238,13 @@ execute_sort_plan(struct sortplan *sp)
/* Sort the events in the stream chronologically using a ring */ /* Sort the events in the stream chronologically using a ring */
static int static int
stream_winsort(struct ovni_stream *stream, struct ring *r) stream_winsort(struct stream *stream, struct ring *r)
{ {
char *fn = stream->thread->tracefile; char *fn = stream->path;
int fd = open(fn, O_WRONLY); int fd = open(fn, O_WRONLY);
if (fd < 0) if (fd < 0)
die("open %s failed: %s\n", fn, strerror(errno)); die("open %s failed:", fn);
ring_reset(r); ring_reset(r);
@ -256,9 +256,10 @@ stream_winsort(struct ovni_stream *stream, struct ring *r)
size_t empty_regions = 0; size_t empty_regions = 0;
size_t updated = 0; size_t updated = 0;
char st = 'S'; char st = 'S';
int ret = 0;
while (ovni_load_next_event(stream) == 0) { while ((ret = stream_step(stream)) == 0) {
struct ovni_ev *ev = stream->cur_ev; struct ovni_ev *ev = stream_ev(stream);
if (st == 'S' && starts_unsorted_region(ev)) { if (st == 'S' && starts_unsorted_region(ev)) {
st = 'U'; st = 'U';
@ -276,10 +277,11 @@ stream_winsort(struct ovni_stream *stream, struct ring *r)
if (ends_unsorted_region(ev)) { if (ends_unsorted_region(ev)) {
updated = 1; updated = 1;
sp.next = ev; sp.next = ev;
dbg("executing sort plan for stream tid=%d\n", stream->tid); dbg("executing sort plan for stream %s\n",
stream->relpath);
if (execute_sort_plan(&sp) < 0) { if (execute_sort_plan(&sp) < 0) {
err("sort failed for stream tid=%d\n", err("sort failed for stream %s\n",
stream->tid); stream->relpath);
return -1; return -1;
} }
@ -294,9 +296,14 @@ stream_winsort(struct ovni_stream *stream, struct ring *r)
ring_add(r, ev); ring_add(r, ev);
} }
if (ret < 0) {
err("stream_step failed");
return -1;
}
if (empty_regions > 0) if (empty_regions > 0)
err("warning: stream %d contains %ld empty sort regions\n", err("warning: stream %s contains %ld empty sort regions\n",
stream->tid, empty_regions); stream->relpath, empty_regions);
if (updated && fdatasync(fd) < 0) if (updated && fdatasync(fd) < 0)
die("fdatasync %s failed: %s\n", fn, strerror(errno)); die("fdatasync %s failed: %s\n", fn, strerror(errno));
@ -309,33 +316,44 @@ stream_winsort(struct ovni_stream *stream, struct ring *r)
/* Ensures that each individual stream is sorted */ /* Ensures that each individual stream is sorted */
static int static int
stream_check(struct ovni_stream *stream) stream_check(struct stream *stream)
{ {
if (ovni_load_next_event(stream) != 0) int ret = stream_step(stream);
if (ret < 0) {
err("stream_step failed");
return -1;
}
/* Reached the end */
if (ret != 0)
return 0; return 0;
struct ovni_ev *ev = stream->cur_ev; struct ovni_ev *ev = stream_ev(stream);
uint64_t last_clock = ev->header.clock; uint64_t last_clock = ev->header.clock;
int ret = 0;
while (ovni_load_next_event(stream) == 0) { while ((ret = stream_step(stream)) == 0) {
ev = stream->cur_ev; ev = stream_ev(stream);
uint64_t cur_clock = ovni_ev_get_clock(ev); uint64_t cur_clock = ovni_ev_get_clock(ev);
if (cur_clock < last_clock) { if (cur_clock < last_clock) {
err("backwards jump in time %ld -> %ld for stream tid=%d\n", err("backwards jump in time %ld -> %ld for stream %s\n",
last_clock, cur_clock, stream->tid); last_clock, cur_clock, stream->relpath);
ret = -1; ret = -1;
} }
last_clock = cur_clock; last_clock = cur_clock;
} }
if (ret < 0) {
err("stream_step failed");
return -1;
}
return ret; return ret;
} }
static int static int
process_trace(struct ovni_trace *trace) process_trace(struct trace *trace)
{ {
struct ring ring; struct ring ring;
int ret = 0; int ret = 0;
@ -346,19 +364,20 @@ process_trace(struct ovni_trace *trace)
if (ring.ev == NULL) if (ring.ev == NULL)
die("malloc failed: %s\n", strerror(errno)); die("malloc failed: %s\n", strerror(errno));
for (size_t i = 0; i < trace->nstreams; i++) { for (struct stream *stream = trace->streams; stream; stream = stream->next) {
struct ovni_stream *stream = &trace->stream[i]; stream_allow_unsorted(stream);
if (operation_mode == SORT) { if (operation_mode == SORT) {
dbg("sorting stream tid=%d\n", stream->tid); dbg("sorting stream %s\n", stream->relpath);
if (stream_winsort(stream, &ring) != 0) { if (stream_winsort(stream, &ring) != 0) {
err("sort stream tid=%d failed\n", stream->tid); err("sort stream %s failed\n", stream->relpath);
/* When sorting, return at the first /* When sorting, return at the first
* attempt */ * attempt */
return -1; return -1;
} }
} else { } else {
if (stream_check(stream) != 0) { if (stream_check(stream) != 0) {
err("stream tid=%d is not sorted\n", stream->tid); err("stream %s is not sorted\n", stream->relpath);
/* When checking, report all errors and /* When checking, report all errors and
* then fail */ * then fail */
ret = -1; ret = -1;
@ -427,31 +446,29 @@ parse_args(int argc, char *argv[])
int int
main(int argc, char *argv[]) main(int argc, char *argv[])
{ {
int ret = 0; progname_set("ovnisort");
parse_args(argc, argv); parse_args(argc, argv);
struct ovni_trace *trace = calloc(1, sizeof(struct ovni_trace)); struct trace *trace = calloc(1, sizeof(struct trace));
if (trace == NULL) { if (trace == NULL) {
perror("calloc"); err("calloc failed:");
exit(EXIT_FAILURE); return 1;
} }
if (ovni_load_trace(trace, tracedir)) if (trace_load(trace, tracedir) != 0) {
err("failed to load trace: %s", tracedir);
return 1; return 1;
}
if (ovni_load_streams(trace)) if (process_trace(trace) != 0) {
err("failed to process trace: %s", tracedir);
return 1; return 1;
}
if (process_trace(trace))
ret = 1;
ovni_free_streams(trace);
ovni_free_trace(trace);
free(trace); free(trace);
return ret; return 0;
} }

View File

@ -7,10 +7,10 @@ if(CMAKE_BUILD_TYPE STREQUAL "Release")
endif() endif()
ovni_test(flush.c) ovni_test(flush.c)
#ovni_test(sort.c SORT) ovni_test(sort.c SORT)
#ovni_test(empty-sort.c SORT) ovni_test(empty-sort.c SORT)
#ovni_test(sort-first-and-full-ring.c SORT ovni_test(sort-first-and-full-ring.c SORT
# SHOULD_FAIL REGEX "cannot find a event previous to clock") SHOULD_FAIL REGEX "cannot find a event previous to clock")
ovni_test(burst-stats.c REGEX "burst stats: median 33 ns, avg 33\.0 ns, max 33 ns") ovni_test(burst-stats.c REGEX "burst stats: median 33 ns, avg 33\.0 ns, max 33 ns")
ovni_test(mp-simple.c MP) ovni_test(mp-simple.c MP)
ovni_test(version-good.c) ovni_test(version-good.c)

View File

@ -21,7 +21,7 @@ static void
emit_jumbo(uint8_t *buf, size_t size, int64_t clock) emit_jumbo(uint8_t *buf, size_t size, int64_t clock)
{ {
struct ovni_ev ev = {0}; struct ovni_ev ev = {0};
ovni_ev_set_mcv(&ev, "O$$"); ovni_ev_set_mcv(&ev, "OUj");
ovni_ev_set_clock(&ev, clock); ovni_ev_set_clock(&ev, clock);
ovni_ev_jumbo_emit(&ev, buf, size); ovni_ev_jumbo_emit(&ev, buf, size);
} }

View File

@ -36,9 +36,8 @@ function(nodes_rt_test)
ENVIRONMENT "NOSV_CONFIG=${OVNI_TEST_SOURCE_DIR}/rt/nodes/nosv.toml") ENVIRONMENT "NOSV_CONFIG=${OVNI_TEST_SOURCE_DIR}/rt/nodes/nosv.toml")
endfunction() endfunction()
# FIXME: Add SORT for all tests once ovnisort is ported nodes_rt_test(../nanos6/simple-task.c NAME simple-task SORT)
nodes_rt_test(../nanos6/simple-task.c NAME simple-task) nodes_rt_test(../nanos6/nested-task.c NAME nested-task SORT)
nodes_rt_test(../nanos6/nested-task.c NAME nested-task) nodes_rt_test(../nanos6/several-tasks.c NAME several-tasks SORT)
nodes_rt_test(../nanos6/several-tasks.c NAME several-tasks) nodes_rt_test(../nanos6/if0.c NAME if0 SORT)
nodes_rt_test(../nanos6/if0.c NAME if0) nodes_rt_test(../nanos6/sched-add.c NAME sched-add SORT)
nodes_rt_test(../nanos6/sched-add.c NAME sched-add)

View File

@ -23,6 +23,6 @@ function(nosv_test)
ENVIRONMENT "NOSV_CONFIG=${OVNI_TEST_SOURCE_DIR}/rt/nosv/nosv.toml") ENVIRONMENT "NOSV_CONFIG=${OVNI_TEST_SOURCE_DIR}/rt/nosv/nosv.toml")
endfunction() endfunction()
nosv_test(attach.c) nosv_test(attach.c SORT)
nosv_test(waitfor.c) nosv_test(waitfor.c SORT)
nosv_test(several-tasks.c) nosv_test(several-tasks.c SORT)