Add support for NODES

This commit is contained in:
Rodrigo Arias 2023-02-13 20:26:03 +01:00 committed by Rodrigo Arias Mallo
parent d48c9758bc
commit 02db4b6369
11 changed files with 686 additions and 6 deletions

View File

@ -52,6 +52,12 @@ add_library(emu STATIC
nosv/event.c
nosv/pvt.c
nosv/finish.c
nodes/probe.c
nodes/connect.c
nodes/create.c
nodes/event.c
nodes/pvt.c
nodes/finish.c
)
add_executable(ovniemu ovniemu.c)

View File

@ -8,11 +8,13 @@
extern struct model_spec model_ovni;
extern struct model_spec model_nanos6;
extern struct model_spec model_nosv;
extern struct model_spec model_nodes;
static struct model_spec *models[] = {
&model_ovni,
&model_nanos6,
&model_nosv,
&model_nodes,
NULL
};

87
src/emu/nodes/connect.c Normal file
View File

@ -0,0 +1,87 @@
#include "nodes_priv.h"
static const int th_track[CH_MAX] = {
[CH_SUBSYSTEM] = TRACK_TH_RUN, /* FIXME: Why not active? */
};
static const int cpu_track[CH_MAX] = {
[CH_SUBSYSTEM] = TRACK_TH_RUN,
};
int
nodes_get_track(int c, int type)
{
if (type == CT_TH)
return th_track[c];
else
return cpu_track[c];
}
static int
connect_cpu(struct emu *emu, struct cpu *scpu)
{
struct nodes_cpu *cpu = EXT(scpu, 'D');
for (int i = 0; i < CH_MAX; i++) {
struct track *track = &cpu->track[i];
/* Choose select CPU channel based on tracking mode (only
* TRACK_TH_RUN allowed, as active may cause collisions) */
int mode = nodes_get_track(i, CT_CPU);
struct chan *sel = cpu_get_th_chan(scpu, mode);
if (track_set_select(track, mode, sel, NULL) != 0) {
err("track_select failed");
return -1;
}
/* Add each thread as input */
for (struct thread *t = emu->system.threads; t; t = t->gnext) {
struct nodes_thread *th = EXT(t, 'D');
/* Choose input channel from the thread output channels
* based on CPU tracking mode */
struct value key = value_int64(t->gindex);
struct chan *inp = track_get_output(&th->track[i], mode);
if (track_add_input(track, mode, key, inp) != 0) {
err("track_add_input failed");
return -1;
}
}
/* Set the PRV output */
track_set_default(track, nodes_get_track(i, CT_CPU));
}
return 0;
}
int
nodes_connect(struct emu *emu)
{
struct system *sys = &emu->system;
/* threads */
for (struct thread *t = sys->threads; t; t = t->gnext) {
struct nodes_thread *th = EXT(t, 'D');
struct chan *sel = &t->chan[TH_CHAN_STATE];
if (track_connect_thread(th->track, th->ch, th_track, sel, CH_MAX) != 0) {
err("track_thread failed");
return -1;
}
}
/* cpus */
for (struct cpu *c = sys->cpus; c; c = c->next) {
if (connect_cpu(emu, c) != 0) {
err("connect_cpu failed");
return -1;
}
}
if (nodes_init_pvt(emu) != 0) {
err("init_pvt failed");
return -1;
}
return 0;
}

126
src/emu/nodes/create.c Normal file
View File

