Begin porting nanos6 model

This commit is contained in:
Rodrigo Arias 2023-01-30 22:43:57 +01:00 committed by Rodrigo Arias Mallo
parent 070c0f5e24
commit 524ccc4dd5
30 changed files with 1203 additions and 229 deletions

View File

@ -13,27 +13,31 @@ include_directories(
add_library(emu STATIC
../common.c
bay.c
chan.c
clkoff.c
cpu.c
loom.c
proc.c
thread.c
path.c
metadata.c
emu.c
system.c
#emu_system_thread.c
emu_args.c
emu_ev.c
emu_model.c
emu_player.c
emu_stream.c
emu_trace.c
emu_player.c
emu_model.c
emu_ev.c
chan.c
bay.c
mux.c
prv.c
clkoff.c
loom.c
metadata.c
model_nanos6.c
model_ust.c
mux.c
path.c
proc.c
prv.c
pvt.c
recorder.c
system.c
task.c
thread.c
extend.c
)
#add_library(emu STATIC

View File

@ -1,4 +1,4 @@
//#define ENABLE_DEBUG
#define ENABLE_DEBUG
#include "bay.h"
@ -6,11 +6,6 @@
#include "uthash.h"
#include "utlist.h"
//static char *propname[BAY_CB_MAX] = {
// [BAY_CB_DIRTY] = "dirty",
// [BAY_CB_EMIT] = "emit"
//};
/* Called from the channel when it becomes dirty */
static int
cb_chan_is_dirty(struct chan *chan, void *arg)
@ -146,6 +141,13 @@ bay_init(struct bay *bay)
static int
propagate_chan(struct bay_chan *bchan, enum bay_cb_type type)
{
char *propname[BAY_CB_MAX] = {
[BAY_CB_DIRTY] = "dirty",
[BAY_CB_EMIT] = "emit"
};
UNUSED(propname);
dbg("- propagating channel '%s' phase %s\n",
bchan->chan->name, propname[type]);

View File

@ -6,6 +6,8 @@
#include "chan.h"
#include "value.h"
#include "utlist.h"
#include "pvt.h"
#include "prv.h"
static const char chan_fmt[] = "cpu%ld.%s";
static const char *chan_name[] = {
@ -16,6 +18,14 @@ static const char *chan_name[] = {
[CPU_CHAN_FLUSH] = "flush_running",
};
static int chan_type[] = {
[CPU_CHAN_PID] = 1,
[CPU_CHAN_TID] = 2,
[CPU_CHAN_NRUN] = 3,
[CPU_CHAN_APPID] = 5,
[CPU_CHAN_FLUSH] = 7,
};
void
cpu_init_begin(struct cpu *cpu, int phyid)
{
@ -71,18 +81,34 @@ cpu_init_end(struct cpu *cpu)
}
int
cpu_connect(struct cpu *cpu, struct bay *bay)
cpu_connect(struct cpu *cpu, struct bay *bay, struct recorder *rec)
{
if (!cpu->is_init) {
err("cpu not initialized");
return -1;
}
/* Get cpu prv */
struct pvt *pvt = recorder_find_pvt(rec, "cpu");
if (pvt == NULL) {
err("cannot find cpu pvt");
return -1;
}
struct prv *prv = pvt_get_prv(pvt);
for (int i = 0; i < CPU_CHAN_MAX; i++) {
if (bay_register(bay, &cpu->chan[i]) != 0) {
struct chan *c = &cpu->chan[i];
if (bay_register(bay, c) != 0) {
err("bay_register failed");
return -1;
}
long type = chan_type[i];
long row = cpu->gindex;
if (prv_register(prv, row, type, bay, c)) {
err("prv_register failed");
return -1;
}
}
return 0;
@ -127,10 +153,26 @@ cpu_update(struct cpu *cpu)
cpu->nth_running = running;
cpu->nth_active = active;
if (running == 1)
struct value tid_running;
struct value pid_running;
if (running == 1) {
cpu->th_running = th_running;
else
tid_running = value_int64(th_running->tid);
pid_running = value_int64(th_running->proc->pid);
} else {
cpu->th_running = NULL;
tid_running = value_null();
pid_running = value_null();
}
if (chan_set(&cpu->chan[CPU_CHAN_TID], tid_running) != 0) {
err("chan_set tid failed");
return -1;
}
if (chan_set(&cpu->chan[CPU_CHAN_PID], pid_running) != 0) {
err("chan_set pid failed");
return -1;
}
if (active == 1)
cpu->th_active = th_active;
@ -138,8 +180,7 @@ cpu_update(struct cpu *cpu)
cpu->th_active = NULL;
/* Update nth_running number in the channel */
struct chan *nrun = &cpu->chan[CPU_CHAN_NRUN];
if (chan_set(nrun, value_int64(cpu->nth_running)) != 0) {
if (chan_set(&cpu->chan[CPU_CHAN_NRUN], value_int64(running)) != 0) {
err("chan_set nth_running failed");
return -1;
}

View File

@ -10,6 +10,8 @@ struct cpu; /* Needed for thread */
#include "chan.h"
#include "bay.h"
#include "uthash.h"
#include "recorder.h"
#include "extend.h"
#include <linux/limits.h>
enum cpu_chan {
@ -52,7 +54,7 @@ struct cpu {
/* Channels */
struct chan chan[CPU_CHAN_MAX];
//struct model_ctx ctx;
struct extend ext;
UT_hash_handle hh; /* CPUs in the loom */
};
@ -63,7 +65,7 @@ int cpu_get_phyid(struct cpu *cpu);
void cpu_set_gindex(struct cpu *cpu, int64_t gindex);
void cpu_set_name(struct cpu *cpu, const char *name);
int cpu_init_end(struct cpu *cpu);
int cpu_connect(struct cpu *cpu, struct bay *bay);
int cpu_connect(struct cpu *cpu, struct bay *bay, struct recorder *rec);
int cpu_update(struct cpu *cpu);
int cpu_add_thread(struct cpu *cpu, struct thread *thread);

View File

@ -3,10 +3,13 @@
#define _POSIX_C_SOURCE 2
#define ENABLE_DEBUG
#include "emu.h"
#include <unistd.h>
#include "model_ust.h"
#include "model_nanos6.h"
int
emu_init(struct emu *emu, int argc, char *argv[])
@ -17,8 +20,7 @@ emu_init(struct emu *emu, int argc, char *argv[])
/* Load the streams into the trace */
if (emu_trace_load(&emu->trace, emu->args.tracedir) != 0) {
err("emu_init: cannot load trace '%s'\n",
emu->args.tracedir);
err("cannot load trace '%s'\n", emu->args.tracedir);
return -1;
}
@ -29,11 +31,17 @@ emu_init(struct emu *emu, int argc, char *argv[])
return -1;
}
/* Place output inside the same tracedir directory */
if (recorder_init(&emu->recorder, emu->args.tracedir) != 0) {
err("recorder_init failed");
return -1;
}
/* Initialize the bay */
bay_init(&emu->bay);
/* Connect system channels to bay */
if (system_connect(&emu->system, &emu->bay) != 0) {
if (system_connect(&emu->system, &emu->bay, &emu->recorder) != 0) {
err("system_connect failed");
return -1;
}
@ -48,6 +56,31 @@ emu_init(struct emu *emu, int argc, char *argv[])
// emu_model_register(&emu->model, &ovni_model_spec, emu);
//
if (model_ust.create && model_ust.create(emu) != 0) {
err("model ust create failed");
return -1;
}
if (model_nanos6.create && model_nanos6.create(emu) != 0) {
err("model nanos6 create failed");
return -1;
}
return 0;
}
int
emu_connect(struct emu *emu)
{
if (model_ust.connect && model_ust.connect(emu) != 0) {
err("model ust connect failed");
return -1;
}
if (model_nanos6.connect && model_nanos6.connect(emu) != 0) {
err("model nanos6 connect failed");
return -1;
}
return 0;
}
@ -98,12 +131,20 @@ emu_step(struct emu *emu)
/* Error happened */
if (ret < 0) {
err("emu_step: emu_player_step failed\n");
err("emu_player_step failed");
return -1;
}
set_current(emu);
dbg("----- mvc=%s dclock=%ld -----", emu->ev->mcv, emu->ev->dclock);
/* Advance recorder clock */
if (recorder_advance(&emu->recorder, emu->ev->dclock) != 0) {
err("recorder_advance failed");
return -1;
}
/* Otherwise progress */
if (emu->ev->m == 'O' && model_ust.event(emu) != 0) {
err("ovni event failed");
@ -111,6 +152,12 @@ emu_step(struct emu *emu)
return -1;
}
if (emu->ev->m == '6' && model_nanos6.event(emu) != 0) {
err("nanos6 event failed");
panic(emu);
return -1;
}
if (bay_propagate(&emu->bay) != 0) {
err("bay_propagate failed");
return -1;

View File

@ -4,16 +4,14 @@
#ifndef EMU_H
#define EMU_H
struct emu;
#include "bay.h"
#include "pvtrace.h"
#include "emu_trace.h"
#include "emu_args.h"
#include "system.h"
#include "emu_player.h"
#include "emu_model.h"
#include "emu_ev.h"
#include "recorder.h"
enum error_values {
ST_BAD = 666,
@ -22,13 +20,13 @@ enum error_values {
struct emu {
struct bay bay;
struct pvman *pvman;
struct emu_args args;
struct emu_trace trace;
struct system system;
struct emu_player player;
struct emu_model model;
struct recorder recorder;
/* Quick access */
struct emu_stream *stream;
@ -39,6 +37,7 @@ struct emu {
};
int emu_init(struct emu *emu, int argc, char *argv[]);
int emu_connect(struct emu *emu);
int emu_step(struct emu *emu);
static inline struct emu *

View File

@ -9,9 +9,9 @@
/* Easier to parse emulation event */
struct emu_ev {
char m;
char c;
char v;
uint8_t m;
uint8_t c;
uint8_t v;
char mcv[4];
int64_t rclock; /* As-is clock in the binary stream */

10
src/emu/emu_hook.h Normal file
View File

@ -0,0 +1,10 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef EMU_HOOK_H
#define EMU_HOOK_H
struct emu;
typedef int (emu_hook_t)(struct emu *emu);
#endif /* EMU_HOOK_H */

View File

@ -4,7 +4,7 @@
#ifndef EMU_MODEL_H
#define EMU_MODEL_H
typedef int (emu_hook_t)(void *ptr);
#include "emu_hook.h"
struct model_spec {
char *name;

16
src/emu/extend.c Normal file
View File

@ -0,0 +1,16 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "extend.h"
void
extend_set(struct extend *ext, int id, void *ctx)
{
ext->ctx[id] = ctx;
}
void *
extend_get(struct extend *ext, int id)
{
return ext->ctx[id];
}

16
src/emu/extend.h Normal file
View File

@ -0,0 +1,16 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef EXTEND_H
#define EXTEND_H
#define MAX_EXTEND 256
struct extend {
void *ctx[MAX_EXTEND];
};
void extend_set(struct extend *ext, int id, void *ctx);
void *extend_get(struct extend *ext, int id);
#endif /* EXTEND_H */

386
src/emu/model_nanos6.c Normal file
View File

@ -0,0 +1,386 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#define ENABLE_DEBUG
#include "model_nanos6.h"
#include "emu.h"
#include "loom.h"
#include "common.h"
#include "chan.h"
/* Raw channels */
static const char chan_cpu_fmt[] = "nanos6.cpu%ld.%s.raw";
static const char chan_th_fmt[] = "nanos6.thread%ld.%s.raw";
/* Channels filtered by tracking */
//static const char chan_fcpu_fmt[] = "nanos6.cpu%ld.%s.filtered";
static const char chan_fth_fmt[] = "nanos6.thread%ld.%s.filtered";
static const char *chan_name[] = {
[NANOS6_CHAN_TASKID] = "taskid",
[NANOS6_CHAN_TYPE] = "task_type",
[NANOS6_CHAN_SUBSYSTEM] = "subsystem",
[NANOS6_CHAN_RANK] = "rank",
[NANOS6_CHAN_THREAD] = "thread_type",
};
static const char *th_track[] = {
[NANOS6_CHAN_TASKID] = "running",
[NANOS6_CHAN_TYPE] = "running",
[NANOS6_CHAN_SUBSYSTEM] = "active",
[NANOS6_CHAN_RANK] = "running",
[NANOS6_CHAN_THREAD] = "none",
};
static const int chan_stack[] = {
[NANOS6_CHAN_SUBSYSTEM] = 1,
[NANOS6_CHAN_THREAD] = 1,
};
static const int th_type[] = {
[NANOS6_CHAN_TASKID] = 35,
[NANOS6_CHAN_TYPE] = 36,
[NANOS6_CHAN_SUBSYSTEM] = 37,
[NANOS6_CHAN_RANK] = 38,
[NANOS6_CHAN_THREAD] = 39,
};
enum { PUSH = 1, POP = 2, IGN = 3 };
#define CHSS NANOS6_CHAN_SUBSYSTEM
#define CHTT NANOS6_CHAN_THREAD
static const int ss_table[256][256][3] = {
['W'] = {
['['] = { CHSS, PUSH, ST_NANOS6_WORKER_LOOP },
[']'] = { CHSS, POP, ST_NANOS6_WORKER_LOOP },
['t'] = { CHSS, PUSH, ST_NANOS6_HANDLING_TASK },
['T'] = { CHSS, POP, ST_NANOS6_HANDLING_TASK },
['w'] = { CHSS, PUSH, ST_NANOS6_SWITCH_TO },
['W'] = { CHSS, POP, ST_NANOS6_SWITCH_TO },
['m'] = { CHSS, PUSH, ST_NANOS6_MIGRATE },
['M'] = { CHSS, POP, ST_NANOS6_MIGRATE },
['s'] = { CHSS, PUSH, ST_NANOS6_SUSPEND },
['S'] = { CHSS, POP, ST_NANOS6_SUSPEND },
['r'] = { CHSS, PUSH, ST_NANOS6_RESUME },
['R'] = { CHSS, POP, ST_NANOS6_RESUME },
['*'] = { CHSS, IGN, -1 },
},
['C'] = {
['['] = { CHSS, PUSH, ST_NANOS6_TASK_CREATING },
[']'] = { CHSS, POP, ST_NANOS6_TASK_CREATING },
},
['U'] = {
['['] = { CHSS, PUSH, ST_NANOS6_TASK_SUBMIT },
[']'] = { CHSS, POP, ST_NANOS6_TASK_SUBMIT },
},
['F'] = {
['['] = { CHSS, PUSH, ST_NANOS6_TASK_SPAWNING },
[']'] = { CHSS, POP, ST_NANOS6_TASK_SPAWNING },
},
['O'] = {
['['] = { CHSS, PUSH, ST_NANOS6_TASK_FOR },
[']'] = { CHSS, POP, ST_NANOS6_TASK_FOR },
},
['t'] = {
['['] = { CHSS, PUSH, ST_NANOS6_TASK_BODY },
[']'] = { CHSS, POP, ST_NANOS6_TASK_BODY },
},
['M'] = {
['a'] = { CHSS, PUSH, ST_NANOS6_ALLOCATING },
['A'] = { CHSS, POP, ST_NANOS6_ALLOCATING },
['f'] = { CHSS, PUSH, ST_NANOS6_FREEING },
['F'] = { CHSS, POP, ST_NANOS6_FREEING },
},
['D'] = {
['r'] = { CHSS, PUSH, ST_NANOS6_DEP_REG },
['R'] = { CHSS, POP, ST_NANOS6_DEP_REG },
['u'] = { CHSS, PUSH, ST_NANOS6_DEP_UNREG },
['U'] = { CHSS, POP, ST_NANOS6_DEP_UNREG },
},
['S'] = {
['['] = { CHSS, PUSH, ST_NANOS6_SCHED_SERVING },
[']'] = { CHSS, POP, ST_NANOS6_SCHED_SERVING },
['a'] = { CHSS, PUSH, ST_NANOS6_SCHED_ADDING },
['A'] = { CHSS, POP, ST_NANOS6_SCHED_ADDING },
['p'] = { CHSS, PUSH, ST_NANOS6_SCHED_PROCESSING },
['P'] = { CHSS, POP, ST_NANOS6_SCHED_PROCESSING },
['@'] = { CHSS, IGN, -1 },
['r'] = { CHSS, IGN, -1 },
['s'] = { CHSS, IGN, -1 },
},
['B'] = {
['b'] = { CHSS, PUSH, ST_NANOS6_BLK_BLOCKING },
['B'] = { CHSS, POP, ST_NANOS6_BLK_BLOCKING },
['u'] = { CHSS, PUSH, ST_NANOS6_BLK_UNBLOCKING },
['U'] = { CHSS, POP, ST_NANOS6_BLK_UNBLOCKING },
['w'] = { CHSS, PUSH, ST_NANOS6_BLK_TASKWAIT },
['W'] = { CHSS, POP, ST_NANOS6_BLK_TASKWAIT },
['f'] = { CHSS, PUSH, ST_NANOS6_BLK_WAITFOR },
['F'] = { CHSS, POP, ST_NANOS6_BLK_WAITFOR },
},
['H'] = {
['e'] = { CHTT, PUSH, ST_NANOS6_TH_EXTERNAL },
['E'] = { CHTT, POP, ST_NANOS6_TH_EXTERNAL },
['w'] = { CHTT, PUSH, ST_NANOS6_TH_WORKER },
['W'] = { CHTT, POP, ST_NANOS6_TH_WORKER },
['l'] = { CHTT, PUSH, ST_NANOS6_TH_LEADER },
['L'] = { CHTT, POP, ST_NANOS6_TH_LEADER },
['m'] = { CHTT, PUSH, ST_NANOS6_TH_MAIN },
['M'] = { CHTT, POP, ST_NANOS6_TH_MAIN },
},
};
static int
nanos6_probe(struct emu *emu)
{
if (emu->system.nthreads == 0)
return -1;
return 0;
}
static int
init_chans(struct bay *bay, struct chan chans[], const char *fmt, int64_t gindex, int filtered)
{
for (int i = 0; i < NANOS6_CHAN_MAX; i++) {
struct chan *c = &chans[i];
int type = (chan_stack[i] && !filtered) ? CHAN_STACK : CHAN_SINGLE;
chan_init(c, type, fmt, gindex, chan_name[i]);
if (bay_register(bay, c) != 0) {
err("bay_register failed");
return -1;
}
}
return 0;
}
static int
nanos6_create(struct emu *emu)
{
struct system *sys = &emu->system;
struct bay *bay = &emu->bay;
/* Create nanos6 cpu data */
struct nanos6_cpu *cpus = calloc(sys->ncpus, sizeof(*cpus));
if (cpus == NULL) {
err("calloc failed:");
return -1;
}
for (struct cpu *c = sys->cpus; c; c = c->next) {
struct nanos6_cpu *cpu = &cpus[c->gindex];
if (init_chans(bay, cpu->chans, chan_cpu_fmt, c->gindex, 0) != 0) {
err("init_chans failed");
return -1;
}
extend_set(&c->ext, model_nanos6.model, cpu);
}
/* Create nanos6 thread data */
struct nanos6_thread *threads = calloc(sys->nthreads, sizeof(*threads));
if (threads == NULL) {
err("calloc failed:");
return -1;
}
for (struct thread *t = sys->threads; t; t = t->gnext) {
struct nanos6_thread *th = &threads[t->gindex];
if (init_chans(bay, th->chans, chan_th_fmt, t->gindex, 0) != 0) {
err("init_chans failed");
return -1;
}
if (init_chans(bay, th->fchans, chan_fth_fmt, t->gindex, 1) != 0) {
err("init_chans failed");
return -1;
}
extend_set(&t->ext, model_nanos6.model, th);
}
return 0;
}
static int
connect_thread_mux(struct emu *emu, struct thread *thread)
{
struct nanos6_thread *th = extend_get(&thread->ext, '6');
for (int i = 0; i < NANOS6_CHAN_MAX; i++) {
struct mux *mux = &th->muxers[i];
const char *tracking = th_track[i];
mux_select_func_t selfun;
struct chan *inp = &th->chans[i];
/* TODO: Let the thread take the select channel
* and build the mux as a tracking mode */
struct chan *sel = &thread->chan[TH_CHAN_STATE];
if (strcmp(tracking, "running") == 0) {
selfun = thread_select_running;
} else if (strcmp(tracking, "active") == 0) {
selfun = thread_select_active;
} else {
th->ochans[i] = inp;
/* No tracking */
continue;
}
struct chan *out = &th->fchans[i];
th->ochans[i] = out;
if (mux_init(mux, &emu->bay, sel, out, selfun) != 0) {
err("mux_init failed");
return -1;
}
if (mux_add_input(mux, value_int64(0), inp) != 0) {
err("mux_add_input failed");
return -1;
}
/* Connect to prv output */
}
return 0;
}
static int
connect_thread_prv(struct emu *emu, struct thread *thread, struct prv *prv)
{
struct nanos6_thread *th = extend_get(&thread->ext, '6');
for (int i = 0; i < NANOS6_CHAN_MAX; i++) {
struct chan *out = &th->fchans[i];
long type = th_type[i];
long row = thread->gindex;
if (prv_register(prv, row, type, &emu->bay, out)) {
err("prv_register failed");
return -1;
}
}
return 0;
}
static int
nanos6_connect(struct emu *emu)
{
struct system *sys = &emu->system;
for (struct thread *t = sys->threads; t; t = t->gnext) {
if (connect_thread_mux(emu, t) != 0) {
err("connect_thread_mux failed");
return -1;
}
}
/* Get thread PRV */
struct pvt *pvt = recorder_find_pvt(&emu->recorder, "thread");
if (pvt == NULL) {
err("cannot find thread pvt");
return -1;
}
struct prv *prv = pvt_get_prv(pvt);
for (struct thread *t = sys->threads; t; t = t->gnext) {
if (connect_thread_prv(emu, t, prv) != 0) {
err("connect_thread_prv failed");
return -1;
}
}
return 0;
}
static int
simple(struct emu *emu)
{
const int *entry = ss_table[emu->ev->c][emu->ev->v];
int chind = entry[0];
int action = entry[1];
int st = entry[2];
struct nanos6_thread *th = extend_get(&emu->thread->ext, '6');
struct chan *ch = &th->chans[chind];
if (action == PUSH) {
return chan_push(ch, value_int64(st));
} else if (action == POP) {
return chan_pop(ch, value_int64(st));
} else if (action == IGN) {
return 0; /* do nothing */
} else {
err("unknown Nanos6 subsystem event");
return -1;
}
return 0;
}
static int
process_ev(struct emu *emu)
{
if (!emu->thread->is_active) {
err("current thread %d not active", emu->thread->tid);
return -1;
}
switch (emu->ev->c) {
case 'C':
case 'S':
case 'U':
case 'F':
case 'O':
case 't':
case 'H':
case 'D':
case 'B':
case 'W':
return simple(emu);
// case 'T':
// pre_task(emu);
// break;
// case 'Y':
// pre_type(emu);
// break;
default:
err("unknown Nanos6 event category");
// return -1;
}
/* Not reached */
return 0;
}
static int
nanos6_event(struct emu *emu)
{
if (emu->ev->m != model_nanos6.model) {
err("unexpected event model %c\n", emu->ev->m);
return -1;
}
dbg("got nanos6 event %s", emu->ev->mcv);
if (process_ev(emu) != 0) {
err("error processing Nanos6 event");
return -1;
}
//check_affinity(emu);
return 0;
}
struct model_spec model_nanos6 = {
.name = "nanos6",
.model = '6',
.create = nanos6_create,
.connect = nanos6_connect,
.event = nanos6_event,
.probe = nanos6_probe,
};

80
src/emu/model_nanos6.h Normal file
View File

@ -0,0 +1,80 @@
/* Copyright (c) 2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef MODEL_NANOS6_H
#define MODEL_NANOS6_H
#include "emu_model.h"
extern struct model_spec model_nanos6;
#include "chan.h"
#include "mux.h"
#include "task.h"
enum nanos6_chan_type {
NANOS6_CHAN_TASKID = 0,
NANOS6_CHAN_TYPE,
NANOS6_CHAN_SUBSYSTEM,
NANOS6_CHAN_RANK,
NANOS6_CHAN_THREAD,
NANOS6_CHAN_MAX,
};
enum nanos6_ss_state {
ST_NANOS6_TASK_BODY = 1,
ST_NANOS6_TASK_CREATING,
ST_NANOS6_TASK_SUBMIT,
ST_NANOS6_TASK_SPAWNING,
ST_NANOS6_TASK_FOR,
ST_NANOS6_SCHED_ADDING,
ST_NANOS6_SCHED_PROCESSING,
ST_NANOS6_SCHED_SERVING,
ST_NANOS6_DEP_REG,
ST_NANOS6_DEP_UNREG,
ST_NANOS6_BLK_TASKWAIT,
ST_NANOS6_BLK_WAITFOR,
ST_NANOS6_BLK_BLOCKING,
ST_NANOS6_BLK_UNBLOCKING,
ST_NANOS6_ALLOCATING,
ST_NANOS6_FREEING,
ST_NANOS6_HANDLING_TASK,
ST_NANOS6_WORKER_LOOP,
ST_NANOS6_SWITCH_TO,
ST_NANOS6_MIGRATE,
ST_NANOS6_SUSPEND,
ST_NANOS6_RESUME,
/* Value 51 is broken in old Paraver */
EV_NANOS6_SCHED_RECV = 60,
EV_NANOS6_SCHED_SEND,
EV_NANOS6_SCHED_SELF,
EV_NANOS6_CPU_IDLE,
EV_NANOS6_CPU_ACTIVE,
EV_NANOS6_SIGNAL,
};
enum nanos6_thread_type {
ST_NANOS6_TH_LEADER = 1,
ST_NANOS6_TH_MAIN = 2,
ST_NANOS6_TH_WORKER = 3,
ST_NANOS6_TH_EXTERNAL = 4,
};
struct nanos6_thread {
struct chan chans[NANOS6_CHAN_MAX];
struct chan fchans[NANOS6_CHAN_MAX];
struct chan *ochans[NANOS6_CHAN_MAX];
struct mux muxers[NANOS6_CHAN_MAX];
struct task_stack task_stack;
};
struct nanos6_cpu {
struct chan chans[NANOS6_CHAN_MAX];
};
struct nanos6_proc {
struct task_info task_info;
};
#endif /* MODEL_NANOS6_H */

View File

@ -413,10 +413,8 @@ process_ev(struct emu *emu)
}
static int
ust_probe(void *p)
ust_probe(struct emu *emu)
{
struct emu *emu = emu_get(p);
if (emu->system.nthreads == 0)
return -1;
@ -424,9 +422,8 @@ ust_probe(void *p)
}
static int
ust_event(void *ptr)
ust_event(struct emu *emu)
{
struct emu *emu = emu_get(ptr);
if (emu->ev->m != model_ust.model) {
err("unexpected event model %c\n", emu->ev->m);
return -1;

View File

@ -64,8 +64,10 @@ cb_select(struct chan *sel_chan, void *ptr)
return -1;
}
dbg("mux selects input key=%s chan=%s\n",
value_str(sel_value, buf), input->chan->name);
if (input) {
dbg("mux selects input key=%s chan=%s\n",
value_str(sel_value, buf), input->chan->name);
}
/* Set to null by default */
struct value out_value = value_null();
@ -227,5 +229,7 @@ mux_add_input(struct mux *mux, struct value key, struct chan *chan)
return -1;
}
mux->ninputs++;
return 0;
}

View File

@ -7,7 +7,10 @@
/* No loom dependency here */
#include "thread.h"
#include "parson.h"
#include "uthash.h"
#include <stddef.h>
#include <stdint.h>
#include <linux/limits.h>
struct proc {
int64_t gindex;

View File

@ -1,6 +1,8 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#define ENABLE_DEBUG
#include "prv.h"
#include <stdio.h>
#include <errno.h>
@ -32,7 +34,7 @@ prv_open(struct prv *prv, long nrows, const char *path)
FILE *f = fopen(path, "w");
if (f == NULL) {
die("prv_open: cannot open file '%s' for writting: %s\n",
die("cannot open file '%s' for writting: %s\n",
path, strerror(errno));
return -1;
}
@ -75,34 +77,51 @@ emit(struct prv *prv, struct prv_chan *rchan)
{
struct value value;
struct chan *chan = rchan->chan;
char buf[128];
if (chan_read(chan, &value) != 0) {
err("prv_emit: chan_read %s failed\n", chan->name);
err("chan_read %s failed\n", chan->name);
return -1;
}
/* Ensure we don't emit the same value twice */
if (rchan->last_value_set) {
/* TODO: skip optionally */
if (value_is_equal(&value, &rchan->last_value)) {
char buf[128];
err("prv_emit: cannot emit value %s twice for channel %s\n",
err("skipping duplicated value %s for channel %s\n",
value_str(value, buf), chan->name);
return -1;
return 0;
}
}
if (value.type != VALUE_INT64) {
char buf[128];
err("prv_emit: chan_read %s only int64 supported: %s\n",
chan->name, value_str(value, buf));
return -1;
long val = 0;
switch (value.type) {
case VALUE_INT64:
val = value.i;
//if (val == 0) {
// err("forbidden value 0 in %s: %s\n",
// chan->name,
// value_str(value, buf));
// return -1;
//}
break;
case VALUE_NULL:
val = 0;
break;
default:
err("chan_read %s only int64 and null supported: %s\n",
chan->name, value_str(value, buf));
return -1;
}
if (write_line(prv, rchan->row_base1, rchan->type, value.i) != 0) {
err("prv_emit: write_line failed for channel %s\n",
if (write_line(prv, rchan->row_base1, rchan->type, val) != 0) {
err("write_line failed for channel %s\n",
chan->name);
return -1;
}
dbg("written %s for chan %s", value_str(value, buf), chan->name);
rchan->last_value = value;
rchan->last_value_set = 1;
@ -122,16 +141,16 @@ cb_prv(struct chan *chan, void *ptr)
int
prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan *chan)
{
/* FIXME: use the type instead of channel name as key */
struct prv_chan *rchan = find_prv_chan(prv, chan->name);
if (rchan != NULL) {
err("prv_register: channel %s already registered\n",
chan->name);
err("channel %s already registered", chan->name);
return -1;
}
rchan = calloc(1, sizeof(struct prv_chan));
if (rchan == NULL) {
err("prv_register: calloc failed\n");
err("calloc failed:");
return -1;
}
@ -144,7 +163,7 @@ prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan
/* Add emit callback */
if (bay_add_cb(bay, BAY_CB_EMIT, chan, cb_prv, rchan) != 0) {
err("prv_register: bay_add_cb failed\n");
err("bay_add_cb failed");
return -1;
}
@ -158,7 +177,7 @@ int
prv_advance(struct prv *prv, int64_t time)
{
if (time < prv->time) {
err("prv_advance: cannot move to previous time\n");
err("cannot move to previous time");
return -1;
}

42
src/emu/pvt.c Normal file
View File

@ -0,0 +1,42 @@
#include "pvt.h"
int
pvt_open(struct pvt *pvt, long nrows, const char *dir, const char *name)
{
memset(pvt, 0, sizeof(struct pvt));
if (snprintf(pvt->dir, PATH_MAX, "%s", dir) >= PATH_MAX) {
err("snprintf failed: name too long");
return -1;
}
if (snprintf(pvt->name, PATH_MAX, "%s", name) >= PATH_MAX) {
err("snprintf failed: name too long");
return -1;
}
char prvpath[PATH_MAX];
if (snprintf(prvpath, PATH_MAX, "%s/%s.prv", dir, name) >= PATH_MAX) {
err("snprintf failed: path too long");
return -1;
}
if (prv_open(&pvt->prv, nrows, prvpath) != 0) {
err("prv_open failed");
return -1;
}
return 0;
}
struct prv *
pvt_get_prv(struct pvt *pvt)
{
return &pvt->prv;
}
int
pvt_advance(struct pvt *pvt, int64_t time)
{
return prv_advance(&pvt->prv, time);
}

26
src/emu/pvt.h Normal file
View File

@ -0,0 +1,26 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef PVT_H
#define PVT_H
#include "prv.h"
#include "pcf.h"
#include "uthash.h"
#include <linux/limits.h>
struct pvt {
char dir[PATH_MAX];
char name[PATH_MAX]; /* Without .prv extension */
struct prv prv;
struct pcf_file pcf;
struct UT_hash_handle hh; /* For recorder */
};
int pvt_open(struct pvt *pvt, long nrows, const char *dir, const char *name);
struct prv *pvt_get_prv(struct pvt *pvt);
struct pcf *pvt_get_pcf(struct pvt *pvt);
int pvt_advance(struct pvt *pvt, int64_t time);
#endif /* PVT_H */

View File

View File

@ -1,30 +0,0 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef PVTRACE_H
#define PVTRACE_H
#include "prv.h"
#include "pcf.h"
#include "uthash.h"
#include <stdio.h>
#include <linux/limits.h>
struct pvtrace {
char name[PATH_MAX];
struct prv prv;
struct pcf_file pcf;
};
struct pvmanager {
struct pvtrace *traces;
};
int pvmanager_init(struct pvmanager *man);
struct pvt *pvman_new(struct pvmanager *man,
const char *path, long nrows);
struct prv *pvt_get_prv(struct pvtrace *trace);
struct pcf *pvt_get_pcf(struct pvtrace *trace);
#endif /* PRV_H */

66
src/emu/recorder.c Normal file
View File

@ -0,0 +1,66 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#define ENABLE_DEBUG
#include "recorder.h"
int
recorder_init(struct recorder *rec, const char *dir)
{
memset(rec, 0, sizeof(struct recorder));
if (snprintf(rec->dir, PATH_MAX, "%s", dir) >= PATH_MAX) {
err("snprintf failed: path too long");
return -1;
}
return 0;
}
struct pvt *
recorder_find_pvt(struct recorder *rec, const char *name)
{
struct pvt *pvt = NULL;
HASH_FIND_STR(rec->pvt, name, pvt);
return pvt;
}
struct pvt *
recorder_add_pvt(struct recorder *rec, const char *name, long nrows)
{
struct pvt *pvt = recorder_find_pvt(rec, name);
if (pvt != NULL) {
err("pvt %s already registered", name);
return NULL;
}
pvt = calloc(1, sizeof(struct pvt));
if (pvt == NULL) {
err("calloc failed:");
return NULL;
}
if (pvt_open(pvt, nrows, rec->dir, name) != 0) {
err("pvt_open failed");
return NULL;
}
HASH_ADD_STR(rec->pvt, name, pvt);
return pvt;
}
int
recorder_advance(struct recorder *rec, int64_t time)
{
for (struct pvt *pvt = rec->pvt; pvt; pvt = pvt->hh.next) {
if (pvt_advance(pvt, time) != 0) {
err("pvt_advance failed");
return -1;
}
}
return 0;
}

23
src/emu/recorder.h Normal file
View File

@ -0,0 +1,23 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef RECORDER_H
#define RECORDER_H
/* Records data into files (Paraver traces only for now) */
#include "pvt.h"
#include <linux/limits.h>
struct recorder {
char dir[PATH_MAX]; /* To place the traces */
struct pvt *pvt; /* Hash table by name */
};
int recorder_init(struct recorder *rec, const char *dir);
struct pvt *recorder_find_pvt(struct recorder *rec, const char *name);
struct pvt *recorder_add_pvt(struct recorder *rec, const char *name, long nrows);
int recorder_advance(struct recorder *rec, int64_t time);
#endif /* RECORDER_H */

View File

@ -28,7 +28,7 @@ create_thread(struct proc *proc, const char *relpath)
return NULL;
}
if (thread_init_begin(thread, relpath) != 0) {
if (thread_init_begin(thread, proc, relpath) != 0) {
err("cannot init thread");
return NULL;
}
@ -532,17 +532,27 @@ system_get_lpt(struct emu_stream *stream)
}
int
system_connect(struct system *sys, struct bay *bay)
system_connect(struct system *sys, struct bay *bay, struct recorder *rec)
{
/* Create Paraver traces */
if (recorder_add_pvt(rec, "cpu", sys->ncpus) == NULL) {
err("recorder_add_pvt failed");
return -1;
}
if (recorder_add_pvt(rec, "thread", sys->nthreads) == NULL) {
err("recorder_add_pvt failed");
return -1;
}
for (struct thread *th = sys->threads; th; th = th->gnext) {
if (thread_connect(th, bay) != 0) {
if (thread_connect(th, bay, rec) != 0) {
err("thread_connect failed\n");
return -1;
}
}
for (struct cpu *cpu = sys->cpus; cpu; cpu = cpu->next) {
if (cpu_connect(cpu, bay) != 0) {
if (cpu_connect(cpu, bay, rec) != 0) {
err("cpu_connect failed\n");
return -1;
}

View File

@ -12,6 +12,7 @@
#include "thread.h"
#include "cpu.h"
#include "clkoff.h"
#include "recorder.h"
#include <stddef.h>
/* Map from stream to lpt */
@ -27,7 +28,7 @@ struct system {
size_t nlooms;
size_t nthreads;
size_t nprocs;
size_t ncpus; /* Physical */
size_t ncpus; /* Including virtual cpus */
struct loom *looms;
struct proc *procs;
@ -43,7 +44,7 @@ struct system {
};
int system_init(struct system *sys, struct emu_args *args, struct emu_trace *trace);
int system_connect(struct system *sys, struct bay *bay);
int system_connect(struct system *sys, struct bay *bay, struct recorder *rec);
struct lpt *system_get_lpt(struct emu_stream *stream);
//struct emu_cpu *system_find_cpu(struct emu_loom *loom, int cpuid);
//int model_ctx_set(struct model_ctx *ctx, int model, void *data);

View File

@ -1,15 +1,11 @@
/* Copyright (c) 2022 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "task.h"
#include "uthash.h"
#include "utlist.h"
#include "chan.h"
#include "emu.h"
#include "task.h"
#include "ovni.h"
#include "prv.h"
struct task *
task_find(struct task *tasks, uint32_t task_id)
{
@ -28,24 +24,28 @@ task_type_find(struct task_type *types, uint32_t type_id)
return type;
}
void
task_create(struct ovni_emu *emu, struct task_info *info,
uint32_t type_id, uint32_t task_id)
int
task_create(struct task_info *info, uint32_t type_id, uint32_t task_id)
{
/* Ensure the task id is new */
if (task_find(info->tasks, task_id) != NULL)
edie(emu, "cannot create task: task_id %u already exists\n",
task_id);
if (task_find(info->tasks, task_id) != NULL) {
err("task_id %u already exists", task_id);
return -1;
}
/* Ensure the type exists */
struct task_type *type = task_type_find(info->types, type_id);
if (type == NULL)
edie(emu, "cannot create task: unknown type id %u\n", type_id);
if (type == NULL) {
err("unknown type id %u", type_id);
return -1;
}
struct task *task = calloc(1, sizeof(struct task));
if (task == NULL)
die("calloc failed\n");
if (task == NULL) {
err("calloc failed:");
return -1;
}
task->id = task_id;
task->type = type;
@ -55,114 +55,162 @@ task_create(struct ovni_emu *emu, struct task_info *info,
/* Add the new task to the hash table */
HASH_ADD_INT(info->tasks, id, task);
dbg("new task created id=%d\n", task->id);
dbg("new task created id=%d", task->id);
return 0;
}
void
task_execute(struct ovni_emu *emu,
struct task_stack *stack, struct task *task)
int
task_execute(struct task_stack *stack, struct task *task)
{
if (task == NULL)
edie(emu, "cannot execute: task is NULL\n");
if (task == NULL) {
err("task is NULL");
return -1;
}
if (task->state != TASK_ST_CREATED)
edie(emu, "cannot execute task %u: state is not created\n", task->id);
if (task->state != TASK_ST_CREATED) {
err("cannot execute task %u: state is not created", task->id);
return -1;
}
if (task->thread != NULL)
edie(emu, "task already has a thread assigned\n");
if (task->thread != NULL) {
err("task already has a thread assigned");
return -1;
}
if (stack->thread->state != TH_ST_RUNNING)
edie(emu, "thread state is not running\n");
if (stack->thread->state != TH_ST_RUNNING) {
err("thread state is not running");
return -1;
}
if (stack->top == task)
edie(emu, "thread already has assigned task %u\n", task->id);
if (stack->top == task) {
err("thread already has assigned task %u", task->id);
return -1;
}
if (stack->top && stack->top->state != TASK_ST_RUNNING)
edie(emu, "cannot execute a nested task from a non-running task\n");
if (stack->top && stack->top->state != TASK_ST_RUNNING) {
err("cannot execute a nested task from a non-running task");
return -1;
}
task->state = TASK_ST_RUNNING;
task->thread = stack->thread;
DL_PREPEND(stack->tasks, task);
dbg("task id=%u runs now\n", task->id);
dbg("task id=%u runs now", task->id);
return 0;
}
void
task_pause(struct ovni_emu *emu,
struct task_stack *stack, struct task *task)
int
task_pause(struct task_stack *stack, struct task *task)
{
if (task == NULL)
edie(emu, "cannot pause: task is NULL\n");
if (task == NULL) {
err("cannot pause: task is NULL");
return -1;
}
if (task->state != TASK_ST_RUNNING)
edie(emu, "cannot pause: task state is not running\n");
if (task->state != TASK_ST_RUNNING) {
err("cannot pause: task state is not running");
return -1;
}
if (task->thread == NULL)
edie(emu, "cannot pause: task has no thread assigned\n");
if (task->thread == NULL) {
err("cannot pause: task has no thread assigned");
return -1;
}
if (stack->thread->state != TH_ST_RUNNING)
edie(emu, "cannot pause: thread state is not running\n");
if (stack->thread->state != TH_ST_RUNNING) {
err("cannot pause: thread state is not running");
return -1;
}
if (stack->top != task)
edie(emu, "thread has assigned a different task\n");
if (stack->top != task) {
err("thread has assigned a different task");
return -1;
}
if (stack->thread != task->thread)
edie(emu, "task is assigned to a different thread\n");
if (stack->thread != task->thread) {
err("task is assigned to a different thread");
return -1;
}
task->state = TASK_ST_PAUSED;
dbg("task id=%d pauses\n", task->id);
dbg("task id=%d pauses", task->id);
return 0;
}
void
task_resume(struct ovni_emu *emu,
struct task_stack *stack, struct task *task)
int
task_resume(struct task_stack *stack, struct task *task)
{
if (task == NULL)
edie(emu, "cannot resume: task is NULL\n");
if (task == NULL) {
err("cannot resume: task is NULL");
return -1;
}
if (task->state != TASK_ST_PAUSED)
edie(emu, "task state is not paused\n");
if (task->state != TASK_ST_PAUSED) {
err("task state is not paused");
return -1;
}
if (task->thread == NULL)
edie(emu, "cannot resume: task has no thread assigned\n");
if (task->thread == NULL) {
err("cannot resume: task has no thread assigned");
return -1;
}
if (stack->thread->state != TH_ST_RUNNING)
edie(emu, "thread is not running\n");
if (stack->thread->state != TH_ST_RUNNING) {
err("thread is not running");
return -1;
}
if (stack->top != task)
edie(emu, "thread has assigned a different task\n");
if (stack->top != task) {
err("thread has assigned a different task");
return -1;
}
if (stack->thread != task->thread)
edie(emu, "task is assigned to a different thread\n");
if (stack->thread != task->thread) {
err("task is assigned to a different thread");
return -1;
}
task->state = TASK_ST_RUNNING;
dbg("task id=%d resumes\n", task->id);
dbg("task id=%d resumes", task->id);
return 0;
}
void
task_end(struct ovni_emu *emu,
struct task_stack *stack, struct task *task)
int
task_end(struct task_stack *stack, struct task *task)
{
if (task == NULL)
edie(emu, "cannot end: task is NULL\n");
if (task == NULL) {
err("cannot end: task is NULL");
return -1;
}
if (task->state != TASK_ST_RUNNING)
edie(emu, "task state is not running\n");
if (task->state != TASK_ST_RUNNING) {
err("task state is not running");
return -1;
}
if (task->thread == NULL)
edie(emu, "cannot end: task has no thread assigned\n");
if (task->thread == NULL) {
err("cannot end: task has no thread assigned");
return -1;
}
if (stack->thread->state != TH_ST_RUNNING)
edie(emu, "cannot end task: thread is not running\n");
if (stack->thread->state != TH_ST_RUNNING) {
err("cannot end task: thread is not running");
return -1;
}
if (stack->top != task)
edie(emu, "thread has assigned a different task\n");
if (stack->top != task) {
err("thread has assigned a different task");
return -1;
}
if (stack->thread != task->thread)
edie(emu, "task is assigned to a different thread\n");
if (stack->thread != task->thread) {
err("task is assigned to a different thread");
return -1;
}
task->state = TASK_ST_DEAD;
@ -171,7 +219,8 @@ task_end(struct ovni_emu *emu,
DL_DELETE(stack->tasks, task);
dbg("task id=%d ends\n", task->id);
dbg("task id=%d ends", task->id);
return 0;
}
static uint32_t
@ -193,56 +242,63 @@ get_task_type_gid(const char *label)
return gid;
}
void
int
task_type_create(struct task_info *info, uint32_t type_id, const char *label)
{
struct task_type *type;
/* Ensure the type id is new */
HASH_FIND_INT(info->types, &type_id, type);
if (type != NULL)
die("a task type with id %u already exists\n", type_id);
if (type != NULL) {
err("a task type with id %u already exists", type_id);
return -1;
}
type = calloc(1, sizeof(*type));
if (type == NULL)
die("calloc failed");
if (type == NULL) {
err("calloc failed:");
return -1;
}
type->id = type_id;
if (type->id == 0)
die("invalid task type id %d\n", type->id);
if (type->id == 0) {
err("invalid task type id %d", type->id);
return -1;
}
type->gid = get_task_type_gid(label);
int n = snprintf(type->label, MAX_PCF_LABEL, "%s", label);
if (n >= MAX_PCF_LABEL)
die("task type label too long: %s\n", label);
if (n >= MAX_PCF_LABEL) {
err("task type label too long: %s", label);
return -1;
}
/* Add the new task type to the hash table */
HASH_ADD_INT(info->types, id, type);
dbg("new task type created id=%d label=%s\n", type->id,
type->label);
dbg("new task type created id=%d label=%s", type->id, type->label);
return 0;
}
void
task_create_pcf_types(struct pcf_type *pcftype, struct task_type *types)
{
/* Emit types for all task types */
for (struct task_type *tt = types; tt != NULL; tt = tt->hh.next) {
struct pcf_value *pcfvalue = pcf_find_value(pcftype, tt->gid);
if (pcfvalue != NULL) {
/* Ensure the label is the same, so we know that
* no collision occurred */
if (strcmp(pcfvalue->label, tt->label) != 0)
die("collision occurred in task type labels\n");
else
continue;
}
pcf_add_value(pcftype, tt->gid, tt->label);
}
}
//void
//task_create_pcf_types(struct pcf_type *pcftype, struct task_type *types)
//{
// /* Emit types for all task types */
// for (struct task_type *tt = types; tt != NULL; tt = tt->hh.next) {
// struct pcf_value *pcfvalue = pcf_find_value(pcftype, tt->gid);
// if (pcfvalue != NULL) {
// /* Ensure the label is the same, so we know that
// * no collision occurred */
// if (strcmp(pcfvalue->label, tt->label) != 0)
// die("collision occurred in task type labels");
// else
// continue;
// }
//
// pcf_add_value(pcftype, tt->gid, tt->label);
// }
//}
struct task *
task_get_running(struct task_stack *stack)

View File

@ -1,23 +1,70 @@
/* Copyright (c) 2022 Barcelona Supercomputing Center (BSC)
/* Copyright (c) 2022-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef OVNI_EMU_TASK_H
#define OVNI_EMU_TASK_H
#ifndef TASK_H
#define TASK_H
#include "emu.h"
#include <stdint.h>
#include "uthash.h"
#include "pcf.h"
#include "thread.h"
enum task_state {
TASK_ST_CREATED,
TASK_ST_RUNNING,
TASK_ST_PAUSED,
TASK_ST_DEAD,
};
struct task_type {
uint32_t id; /* Per-process task identifier */
uint32_t gid; /* Global identifier computed from the label */
char label[MAX_PCF_LABEL];
UT_hash_handle hh;
};
struct task {
uint32_t id;
struct task_type *type;
/* TODO: Use a pointer to task_stack instead of thread */
/* The thread that has began to execute the task. It cannot
* changed after being set, even if the task ends. */
struct thread *thread;
enum task_state state;
UT_hash_handle hh;
/* List handle for nested task support */
struct task *next;
struct task *prev;
};
struct task_info {
/* Both hash maps of all known tasks and types */
struct task_type *types;
struct task *tasks;
};
struct task_stack {
union {
struct task *top; /* Synctactic sugar */
struct task *tasks;
};
struct thread *thread;
};
struct task *task_find(struct task *tasks, uint32_t task_id);
void task_create(struct ovni_emu *emu, struct task_info *info, uint32_t type_id, uint32_t task_id);
void task_execute(struct ovni_emu *emu, struct task_stack *stack, struct task *task);
void task_pause(struct ovni_emu *emu, struct task_stack *stack, struct task *task);
void task_resume(struct ovni_emu *emu, struct task_stack *stack, struct task *task);
void task_end(struct ovni_emu *emu, struct task_stack *stack, struct task *task);
int task_create(struct task_info *info, uint32_t type_id, uint32_t task_id);
int task_execute(struct task_stack *stack, struct task *task);
int task_pause(struct task_stack *stack, struct task *task);
int task_resume(struct task_stack *stack, struct task *task);
int task_end(struct task_stack *stack, struct task *task);
struct task_type *task_type_find(struct task_type *types, uint32_t type_id);
void task_type_create(struct task_info *info, uint32_t type_id, const char *label);
int task_type_create(struct task_info *info, uint32_t type_id, const char *label);
void task_create_pcf_types(struct pcf_type *pcftype, struct task_type *types);
//void task_create_pcf_types(struct pcf_type *pcftype, struct task_type *types);
struct task *task_get_running(struct task_stack *stack);
#endif /* OVNI_EMU_TASK_H */
#endif /* TASK_H */

View File

@ -18,6 +18,13 @@ static const int chan_stack[] = {
[TH_CHAN_FLUSH] = 1,
};
static const int chan_type[] = {
[TH_CHAN_TID] = 2,
[TH_CHAN_STATE] = 4,
[TH_CHAN_CPU] = 6,
[TH_CHAN_FLUSH] = 7,
};
static int
get_tid(const char *id, int *tid)
{
@ -60,12 +67,13 @@ thread_relpath_get_tid(const char *relpath, int *tid)
}
int
thread_init_begin(struct thread *thread, const char *relpath)
thread_init_begin(struct thread *thread, struct proc *proc, const char *relpath)
{
memset(thread, 0, sizeof(struct thread));
thread->state = TH_ST_UNKNOWN;
thread->gindex = -1;
thread->proc = proc;
if (snprintf(thread->id, PATH_MAX, "%s", relpath) >= PATH_MAX) {
err("relpath too long");
@ -111,18 +119,34 @@ thread_init_end(struct thread *th)
}
int
thread_connect(struct thread *th, struct bay *bay)
thread_connect(struct thread *th, struct bay *bay, struct recorder *rec)
{
if (!th->is_init) {
err("thread is not initialized");
return -1;
}
/* Get thread prv */
struct pvt *pvt = recorder_find_pvt(rec, "thread");
if (pvt == NULL) {
err("cannot find thread pvt");
return -1;
}
struct prv *prv = pvt_get_prv(pvt);
for (int i = 0; i < TH_CHAN_MAX; i++) {
if (bay_register(bay, &th->chan[i]) != 0) {
struct chan *c = &th->chan[i];
if (bay_register(bay, c) != 0) {
err("bay_register failed");
return -1;
}
long type = chan_type[i];
long row = th->gindex;
if (prv_register(prv, row, type, bay, c)) {
err("prv_register failed");
return -1;
}
}
return 0;
@ -171,6 +195,78 @@ thread_set_state(struct thread *th, enum thread_state state)
return 0;
}
int
thread_select_active(struct mux *mux,
struct value value,
struct mux_input **input)
{
if (value.type == VALUE_NULL) {
*input = NULL;
return 0;
}
if (value.type != VALUE_INT64) {
err("expecting NULL or INT64 channel value");
return -1;
}
enum thread_state state = (enum thread_state) value.i;
if (mux->ninputs != 1) {
err("expecting NULL or INT64 channel value");
return -1;
}
switch (state) {
case TH_ST_RUNNING:
case TH_ST_COOLING:
case TH_ST_WARMING:
*input = mux->input;
break;
default:
*input = NULL;
break;
}
return 0;
}
int
thread_select_running(struct mux *mux,
struct value value,
struct mux_input **input)
{
if (value.type == VALUE_NULL) {
*input = NULL;
return 0;
}
if (value.type != VALUE_INT64) {
err("expecting NULL or INT64 channel value");
return -1;
}
enum thread_state state = (enum thread_state) value.i;
if (mux->ninputs != 1) {
err("mux doesn't have one input but %d", mux->ninputs);
return -1;
}
switch (state) {
case TH_ST_RUNNING:
*input = mux->input;
break;
default:
*input = NULL;
break;
}
return 0;
}
int
thread_set_cpu(struct thread *th, struct cpu *cpu)
{

View File

@ -7,9 +7,13 @@
struct thread; /* Needed for cpu */
#include "cpu.h"
#include "proc.h"
#include "chan.h"
#include "bay.h"
#include "uthash.h"
#include "recorder.h"
#include "extend.h"
#include "mux.h"
#include <stddef.h>
#include <linux/limits.h>
@ -64,12 +68,13 @@ struct thread {
struct chan chan[TH_CHAN_MAX];
//struct model_ctx ctx;
struct extend ext;
UT_hash_handle hh; /* threads in the process */
};
int thread_relpath_get_tid(const char *relpath, int *tid);
int thread_init_begin(struct thread *thread, const char *relpath);
int thread_init_begin(struct thread *thread, struct proc *proc, const char *relpath);
int thread_init_end(struct thread *thread);
int thread_set_state(struct thread *th, enum thread_state state);
int thread_set_cpu(struct thread *th, struct cpu *cpu);
@ -77,6 +82,9 @@ int thread_unset_cpu(struct thread *th);
int thread_migrate_cpu(struct thread *th, struct cpu *cpu);
int thread_get_tid(struct thread *thread);
void thread_set_gindex(struct thread *th, int64_t gindex);
int thread_connect(struct thread *th, struct bay *bay);
int thread_connect(struct thread *th, struct bay *bay, struct recorder *rec);
int thread_select_active(struct mux *mux, struct value value, struct mux_input **input);
int thread_select_running(struct mux *mux, struct value value, struct mux_input **input);
#endif /* THREAD_H */

View File

@ -16,6 +16,9 @@ int main(void)
if (emu_init(&emu, argc, argv) != 0)
die("emu_init failed\n");
if (emu_connect(&emu) != 0)
die("emu_connect failed\n");
int ret = 0;
while ((ret = emu_step(&emu)) == 0) {