From 02db4b6369f756c32c759fbdcfe3909c3c54563a Mon Sep 17 00:00:00 2001 From: Rodrigo Arias Date: Mon, 13 Feb 2023 20:26:03 +0100 Subject: [PATCH] Add support for NODES --- src/emu/CMakeLists.txt | 6 + src/emu/models.c | 2 + src/emu/nodes/connect.c | 87 +++++++++++++ src/emu/nodes/create.c | 126 +++++++++++++++++++ src/emu/nodes/event.c | 110 +++++++++++++++++ src/emu/nodes/finish.c | 44 +++++++ src/emu/nodes/nodes_priv.h | 56 +++++++++ src/emu/nodes/probe.c | 20 +++ src/emu/nodes/pvt.c | 228 +++++++++++++++++++++++++++++++++++ test/rt/CMakeLists.txt | 2 +- test/rt/nodes/CMakeLists.txt | 11 +- 11 files changed, 686 insertions(+), 6 deletions(-) create mode 100644 src/emu/nodes/connect.c create mode 100644 src/emu/nodes/create.c create mode 100644 src/emu/nodes/event.c create mode 100644 src/emu/nodes/finish.c create mode 100644 src/emu/nodes/nodes_priv.h create mode 100644 src/emu/nodes/probe.c create mode 100644 src/emu/nodes/pvt.c diff --git a/src/emu/CMakeLists.txt b/src/emu/CMakeLists.txt index fb3906e..0efd337 100644 --- a/src/emu/CMakeLists.txt +++ b/src/emu/CMakeLists.txt @@ -52,6 +52,12 @@ add_library(emu STATIC nosv/event.c nosv/pvt.c nosv/finish.c + nodes/probe.c + nodes/connect.c + nodes/create.c + nodes/event.c + nodes/pvt.c + nodes/finish.c ) add_executable(ovniemu ovniemu.c) diff --git a/src/emu/models.c b/src/emu/models.c index 026be46..c778be9 100644 --- a/src/emu/models.c +++ b/src/emu/models.c @@ -8,11 +8,13 @@ extern struct model_spec model_ovni; extern struct model_spec model_nanos6; extern struct model_spec model_nosv; +extern struct model_spec model_nodes; static struct model_spec *models[] = { &model_ovni, &model_nanos6, &model_nosv, + &model_nodes, NULL }; diff --git a/src/emu/nodes/connect.c b/src/emu/nodes/connect.c new file mode 100644 index 0000000..8d2cf6d --- /dev/null +++ b/src/emu/nodes/connect.c @@ -0,0 +1,87 @@ +#include "nodes_priv.h" + +static const int th_track[CH_MAX] = { + [CH_SUBSYSTEM] = TRACK_TH_RUN, /* FIXME: Why not active? */ +}; + +static const int cpu_track[CH_MAX] = { + [CH_SUBSYSTEM] = TRACK_TH_RUN, +}; + +int +nodes_get_track(int c, int type) +{ + if (type == CT_TH) + return th_track[c]; + else + return cpu_track[c]; +} + +static int +connect_cpu(struct emu *emu, struct cpu *scpu) +{ + struct nodes_cpu *cpu = EXT(scpu, 'D'); + for (int i = 0; i < CH_MAX; i++) { + struct track *track = &cpu->track[i]; + + /* Choose select CPU channel based on tracking mode (only + * TRACK_TH_RUN allowed, as active may cause collisions) */ + int mode = nodes_get_track(i, CT_CPU); + struct chan *sel = cpu_get_th_chan(scpu, mode); + if (track_set_select(track, mode, sel, NULL) != 0) { + err("track_select failed"); + return -1; + } + + /* Add each thread as input */ + for (struct thread *t = emu->system.threads; t; t = t->gnext) { + struct nodes_thread *th = EXT(t, 'D'); + + /* Choose input channel from the thread output channels + * based on CPU tracking mode */ + struct value key = value_int64(t->gindex); + struct chan *inp = track_get_output(&th->track[i], mode); + + if (track_add_input(track, mode, key, inp) != 0) { + err("track_add_input failed"); + return -1; + } + } + + /* Set the PRV output */ + track_set_default(track, nodes_get_track(i, CT_CPU)); + } + + return 0; +} + +int +nodes_connect(struct emu *emu) +{ + struct system *sys = &emu->system; + + /* threads */ + for (struct thread *t = sys->threads; t; t = t->gnext) { + struct nodes_thread *th = EXT(t, 'D'); + struct chan *sel = &t->chan[TH_CHAN_STATE]; + if (track_connect_thread(th->track, th->ch, th_track, sel, CH_MAX) != 0) { + err("track_thread failed"); + return -1; + } + } + + /* cpus */ + for (struct cpu *c = sys->cpus; c; c = c->next) { + if (connect_cpu(emu, c) != 0) { + err("connect_cpu failed"); + return -1; + } + } + + if (nodes_init_pvt(emu) != 0) { + err("init_pvt failed"); + return -1; + } + + return 0; +} diff --git a/src/emu/nodes/create.c b/src/emu/nodes/create.c new file mode 100644 index 0000000..24b2a48 --- /dev/null +++ b/src/emu/nodes/create.c @@ -0,0 +1,126 @@ +#include "nodes_priv.h" + +static const char *chan_name[CH_MAX] = { + [CH_SUBSYSTEM] = "subsystem", +}; + +static const int chan_stack[CH_MAX] = { + [CH_SUBSYSTEM] = 1, +}; + +static int +init_chans(struct bay *bay, struct chan *chans, const char *fmt, int64_t gindex) +{ + for (int i = 0; i < CH_MAX; i++) { + struct chan *c = &chans[i]; + int type = chan_stack[i]; + chan_init(c, type, fmt, gindex, chan_name[i]); + + if (bay_register(bay, c) != 0) { + err("bay_register failed"); + return -1; + } + } + + return 0; +} + +static int +init_tracks(struct bay *bay, struct track *tracks, const char *fmt, int64_t gindex) +{ + for (int i = 0; i < CH_MAX; i++) { + struct track *track = &tracks[i]; + + if (track_init(track, bay, TRACK_TYPE_TH, fmt, gindex, chan_name[i]) != 0) { + err("track_init failed"); + return -1; + } + } + + return 0; +} + +static int +init_cpu(struct bay *bay, struct cpu *syscpu) +{ + struct nodes_cpu *cpu = calloc(1, sizeof(struct nodes_cpu)); + if (cpu == NULL) { + err("calloc failed:"); + return -1; + } + + cpu->track = calloc(CH_MAX, sizeof(struct track)); + if (cpu->track == NULL) { + err("calloc failed:"); + return -1; + } + + char *fmt = "nodes.cpu%ld.%s"; + if (init_tracks(bay, cpu->track, fmt, syscpu->gindex) != 0) { + err("init_chans failed"); + return -1; + } + + extend_set(&syscpu->ext, 'D', cpu); + return 0; +} + +static int +init_thread(struct bay *bay, struct thread *systh) +{ + struct nodes_thread *th = calloc(1, sizeof(struct nodes_thread)); + if (th == NULL) { + err("calloc failed:"); + return -1; + } + + th->ch = calloc(CH_MAX, sizeof(struct chan)); + if (th->ch == NULL) { + err("calloc failed:"); + return -1; + } + + th->track = calloc(CH_MAX, sizeof(struct track)); + if (th->track == NULL) { + err("calloc failed:"); + return -1; + } + + char *fmt = "nodes.thread%ld.%s"; + if (init_chans(bay, th->ch, fmt, systh->gindex) != 0) { + err("init_chans failed"); + return -1; + } + + if (init_tracks(bay, th->track, fmt, systh->gindex) != 0) { + err("init_tracks failed"); + return -1; + } + + extend_set(&systh->ext, 'D', th); + + return 0; +} + +int +nodes_create(struct emu *emu) +{ + struct system *sys = &emu->system; + struct bay *bay = &emu->bay; + + for (struct cpu *c = sys->cpus; c; c = c->next) { + if (init_cpu(bay, c) != 0) { + err("init_cpu failed"); + return -1; + } + } + + for (struct thread *t = sys->threads; t; t = t->gnext) { + if (init_thread(bay, t) != 0) { + err("init_thread failed"); + return -1; + } + } + + return 0; +} diff --git a/src/emu/nodes/event.c b/src/emu/nodes/event.c new file mode 100644 index 0000000..da0ebb6 --- /dev/null +++ b/src/emu/nodes/event.c @@ -0,0 +1,110 @@ +#include "nodes_priv.h" + +enum { PUSH = 1, POP = 2, IGN = 3 }; + +#define CHSS CH_SUBSYSTEM + +static const int ss_table[256][256][3] = { + ['R'] = { + ['['] = { CHSS, PUSH, ST_REGISTER }, + [']'] = { CHSS, POP, ST_REGISTER }, + }, + ['U'] = { + ['['] = { CHSS, PUSH, ST_UNREGISTER }, + [']'] = { CHSS, POP, ST_UNREGISTER }, + }, + ['W'] = { + ['['] = { CHSS, PUSH, ST_IF0_WAIT }, + [']'] = { CHSS, POP, ST_IF0_WAIT }, + }, + ['I'] = { + ['['] = { CHSS, PUSH, ST_IF0_INLINE }, + [']'] = { CHSS, POP, ST_IF0_INLINE }, + }, + ['T'] = { + ['['] = { CHSS, PUSH, ST_TASKWAIT }, + [']'] = { CHSS, POP, ST_TASKWAIT }, + }, + ['C'] = { + ['['] = { CHSS, PUSH, ST_CREATE }, + [']'] = { CHSS, POP, ST_CREATE }, + }, + ['S'] = { + ['['] = { CHSS, PUSH, ST_SUBMIT }, + [']'] = { CHSS, POP, ST_SUBMIT }, + }, + ['P'] = { + ['['] = { CHSS, PUSH, ST_SPAWN }, + [']'] = { CHSS, POP, ST_SPAWN }, + }, +}; + +static int +simple(struct emu *emu) +{ + const int *entry = ss_table[emu->ev->c][emu->ev->v]; + int chind = entry[0]; + int action = entry[1]; + int st = entry[2]; + + struct nodes_thread *th = EXT(emu->thread, 'D'); + struct chan *ch = &th->ch[chind]; + + if (action == PUSH) { + return chan_push(ch, value_int64(st)); + } else if (action == POP) { + return chan_pop(ch, value_int64(st)); + } else if (action == IGN) { + return 0; /* do nothing */ + } else { + err("unknown NODES subsystem event"); + return -1; + } + + return 0; +} + +static int +process_ev(struct emu *emu) +{ + if (!emu->thread->is_running) { + err("current thread %d not running", emu->thread->tid); + return -1; + } + + switch (emu->ev->c) { + case 'R': + case 'U': + case 'W': + case 'I': + case 'T': + case 'C': + case 'S': + case 'P': + return simple(emu); + default: + err("unknown NODES event category"); + return -1; + } + + /* Not reached */ + return 0; +} + +int +nodes_event(struct emu *emu) +{ + dbg("in nodes_event"); + if (emu->ev->m != 'D') { + err("unexpected event model %c\n", emu->ev->m); + return -1; + } + + dbg("got nodes event %s", emu->ev->mcv); + if (process_ev(emu) != 0) { + err("error processing NODES event"); + return -1; + } + + return 0; +} diff --git a/src/emu/nodes/finish.c b/src/emu/nodes/finish.c new file mode 100644 index 0000000..c11b500 --- /dev/null +++ b/src/emu/nodes/finish.c @@ -0,0 +1,44 @@ +#include "nodes_priv.h" + +static int +end_lint(struct emu *emu) +{ + struct system *sys = &emu->system; + + /* Ensure we run out of subsystem states */ + for (struct thread *t = sys->threads; t; t = t->gnext) { + struct nodes_thread *th = EXT(t, 'D'); + struct chan *ch = &th->ch[CH_SUBSYSTEM]; + int stacked = ch->data.stack.n; + if (stacked > 0) { + struct value top; + if (chan_read(ch, &top) != 0) { + err("chan_read failed for subsystem"); + return -1; + } + + err("thread %d ended with %d stacked nodes subsystems, top=\"%s\"\n", + t->tid, stacked, nodes_ss_name(top.i)); + return -1; + } + } + + return 0; +} + +int +nodes_finish(struct emu *emu) +{ + if (nodes_finish_pvt(emu) != 0) { + err("finish_pvt failed"); + return -1; + } + + /* When running in linter mode perform additional checks */ + if (emu->args.linter_mode && end_lint(emu) != 0) { + err("end_lint failed"); + return -1; + } + + return 0; +} diff --git a/src/emu/nodes/nodes_priv.h b/src/emu/nodes/nodes_priv.h new file mode 100644 index 0000000..3ba59fb --- /dev/null +++ b/src/emu/nodes/nodes_priv.h @@ -0,0 +1,56 @@ +/* Copyright (c) 2023 Barcelona Supercomputing Center (BSC) + * SPDX-License-Identifier: GPL-3.0-or-later */ + +#ifndef NODES_PRIV_H +#define NODES_PRIV_H + +#include "emu.h" +#include "chan.h" +#include "mux.h" +#include "task.h" + +/* Private enums */ + +enum nodes_chan_type { + CT_TH = 0, + CT_CPU, + CT_MAX +}; + +enum nodes_chan { + CH_SUBSYSTEM = 0, + CH_MAX, +}; + +enum nodes_ss_values { + ST_REGISTER = 1, + ST_UNREGISTER, + ST_IF0_WAIT, + ST_IF0_INLINE, + ST_TASKWAIT, + ST_CREATE, + ST_SUBMIT, + ST_SPAWN, +}; + +struct nodes_thread { + struct chan *ch; + struct track *track; +}; + +struct nodes_cpu { + struct track *track; +}; + +int nodes_probe(struct emu *emu); +int nodes_create(struct emu *emu); +int nodes_connect(struct emu *emu); +int nodes_event(struct emu *emu); +int nodes_finish(struct emu *emu); + +int nodes_init_pvt(struct emu *emu); +int nodes_finish_pvt(struct emu *emu); +const char *nodes_ss_name(int ss); +int nodes_get_track(int c, int type); + +#endif /* NODES_PRIV_H */ diff --git a/src/emu/nodes/probe.c b/src/emu/nodes/probe.c new file mode 100644 index 0000000..2a48780 --- /dev/null +++ b/src/emu/nodes/probe.c @@ -0,0 +1,20 @@ +#include "nodes_priv.h" + +struct model_spec model_nodes = { + .name = "nodes", + .model = 'D', + .create = nodes_create, + .connect = nodes_connect, + .event = nodes_event, + .probe = nodes_probe, + .finish = nodes_finish, +}; + +int +nodes_probe(struct emu *emu) +{ + if (emu->system.nthreads == 0) + return 1; + + return 0; +} diff --git a/src/emu/nodes/pvt.c b/src/emu/nodes/pvt.c new file mode 100644 index 0000000..88cc04b --- /dev/null +++ b/src/emu/nodes/pvt.c @@ -0,0 +1,228 @@ +#include "nodes_priv.h" + +/* TODO: Assign types on runtime and generate configs */ + +static const int pvt_type[] = { + [CH_SUBSYSTEM] = 30, +}; + +static const char *pcf_prefix[CH_MAX] = { + [CH_SUBSYSTEM] = "NODES subsystem", +}; + +static const char *pcf_suffix[TRACK_TH_MAX] = { + [TRACK_TH_ANY] = "", + [TRACK_TH_RUN] = "of the RUNNING thread", + [TRACK_TH_ACT] = "of the ACTIVE thread", +}; + +static const struct pcf_value_label nodes_ss_values[] = { + { ST_REGISTER, "Dependencies: Registering task accesses" }, + { ST_UNREGISTER, "Dependencies: Unregistering task accesses" }, + { ST_IF0_WAIT, "If0: Waiting for an If0 task" }, + { ST_IF0_INLINE, "If0: Executing an If0 task inline" }, + { ST_TASKWAIT, "Taskwait: Taskwait" }, + { ST_CREATE, "Add Task: Creating a task" }, + { ST_SUBMIT, "Add Task: Submitting a task" }, + { ST_SPAWN, "Spawn Function: Spawning a function" }, + { -1, NULL }, +}; + +static const struct pcf_value_label (*pcf_chan_value_labels[CH_MAX])[] = { + [CH_SUBSYSTEM] = &nodes_ss_values, +}; + +/* ------------------------------ pcf ------------------------------ */ + +static int +create_values(struct pcf_type *t, int c) +{ + const struct pcf_value_label(*q)[] = pcf_chan_value_labels[c]; + + if (q == NULL) + return 0; + + for (const struct pcf_value_label *p = *q; p->label != NULL; p++) + pcf_add_value(t, p->value, p->label); + + return 0; +} + +static int +create_type(struct pcf *pcf, enum nodes_chan c, enum nodes_chan_type ct) +{ + long type = pvt_type[c]; + + if (type == -1) + return 0; + + /* Compute the label by joining the two parts */ + const char *prefix = pcf_prefix[c]; + int track_mode = nodes_get_track(c, ct); + const char *suffix = pcf_suffix[track_mode]; + + char label[MAX_PCF_LABEL]; + int ret = snprintf(label, MAX_PCF_LABEL, "%s %s", + prefix, suffix); + + if (ret >= MAX_PCF_LABEL) { + err("computed type label too long"); + return -1; + } + + struct pcf_type *pcftype = pcf_add_type(pcf, type, label); + + return create_values(pcftype, c); +} + +static int +init_pcf(struct pcf *pcf, enum nodes_chan_type ct) +{ + /* Create default types and values */ + for (enum nodes_chan c = 0; c < CH_MAX; c++) { + if (create_type(pcf, c, ct) != 0) { + err("create_type failed"); + return -1; + } + } + + return 0; +} + +/* ------------------------------ prv ------------------------------ */ + +static int +connect_thread_prv(struct emu *emu, struct thread *thread, struct prv *prv) +{ + struct nodes_thread *th = EXT(thread, 'D'); + for (int i = 0; i < CH_MAX; i++) { + struct chan *out = track_get_default(&th->track[i]); + long type = pvt_type[i]; + long row = thread->gindex; + if (prv_register(prv, row, type, &emu->bay, out, PRV_DUP)) { + err("prv_register failed"); + return -1; + } + } + + return 0; +} + +static int +connect_cpu_prv(struct emu *emu, struct cpu *scpu, struct prv *prv) +{ + struct nodes_cpu *cpu = EXT(scpu, 'D'); + for (int i = 0; i < CH_MAX; i++) { + struct chan *out = track_get_default(&cpu->track[i]); + long type = pvt_type[i]; + long row = scpu->gindex; + if (prv_register(prv, row, type, &emu->bay, out, PRV_DUP)) { + err("prv_register failed"); + return -1; + } + } + + return 0; +} + +static int +connect_threads(struct emu *emu) +{ + struct system *sys = &emu->system; + + /* Get thread PRV */ + struct pvt *pvt = recorder_find_pvt(&emu->recorder, "thread"); + if (pvt == NULL) { + err("cannot find thread pvt"); + return -1; + } + + /* Connect thread channels to PRV */ + struct prv *prv = pvt_get_prv(pvt); + for (struct thread *t = sys->threads; t; t = t->gnext) { + if (connect_thread_prv(emu, t, prv) != 0) { + err("connect_thread_prv failed"); + return -1; + } + } + + /* Init thread PCF */ + struct pcf *pcf = pvt_get_pcf(pvt); + if (init_pcf(pcf, CT_TH) != 0) { + err("init_pcf failed"); + return -1; + } + + return 0; +} + +static int +connect_cpus(struct emu *emu) +{ + struct system *sys = &emu->system; + + /* Get cpu PRV */ + struct pvt *pvt = recorder_find_pvt(&emu->recorder, "cpu"); + if (pvt == NULL) { + err("cannot find cpu pvt"); + return -1; + } + + /* Connect CPU channels to PRV */ + struct prv *prv = pvt_get_prv(pvt); + for (struct cpu *c = sys->cpus; c; c = c->next) { + if (connect_cpu_prv(emu, c, prv) != 0) { + err("connect_cpu_prv failed"); + return -1; + } + } + + /* Init CPU PCF */ + struct pcf *pcf = pvt_get_pcf(pvt); + if (init_pcf(pcf, CT_CPU) != 0) { + err("init_pcf failed"); + return -1; + } + + return 0; +} + +/* Connect all outputs to the paraver trace and setup PCF types */ +int +nodes_init_pvt(struct emu *emu) +{ + if (connect_threads(emu) != 0) { + err("connect_threads failed"); + return -1; + } + + if (connect_cpus(emu) != 0) { + err("connect_cpus failed"); + return -1; + } + + return 0; +} + +int +nodes_finish_pvt(struct emu *emu) +{ + UNUSED(emu); + return 0; +} + +const char * +nodes_ss_name(int ss) +{ + static const char *unknown = "(unknown)"; + const char *name = unknown; + const struct pcf_value_label *pv; + for (pv = &nodes_ss_values[0]; pv->label; pv++) { + if (pv->value == ss) { + name = pv->label; + break; + } + } + + return name; +} diff --git a/test/rt/CMakeLists.txt b/test/rt/CMakeLists.txt index 0945b87..7d11797 100644 --- a/test/rt/CMakeLists.txt +++ b/test/rt/CMakeLists.txt @@ -2,5 +2,5 @@ # SPDX-License-Identifier: GPL-3.0-or-later add_subdirectory(nanos6) -#add_subdirectory(nodes) +add_subdirectory(nodes) add_subdirectory(nosv) diff --git a/test/rt/nodes/CMakeLists.txt b/test/rt/nodes/CMakeLists.txt index 2b97ec3..d32dcd2 100644 --- a/test/rt/nodes/CMakeLists.txt +++ b/test/rt/nodes/CMakeLists.txt @@ -36,8 +36,9 @@ function(nodes_rt_test) ENVIRONMENT "NOSV_CONFIG=${OVNI_TEST_SOURCE_DIR}/rt/nodes/nosv.toml") endfunction() -nodes_rt_test(../nanos6/simple-task.c NAME simple-task SORT) -nodes_rt_test(../nanos6/nested-task.c NAME nested-task SORT) -nodes_rt_test(../nanos6/several-tasks.c NAME several-tasks SORT) -nodes_rt_test(../nanos6/if0.c NAME if0 SORT) -nodes_rt_test(../nanos6/sched-add.c NAME sched-add SORT) +# FIXME: Add SORT for all tests once ovnisort is ported +nodes_rt_test(../nanos6/simple-task.c NAME simple-task) +nodes_rt_test(../nanos6/nested-task.c NAME nested-task) +nodes_rt_test(../nanos6/several-tasks.c NAME several-tasks) +nodes_rt_test(../nanos6/if0.c NAME if0) +nodes_rt_test(../nanos6/sched-add.c NAME sched-add)