@ -0,0 +1,126 @@
#include "nodes_priv.h"
static const char *chan_name[CH_MAX] = {
[CH_SUBSYSTEM] = "subsystem",
};
static const int chan_stack[CH_MAX] = {
[CH_SUBSYSTEM] = 1,
};
static int
init_chans(struct bay *bay, struct chan *chans, const char *fmt, int64_t gindex)
{
for (int i = 0; i < CH_MAX; i++) {
struct chan *c = &chans[i];
int type = chan_stack[i];
chan_init(c, type, fmt, gindex, chan_name[i]);
if (bay_register(bay, c) != 0) {
err("bay_register failed");
return -1;
}
}
return 0;
}
static int
init_tracks(struct bay *bay, struct track *tracks, const char *fmt, int64_t gindex)
{
for (int i = 0; i < CH_MAX; i++) {
struct track *track = &tracks[i];
if (track_init(track, bay, TRACK_TYPE_TH, fmt, gindex, chan_name[i]) != 0) {
err("track_init failed");
return -1;
}
}
return 0;
}
static int
init_cpu(struct bay *bay, struct cpu *syscpu)
{
struct nodes_cpu *cpu = calloc(1, sizeof(struct nodes_cpu));
if (cpu == NULL) {
err("calloc failed:");
return -1;
}
cpu->track = calloc(CH_MAX, sizeof(struct track));
if (cpu->track == NULL) {
err("calloc failed:");
return -1;
}
char *fmt = "nodes.cpu%ld.%s";
if (init_tracks(bay, cpu->track, fmt, syscpu->gindex) != 0) {
err("init_chans failed");
return -1;
}
extend_set(&syscpu->ext, 'D', cpu);
return 0;
}
static int
init_thread(struct bay *bay, struct thread *systh)
{
struct nodes_thread *th = calloc(1, sizeof(struct nodes_thread));
if (th == NULL) {
err("calloc failed:");
return -1;
}
th->ch = calloc(CH_MAX, sizeof(struct chan));
if (th->ch == NULL) {
err("calloc failed:");
return -1;
}
th->track = calloc(CH_MAX, sizeof(struct track));
if (th->track == NULL) {
err("calloc failed:");
return -1;
}
char *fmt = "nodes.thread%ld.%s";
if (init_chans(bay, th->ch, fmt, systh->gindex) != 0) {
err("init_chans failed");
return -1;
}
if (init_tracks(bay, th->track, fmt, systh->gindex) != 0) {
err("init_tracks failed");
return -1;
}
extend_set(&systh->ext, 'D', th);
return 0;
}
int
nodes_create(struct emu *emu)
{
struct system *sys = &emu->system;
struct bay *bay = &emu->bay;
for (struct cpu *c = sys->cpus; c; c = c->next) {
if (init_cpu(bay, c) != 0) {
err("init_cpu failed");
return -1;
}
}
for (struct thread *t = sys->threads; t; t = t->gnext) {
if (init_thread(bay, t) != 0) {
err("init_thread failed");
return -1;
}
}
return 0;
}

110
src/emu/nodes/event.c Normal file
View File

@ -0,0 +1,110 @@
#include "nodes_priv.h"
enum { PUSH = 1, POP = 2, IGN = 3 };
#define CHSS CH_SUBSYSTEM
static const int ss_table[256][256][3] = {
['R'] = {
['['] = { CHSS, PUSH, ST_REGISTER },
[']'] = { CHSS, POP, ST_REGISTER },
},
['U'] = {
['['] = { CHSS, PUSH, ST_UNREGISTER },
[']'] = { CHSS, POP, ST_UNREGISTER },
},
['W'] = {
['['] = { CHSS, PUSH, ST_IF0_WAIT },
[']'] = { CHSS, POP, ST_IF0_WAIT },
},
['I'] = {
['['] = { CHSS, PUSH, ST_IF0_INLINE },
[']'] = { CHSS, POP, ST_IF0_INLINE },
},
['T'] = {
['['] = { CHSS, PUSH, ST_TASKWAIT },
[']'] = { CHSS, POP, ST_TASKWAIT },
},
['C'] = {
['['] = { CHSS, PUSH, ST_CREATE },
[']'] = { CHSS, POP, ST_CREATE },
},
['S'] = {
['['] = { CHSS, PUSH, ST_SUBMIT },
[']'] = { CHSS, POP, ST_SUBMIT },
},
['P'] = {
['['] = { CHSS, PUSH, ST_SPAWN },
[']'] = { CHSS, POP, ST_SPAWN },
},
};
static int
simple(struct emu *emu)
{
const int *entry = ss_table[emu->ev->c][emu->ev->v];
int chind = entry[0];
int action = entry[1];
int st = entry[2];
struct nodes_thread *th = EXT(emu->thread, 'D');
struct chan *ch = &th->ch[chind];
if (action == PUSH) {
return chan_push(ch, value_int64(st));
} else if (action == POP) {
return chan_pop(ch, value_int64(st));
} else if (action == IGN) {
return 0; /* do nothing */
} else {
err("unknown NODES subsystem event");
return -1;
}
return 0;
}
static int
process_ev(struct emu *emu)
{
if (!emu->thread->is_running) {
err("current thread %d not running", emu->thread->tid);
return -1;
}
switch (emu->ev->c) {
case 'R':
case 'U':
case 'W':
case 'I':
case 'T':
case 'C':
case 'S':
case 'P':
return simple(emu);
default:
err("unknown NODES event category");
return -1;
}
/* Not reached */
return 0;
}
int
nodes_event(struct emu *emu)
{
dbg("in nodes_event");
if (emu->ev->m != 'D') {
err("unexpected event model %c\n", emu->ev->m);
return -1;
}
dbg("got nodes event %s", emu->ev->mcv);
if (process_ev(emu) != 0) {
err("error processing NODES event");
return -1;
}
return 0;
}

44
src/emu/nodes/finish.c Normal file
View File

@ -0,0 +1,44 @@
#include "nodes_priv.h"
static int
end_lint(struct emu *emu)
{
struct system *sys = &emu->system;
/* Ensure we run out of subsystem states */
for (struct thread *t = sys->threads; t; t = t->gnext) {
struct nodes_thread *th = EXT(t, 'D');
struct chan *ch = &th->ch[CH_SUBSYSTEM];
int stacked = ch->data.stack.n;
if (stacked > 0) {
struct value top;
if (chan_read(ch, &top) != 0) {
err("chan_read failed for subsystem");
return -1;
}
err("thread %d ended with %d stacked nodes subsystems, top=\"%s\"\n",
t->tid, stacked, nodes_ss_name(top.i));
return -1;
}
}
return 0;
}
int
nodes_finish(struct emu *emu)
{
if (nodes_finish_pvt(emu) != 0) {
err("finish_pvt failed");
return -1;
}
/* When running in linter mode perform additional checks */
if (emu->args.linter_mode && end_lint(emu) != 0) {
err("end_lint failed");
return -1;
}
return 0;
}

View File

@ -0,0 +1,56 @@
/* Copyright (c) 2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef NODES_PRIV_H
#define NODES_PRIV_H
#include "emu.h"
#include "chan.h"
#include "mux.h"
#include "task.h"
/* Private enums */
enum nodes_chan_type {
CT_TH = 0,
CT_CPU,
CT_MAX
};
enum nodes_chan {
CH_SUBSYSTEM = 0,
CH_MAX,
};
enum nodes_ss_values {
ST_REGISTER = 1,
ST_UNREGISTER,
ST_IF0_WAIT,
ST_IF0_INLINE,
ST_TASKWAIT,
ST_CREATE,
ST_SUBMIT,
ST_SPAWN,
};
struct nodes_thread {
struct chan *ch;
struct track *track;
};
struct nodes_cpu {
struct track *track;
};
int nodes_probe(struct emu *emu);
int nodes_create(struct emu *emu);
int nodes_connect(struct emu *emu);
int nodes_event(struct emu *emu);
int nodes_finish(struct emu *emu);
int nodes_init_pvt(struct emu *emu);
int nodes_finish_pvt(struct emu *emu);
const char *nodes_ss_name(int ss);
int nodes_get_track(int c, int type);
#endif /* NODES_PRIV_H */

20
src/emu/nodes/probe.c Normal file
View File

@ -0,0 +1,20 @@
#include "nodes_priv.h"
struct model_spec model_nodes = {
.name = "nodes",
.model = 'D',
.create = nodes_create,
.connect = nodes_connect,
.event = nodes_event,
.probe = nodes_probe,
.finish = nodes_finish,
};
int
nodes_probe(struct emu *emu)
{
if (emu->system.nthreads == 0)
return 1;
return 0;
}

228
src/emu/nodes/pvt.c Normal file
View File

@ -0,0 +1,228 @@
#include "nodes_priv.h"
/* TODO: Assign types on runtime and generate configs */
static const int pvt_type[] = {
[CH_SUBSYSTEM] = 30,
};
static const char *pcf_prefix[CH_MAX] = {
[CH_SUBSYSTEM] = "NODES subsystem",
};
static const char *pcf_suffix[TRACK_TH_MAX] = {
[TRACK_TH_ANY] = "",
[TRACK_TH_RUN] = "of the RUNNING thread",
[TRACK_TH_ACT] = "of the ACTIVE thread",
};
static const struct pcf_value_label nodes_ss_values[] = {
{ ST_REGISTER, "Dependencies: Registering task accesses" },
{ ST_UNREGISTER, "Dependencies: Unregistering task accesses" },
{ ST_IF0_WAIT, "If0: Waiting for an If0 task" },
{ ST_IF0_INLINE, "If0: Executing an If0 task inline" },
{ ST_TASKWAIT, "Taskwait: Taskwait" },
{ ST_CREATE, "Add Task: Creating a task" },
{ ST_SUBMIT, "Add Task: Submitting a task" },
{ ST_SPAWN, "Spawn Function: Spawning a function" },
{ -1, NULL },
};
static const struct pcf_value_label (*pcf_chan_value_labels[CH_MAX])[] = {
[CH_SUBSYSTEM] = &nodes_ss_values,
};
/* ------------------------------ pcf ------------------------------ */
static int
create_values(struct pcf_type *t, int c)
{
const struct pcf_value_label(*q)[] = pcf_chan_value_labels[c];
if (q == NULL)
return 0;
for (const struct pcf_value_label *p = *q; p->label != NULL; p++)
pcf_add_value(t, p->value, p->label);
return 0;
}
static int
create_type(struct pcf *pcf, enum nodes_chan c, enum nodes_chan_type ct)
{
long type = pvt_type[c];
if (type == -1)
return 0;
/* Compute the label by joining the two parts */
const char *prefix = pcf_prefix[c];
int track_mode = nodes_get_track(c, ct);
const char *suffix = pcf_suffix[track_mode];
char label[MAX_PCF_LABEL];
int ret = snprintf(label, MAX_PCF_LABEL, "%s %s",
prefix, suffix);
if (ret >= MAX_PCF_LABEL) {
err("computed type label too long");
return -1;
}
struct pcf_type *pcftype = pcf_add_type(pcf, type, label);
return create_values(pcftype, c);
}
static int
init_pcf(struct pcf *pcf, enum nodes_chan_type ct)
{
/* Create default types and values */
for (enum nodes_chan c = 0; c < CH_MAX; c++) {
if (create_type(pcf, c, ct) != 0) {
err("create_type failed");
return -1;
}
}
return 0;
}
/* ------------------------------ prv ------------------------------ */
static int
connect_thread_prv(struct emu *emu, struct thread *thread, struct prv *prv)
{
struct nodes_thread *th = EXT(thread, 'D');
for (int i = 0; i < CH_MAX; i++) {
struct chan *out = track_get_default(&th->track[i]);
long type = pvt_type[i];
long row = thread->gindex;
if (prv_register(prv, row, type, &emu->bay, out, PRV_DUP)) {
err("prv_register failed");
return -1;
}
}
return 0;
}
static int
connect_cpu_prv(struct emu *emu, struct cpu *scpu, struct prv *prv)
{
struct nodes_cpu *cpu = EXT(scpu, 'D');
for (int i = 0; i < CH_MAX; i++) {
struct chan *out = track_get_default(&cpu->track[i]);
long type = pvt_type[i];
long row = scpu->gindex;
if (prv_register(prv, row, type, &emu->bay, out, PRV_DUP)) {
err("prv_register failed");
return -1;
}
}
return 0;
}
static int
connect_threads(struct emu *emu)
{
struct system *sys = &emu->system;
/* Get thread PRV */
struct pvt *pvt = recorder_find_pvt(&emu->recorder, "thread");
if (pvt == NULL) {
err("cannot find thread pvt");
return -1;
}
/* Connect thread channels to PRV */
struct prv *prv = pvt_get_prv(pvt);
for (struct thread *t = sys->threads; t; t = t->gnext) {
if (connect_thread_prv(emu, t, prv) != 0) {
err("connect_thread_prv failed");
return -1;
}
}
/* Init thread PCF */
struct pcf *pcf = pvt_get_pcf(pvt);
if (init_pcf(pcf, CT_TH) != 0) {
err("init_pcf failed");
return -1;
}
return 0;
}
static int
connect_cpus(struct emu *emu)
{
struct system *sys = &emu->system;
/* Get cpu PRV */
struct pvt *pvt = recorder_find_pvt(&emu->recorder, "cpu");
if (pvt == NULL) {
err("cannot find cpu pvt");
return -1;
}
/* Connect CPU channels to PRV */
struct prv *prv = pvt_get_prv(pvt);
for (struct cpu *c = sys->cpus; c; c = c->next) {
if (connect_cpu_prv(emu, c, prv) != 0) {
err("connect_cpu_prv failed");
return -1;
}
}
/* Init CPU PCF */
struct pcf *pcf = pvt_get_pcf(pvt);
if (init_pcf(pcf, CT_CPU) != 0) {
err("init_pcf failed");
return -1;
}
return 0;
}
/* Connect all outputs to the paraver trace and setup PCF types */
int
nodes_init_pvt(struct emu *emu)
{
if (connect_threads(emu) != 0) {
err("connect_threads failed");
return -1;
}
if (connect_cpus(emu) != 0) {
err("connect_cpus failed");
return -1;
}
return 0;
}
int
nodes_finish_pvt(struct emu *emu)
{
UNUSED(emu);
return 0;
}
const char *
nodes_ss_name(int ss)
{
static const char *unknown = "(unknown)";
const char *name = unknown;
const struct pcf_value_label *pv;
for (pv = &nodes_ss_values[0]; pv->label; pv++) {
if (pv->value == ss) {
name = pv->label;
break;
}
}
return name;
}

View File

@ -2,5 +2,5 @@
# SPDX-License-Identifier: GPL-3.0-or-later
add_subdirectory(nanos6)
#add_subdirectory(nodes)
add_subdirectory(nodes)
add_subdirectory(nosv)

View File

@ -36,8 +36,9 @@ function(nodes_rt_test)
ENVIRONMENT "NOSV_CONFIG=${OVNI_TEST_SOURCE_DIR}/rt/nodes/nosv.toml")
endfunction()
nodes_rt_test(../nanos6/simple-task.c NAME simple-task SORT)
nodes_rt_test(../nanos6/nested-task.c NAME nested-task SORT)
nodes_rt_test(../nanos6/several-tasks.c NAME several-tasks SORT)
nodes_rt_test(../nanos6/if0.c NAME if0 SORT)
nodes_rt_test(../nanos6/sched-add.c NAME sched-add SORT)
# FIXME: Add SORT for all tests once ovnisort is ported
nodes_rt_test(../nanos6/simple-task.c NAME simple-task)
nodes_rt_test(../nanos6/nested-task.c NAME nested-task)
nodes_rt_test(../nanos6/several-tasks.c NAME several-tasks)
nodes_rt_test(../nanos6/if0.c NAME if0)
nodes_rt_test(../nanos6/sched-add.c NAME sched-add)