From 4a8255e2279488a29ca4455d7b1541ff99856788 Mon Sep 17 00:00:00 2001 From: Rodrigo Arias Date: Tue, 31 Jan 2023 18:23:41 +0100 Subject: [PATCH] Complete Nanos6 model --- src/emu/bay.c | 17 +- src/emu/chan.c | 3 +- src/emu/cpu.c | 35 +- src/emu/cpu.h | 2 + src/emu/emu.c | 1 + src/emu/model_nanos6.c | 982 ++++++++++++++++++++++++++++++++++------- src/emu/model_nanos6.h | 62 +-- src/emu/mux.c | 13 +- src/emu/mux.h | 1 + src/emu/proc.h | 2 + src/emu/prv.c | 16 +- src/emu/prv.h | 7 +- src/emu/thread.c | 4 +- src/emu/value.h | 2 + src/include/utlist.h | 3 +- 15 files changed, 909 insertions(+), 241 deletions(-) diff --git a/src/emu/bay.c b/src/emu/bay.c index 521128e..13efb58 100644 --- a/src/emu/bay.c +++ b/src/emu/bay.c @@ -25,7 +25,9 @@ cb_chan_is_dirty(struct chan *chan, void *arg) return -1; } + dbg("adding dirty chan %s", chan->name) DL_APPEND(bay->dirty, bchan); + return 0; } @@ -152,8 +154,7 @@ propagate_chan(struct bay_chan *bchan, enum bay_cb_type type) bchan->chan->name, propname[type]); struct bay_cb *cur = NULL; - struct bay_cb *tmp = NULL; - DL_FOREACH_SAFE(bchan->cb[type], cur, tmp) { + DL_FOREACH(bchan->cb[type], cur) { if (cur->func(bchan->chan, cur->arg) != 0) { err("propagate_chan: callback failed\n"); return -1; @@ -166,9 +167,9 @@ propagate_chan(struct bay_chan *bchan, enum bay_cb_type type) int bay_propagate(struct bay *bay) { - struct bay_chan *cur, *tmp; + struct bay_chan *cur; bay->state = BAY_PROPAGATING; - DL_FOREACH_SAFE(bay->dirty, cur, tmp) { + DL_FOREACH(bay->dirty, cur) { /* May add more dirty channels */ if (propagate_chan(cur, BAY_CB_DIRTY) != 0) { err("bay_propagate: propagate_chan failed\n"); @@ -176,10 +177,12 @@ bay_propagate(struct bay *bay) } } + dbg("<> dirty phase complete"); + /* Once the dirty callbacks have been propagated, * begin the emit stage */ bay->state = BAY_EMITTING; - DL_FOREACH_SAFE(bay->dirty, cur, tmp) { + DL_FOREACH(bay->dirty, cur) { /* May add more dirty channels */ if (propagate_chan(cur, BAY_CB_EMIT) != 0) { err("bay_propagate: propagate_chan failed\n"); @@ -187,11 +190,13 @@ bay_propagate(struct bay *bay) } } + dbg("<> emit phase complete"); + /* Flush channels after running all the dirty and emit * callbacks, so we capture any potential double write when * running the callbacks */ bay->state = BAY_FLUSHING; - DL_FOREACH_SAFE(bay->dirty, cur, tmp) { + DL_FOREACH(bay->dirty, cur) { if (chan_flush(cur->chan) != 0) { err("bay_propagate: chan_flush failed\n"); return -1; diff --git a/src/emu/chan.c b/src/emu/chan.c index 7aa4001..24eb481 100644 --- a/src/emu/chan.c +++ b/src/emu/chan.c @@ -1,7 +1,7 @@ /* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC) * SPDX-License-Identifier: GPL-3.0-or-later */ -//#define ENABLE_DEBUG +#define ENABLE_DEBUG #include "chan.h" #include "common.h" @@ -53,6 +53,7 @@ set_dirty(struct chan *chan) chan->is_dirty = 1; if (chan->dirty_cb != NULL) { + dbg("%s: calling dirty callback", chan->name); if (chan->dirty_cb(chan, chan->dirty_arg) != 0) { err("%s: dirty callback failed", chan->name); return -1; diff --git a/src/emu/cpu.c b/src/emu/cpu.c index 4d27556..b5ec1a7 100644 --- a/src/emu/cpu.c +++ b/src/emu/cpu.c @@ -16,6 +16,8 @@ static const char *chan_name[] = { [CPU_CHAN_TID] = "tid_running", [CPU_CHAN_APPID] = "appid_running", [CPU_CHAN_FLUSH] = "flush_running", + [CPU_CHAN_THRUN] = "th_running", + [CPU_CHAN_THACT] = "th_active", }; static int chan_type[] = { @@ -24,6 +26,8 @@ static int chan_type[] = { [CPU_CHAN_NRUN] = 3, [CPU_CHAN_APPID] = 5, [CPU_CHAN_FLUSH] = 7, + [CPU_CHAN_THRUN] = -1, + [CPU_CHAN_THACT] = -1, }; void @@ -69,11 +73,18 @@ cpu_init_end(struct cpu *cpu) } for (int i = 0; i < CPU_CHAN_MAX; i++) { + if (chan_name[i] == NULL) + die("chan_name is null"); + chan_init(&cpu->chan[i], CHAN_SINGLE, chan_fmt, cpu->gindex, chan_name[i]); } chan_prop_set(&cpu->chan[CPU_CHAN_NRUN], CHAN_DUPLICATES, 1); + chan_prop_set(&cpu->chan[CPU_CHAN_TID], CHAN_DUPLICATES, 1); + chan_prop_set(&cpu->chan[CPU_CHAN_PID], CHAN_DUPLICATES, 1); + chan_prop_set(&cpu->chan[CPU_CHAN_THRUN], CHAN_DUPLICATES, 1); + chan_prop_set(&cpu->chan[CPU_CHAN_THACT], CHAN_DUPLICATES, 1); cpu->is_init = 1; @@ -104,8 +115,11 @@ cpu_connect(struct cpu *cpu, struct bay *bay, struct recorder *rec) } long type = chan_type[i]; + if (type < 0) + continue; + long row = cpu->gindex; - if (prv_register(prv, row, type, bay, c)) { + if (prv_register(prv, row, type, bay, c, PRV_DUP)) { err("prv_register failed"); return -1; } @@ -155,14 +169,17 @@ cpu_update(struct cpu *cpu) struct value tid_running; struct value pid_running; + struct value gid_running; if (running == 1) { cpu->th_running = th_running; tid_running = value_int64(th_running->tid); pid_running = value_int64(th_running->proc->pid); + gid_running = value_int64(th_running->gindex); } else { cpu->th_running = NULL; tid_running = value_null(); pid_running = value_null(); + gid_running = value_null(); } if (chan_set(&cpu->chan[CPU_CHAN_TID], tid_running) != 0) { @@ -173,17 +190,29 @@ cpu_update(struct cpu *cpu) err("chan_set pid failed"); return -1; } + if (chan_set(&cpu->chan[CPU_CHAN_THRUN], gid_running) != 0) { + err("chan_set gid_running failed"); + return -1; + } - if (active == 1) + struct value gid_active; + if (active == 1) { cpu->th_active = th_active; - else + gid_active = value_int64(th_active->gindex); + } else { cpu->th_active = NULL; + gid_active = value_null(); + } /* Update nth_running number in the channel */ if (chan_set(&cpu->chan[CPU_CHAN_NRUN], value_int64(running)) != 0) { err("chan_set nth_running failed"); return -1; } + if (chan_set(&cpu->chan[CPU_CHAN_THACT], gid_active) != 0) { + err("chan_set gid_active failed"); + return -1; + } return 0; } diff --git a/src/emu/cpu.h b/src/emu/cpu.h index 6892858..9853ba0 100644 --- a/src/emu/cpu.h +++ b/src/emu/cpu.h @@ -20,6 +20,8 @@ enum cpu_chan { CPU_CHAN_TID, CPU_CHAN_APPID, CPU_CHAN_FLUSH, + CPU_CHAN_THRUN, /* gindex */ + CPU_CHAN_THACT, /* gindex */ CPU_CHAN_MAX, }; diff --git a/src/emu/emu.c b/src/emu/emu.c index b210a80..bb98cfe 100644 --- a/src/emu/emu.c +++ b/src/emu/emu.c @@ -160,6 +160,7 @@ emu_step(struct emu *emu) if (bay_propagate(&emu->bay) != 0) { err("bay_propagate failed"); + panic(emu); return -1; } diff --git a/src/emu/model_nanos6.c b/src/emu/model_nanos6.c index 9140d99..ec6e6f8 100644 --- a/src/emu/model_nanos6.c +++ b/src/emu/model_nanos6.c @@ -10,130 +10,188 @@ #include "common.h" #include "chan.h" -/* Raw channels */ -static const char chan_cpu_fmt[] = "nanos6.cpu%ld.%s.raw"; -static const char chan_th_fmt[] = "nanos6.thread%ld.%s.raw"; +static const char chan_fmt_cpu_raw[] = "nanos6.cpu%ld.%s"; +//static const char chan_fmt_cpu_run[] = "nanos6.cpu%ld.%s.run"; +//static const char chan_fmt_cpu_act[] = "nanos6.cpu%ld.%s.act"; +static const char chan_fmt_th_raw[] = "nanos6.thread%ld.%s.raw"; +static const char chan_fmt_th_run[] = "nanos6.thread%ld.%s.run"; +static const char chan_fmt_th_act[] = "nanos6.thread%ld.%s.act"; -/* Channels filtered by tracking */ -//static const char chan_fcpu_fmt[] = "nanos6.cpu%ld.%s.filtered"; -static const char chan_fth_fmt[] = "nanos6.thread%ld.%s.filtered"; +/* Private enums */ +enum nanos6_chan_type { + CH_TASKID = 0, + CH_TYPE, + CH_SUBSYSTEM, + CH_RANK, + CH_THREAD, + CH_MAX, +}; + +enum nanos6_ss_state { + ST_TASK_BODY = 1, + ST_TASK_CREATING, + ST_TASK_SUBMIT, + ST_TASK_SPAWNING, + ST_TASK_FOR, + ST_SCHED_ADDING, + ST_SCHED_PROCESSING, + ST_SCHED_SERVING, + ST_DEP_REG, + ST_DEP_UNREG, + ST_BLK_TASKWAIT, + ST_BLK_WAITFOR, + ST_BLK_BLOCKING, + ST_BLK_UNBLOCKING, + ST_ALLOCATING, + ST_FREEING, + ST_HANDLING_TASK, + ST_WORKER_LOOP, + ST_SWITCH_TO, + ST_MIGRATE, + ST_SUSPEND, + ST_RESUME, + + /* Value 51 is broken in old Paraver */ + EV_SCHED_RECV = 60, + EV_SCHED_SEND, + EV_SCHED_SELF, + EV_CPU_IDLE, + EV_CPU_ACTIVE, + EV_SIGNAL, +}; + +enum nanos6_thread_type { + ST_TH_LEADER = 1, + ST_TH_MAIN = 2, + ST_TH_WORKER = 3, + ST_TH_EXTERNAL = 4, +}; static const char *chan_name[] = { - [NANOS6_CHAN_TASKID] = "taskid", - [NANOS6_CHAN_TYPE] = "task_type", - [NANOS6_CHAN_SUBSYSTEM] = "subsystem", - [NANOS6_CHAN_RANK] = "rank", - [NANOS6_CHAN_THREAD] = "thread_type", + [CH_TASKID] = "taskid", + [CH_TYPE] = "task_type", + [CH_SUBSYSTEM] = "subsystem", + [CH_RANK] = "rank", + [CH_THREAD] = "thread_type", }; static const char *th_track[] = { - [NANOS6_CHAN_TASKID] = "running", - [NANOS6_CHAN_TYPE] = "running", - [NANOS6_CHAN_SUBSYSTEM] = "active", - [NANOS6_CHAN_RANK] = "running", - [NANOS6_CHAN_THREAD] = "none", + [CH_TASKID] = "running", + [CH_TYPE] = "running", + [CH_SUBSYSTEM] = "active", + [CH_RANK] = "running", + [CH_THREAD] = "none", +}; + +static const char *cpu_track[] = { + [CH_TASKID] = "running", + [CH_TYPE] = "running", + [CH_SUBSYSTEM] = "running", + [CH_RANK] = "running", + [CH_THREAD] = "running", }; static const int chan_stack[] = { - [NANOS6_CHAN_SUBSYSTEM] = 1, - [NANOS6_CHAN_THREAD] = 1, + [CH_SUBSYSTEM] = 1, + [CH_THREAD] = 1, }; static const int th_type[] = { - [NANOS6_CHAN_TASKID] = 35, - [NANOS6_CHAN_TYPE] = 36, - [NANOS6_CHAN_SUBSYSTEM] = 37, - [NANOS6_CHAN_RANK] = 38, - [NANOS6_CHAN_THREAD] = 39, + [CH_TASKID] = 35, + [CH_TYPE] = 36, + [CH_SUBSYSTEM] = 37, + [CH_RANK] = 38, + [CH_THREAD] = 39, }; +static const int *cpu_type = th_type; + enum { PUSH = 1, POP = 2, IGN = 3 }; -#define CHSS NANOS6_CHAN_SUBSYSTEM -#define CHTT NANOS6_CHAN_THREAD +#define CHSS CH_SUBSYSTEM +#define CHTH CH_THREAD static const int ss_table[256][256][3] = { -['W'] = { - ['['] = { CHSS, PUSH, ST_NANOS6_WORKER_LOOP }, - [']'] = { CHSS, POP, ST_NANOS6_WORKER_LOOP }, - ['t'] = { CHSS, PUSH, ST_NANOS6_HANDLING_TASK }, - ['T'] = { CHSS, POP, ST_NANOS6_HANDLING_TASK }, - ['w'] = { CHSS, PUSH, ST_NANOS6_SWITCH_TO }, - ['W'] = { CHSS, POP, ST_NANOS6_SWITCH_TO }, - ['m'] = { CHSS, PUSH, ST_NANOS6_MIGRATE }, - ['M'] = { CHSS, POP, ST_NANOS6_MIGRATE }, - ['s'] = { CHSS, PUSH, ST_NANOS6_SUSPEND }, - ['S'] = { CHSS, POP, ST_NANOS6_SUSPEND }, - ['r'] = { CHSS, PUSH, ST_NANOS6_RESUME }, - ['R'] = { CHSS, POP, ST_NANOS6_RESUME }, - ['*'] = { CHSS, IGN, -1 }, -}, -['C'] = { - ['['] = { CHSS, PUSH, ST_NANOS6_TASK_CREATING }, - [']'] = { CHSS, POP, ST_NANOS6_TASK_CREATING }, -}, -['U'] = { - ['['] = { CHSS, PUSH, ST_NANOS6_TASK_SUBMIT }, - [']'] = { CHSS, POP, ST_NANOS6_TASK_SUBMIT }, -}, -['F'] = { - ['['] = { CHSS, PUSH, ST_NANOS6_TASK_SPAWNING }, - [']'] = { CHSS, POP, ST_NANOS6_TASK_SPAWNING }, -}, -['O'] = { - ['['] = { CHSS, PUSH, ST_NANOS6_TASK_FOR }, - [']'] = { CHSS, POP, ST_NANOS6_TASK_FOR }, -}, -['t'] = { - ['['] = { CHSS, PUSH, ST_NANOS6_TASK_BODY }, - [']'] = { CHSS, POP, ST_NANOS6_TASK_BODY }, -}, -['M'] = { - ['a'] = { CHSS, PUSH, ST_NANOS6_ALLOCATING }, - ['A'] = { CHSS, POP, ST_NANOS6_ALLOCATING }, - ['f'] = { CHSS, PUSH, ST_NANOS6_FREEING }, - ['F'] = { CHSS, POP, ST_NANOS6_FREEING }, -}, -['D'] = { - ['r'] = { CHSS, PUSH, ST_NANOS6_DEP_REG }, - ['R'] = { CHSS, POP, ST_NANOS6_DEP_REG }, - ['u'] = { CHSS, PUSH, ST_NANOS6_DEP_UNREG }, - ['U'] = { CHSS, POP, ST_NANOS6_DEP_UNREG }, -}, -['S'] = { - ['['] = { CHSS, PUSH, ST_NANOS6_SCHED_SERVING }, - [']'] = { CHSS, POP, ST_NANOS6_SCHED_SERVING }, - ['a'] = { CHSS, PUSH, ST_NANOS6_SCHED_ADDING }, - ['A'] = { CHSS, POP, ST_NANOS6_SCHED_ADDING }, - ['p'] = { CHSS, PUSH, ST_NANOS6_SCHED_PROCESSING }, - ['P'] = { CHSS, POP, ST_NANOS6_SCHED_PROCESSING }, - ['@'] = { CHSS, IGN, -1 }, - ['r'] = { CHSS, IGN, -1 }, - ['s'] = { CHSS, IGN, -1 }, -}, -['B'] = { - ['b'] = { CHSS, PUSH, ST_NANOS6_BLK_BLOCKING }, - ['B'] = { CHSS, POP, ST_NANOS6_BLK_BLOCKING }, - ['u'] = { CHSS, PUSH, ST_NANOS6_BLK_UNBLOCKING }, - ['U'] = { CHSS, POP, ST_NANOS6_BLK_UNBLOCKING }, - ['w'] = { CHSS, PUSH, ST_NANOS6_BLK_TASKWAIT }, - ['W'] = { CHSS, POP, ST_NANOS6_BLK_TASKWAIT }, - ['f'] = { CHSS, PUSH, ST_NANOS6_BLK_WAITFOR }, - ['F'] = { CHSS, POP, ST_NANOS6_BLK_WAITFOR }, -}, -['H'] = { - ['e'] = { CHTT, PUSH, ST_NANOS6_TH_EXTERNAL }, - ['E'] = { CHTT, POP, ST_NANOS6_TH_EXTERNAL }, - ['w'] = { CHTT, PUSH, ST_NANOS6_TH_WORKER }, - ['W'] = { CHTT, POP, ST_NANOS6_TH_WORKER }, - ['l'] = { CHTT, PUSH, ST_NANOS6_TH_LEADER }, - ['L'] = { CHTT, POP, ST_NANOS6_TH_LEADER }, - ['m'] = { CHTT, PUSH, ST_NANOS6_TH_MAIN }, - ['M'] = { CHTT, POP, ST_NANOS6_TH_MAIN }, -}, + ['W'] = { + ['['] = { CHSS, PUSH, ST_WORKER_LOOP }, + [']'] = { CHSS, POP, ST_WORKER_LOOP }, + ['t'] = { CHSS, PUSH, ST_HANDLING_TASK }, + ['T'] = { CHSS, POP, ST_HANDLING_TASK }, + ['w'] = { CHSS, PUSH, ST_SWITCH_TO }, + ['W'] = { CHSS, POP, ST_SWITCH_TO }, + ['m'] = { CHSS, PUSH, ST_MIGRATE }, + ['M'] = { CHSS, POP, ST_MIGRATE }, + ['s'] = { CHSS, PUSH, ST_SUSPEND }, + ['S'] = { CHSS, POP, ST_SUSPEND }, + ['r'] = { CHSS, PUSH, ST_RESUME }, + ['R'] = { CHSS, POP, ST_RESUME }, + ['*'] = { CHSS, IGN, -1 }, + }, + ['C'] = { + ['['] = { CHSS, PUSH, ST_TASK_CREATING }, + [']'] = { CHSS, POP, ST_TASK_CREATING }, + }, + ['U'] = { + ['['] = { CHSS, PUSH, ST_TASK_SUBMIT }, + [']'] = { CHSS, POP, ST_TASK_SUBMIT }, + }, + ['F'] = { + ['['] = { CHSS, PUSH, ST_TASK_SPAWNING }, + [']'] = { CHSS, POP, ST_TASK_SPAWNING }, + }, + ['O'] = { + ['['] = { CHSS, PUSH, ST_TASK_FOR }, + [']'] = { CHSS, POP, ST_TASK_FOR }, + }, + ['t'] = { + ['['] = { CHSS, PUSH, ST_TASK_BODY }, + [']'] = { CHSS, POP, ST_TASK_BODY }, + }, + ['M'] = { + ['a'] = { CHSS, PUSH, ST_ALLOCATING }, + ['A'] = { CHSS, POP, ST_ALLOCATING }, + ['f'] = { CHSS, PUSH, ST_FREEING }, + ['F'] = { CHSS, POP, ST_FREEING }, + }, + ['D'] = { + ['r'] = { CHSS, PUSH, ST_DEP_REG }, + ['R'] = { CHSS, POP, ST_DEP_REG }, + ['u'] = { CHSS, PUSH, ST_DEP_UNREG }, + ['U'] = { CHSS, POP, ST_DEP_UNREG }, + }, + ['S'] = { + ['['] = { CHSS, PUSH, ST_SCHED_SERVING }, + [']'] = { CHSS, POP, ST_SCHED_SERVING }, + ['a'] = { CHSS, PUSH, ST_SCHED_ADDING }, + ['A'] = { CHSS, POP, ST_SCHED_ADDING }, + ['p'] = { CHSS, PUSH, ST_SCHED_PROCESSING }, + ['P'] = { CHSS, POP, ST_SCHED_PROCESSING }, + ['@'] = { CHSS, IGN, -1 }, + ['r'] = { CHSS, IGN, -1 }, + ['s'] = { CHSS, IGN, -1 }, + }, + ['B'] = { + ['b'] = { CHSS, PUSH, ST_BLK_BLOCKING }, + ['B'] = { CHSS, POP, ST_BLK_BLOCKING }, + ['u'] = { CHSS, PUSH, ST_BLK_UNBLOCKING }, + ['U'] = { CHSS, POP, ST_BLK_UNBLOCKING }, + ['w'] = { CHSS, PUSH, ST_BLK_TASKWAIT }, + ['W'] = { CHSS, POP, ST_BLK_TASKWAIT }, + ['f'] = { CHSS, PUSH, ST_BLK_WAITFOR }, + ['F'] = { CHSS, POP, ST_BLK_WAITFOR }, + }, + ['H'] = { + ['e'] = { CHTH, PUSH, ST_TH_EXTERNAL }, + ['E'] = { CHTH, POP, ST_TH_EXTERNAL }, + ['w'] = { CHTH, PUSH, ST_TH_WORKER }, + ['W'] = { CHTH, POP, ST_TH_WORKER }, + ['l'] = { CHTH, PUSH, ST_TH_LEADER }, + ['L'] = { CHTH, POP, ST_TH_LEADER }, + ['m'] = { CHTH, PUSH, ST_TH_MAIN }, + ['M'] = { CHTH, POP, ST_TH_MAIN }, + }, }; - static int nanos6_probe(struct emu *emu) { @@ -144,9 +202,9 @@ nanos6_probe(struct emu *emu) } static int -init_chans(struct bay *bay, struct chan chans[], const char *fmt, int64_t gindex, int filtered) +init_chans(struct bay *bay, struct chan *chans, const char *fmt, int64_t gindex, int filtered) { - for (int i = 0; i < NANOS6_CHAN_MAX; i++) { + for (int i = 0; i < CH_MAX; i++) { struct chan *c = &chans[i]; int type = (chan_stack[i] && !filtered) ? CHAN_STACK : CHAN_SINGLE; chan_init(c, type, fmt, gindex, chan_name[i]); @@ -160,46 +218,143 @@ init_chans(struct bay *bay, struct chan chans[], const char *fmt, int64_t gindex return 0; } +static int +init_cpu(struct bay *bay, struct cpu *syscpu) +{ + struct nanos6_cpu *cpu = calloc(1, sizeof(struct nanos6_cpu)); + if (cpu == NULL) { + err("calloc failed:"); + return -1; + } + + cpu->ch = calloc(CH_MAX, sizeof(struct chan)); + if (cpu->ch == NULL) { + err("calloc failed:"); + return -1; + } + + cpu->mux = calloc(CH_MAX, sizeof(struct mux)); + if (cpu->mux == NULL) { + err("calloc failed:"); + return -1; + } + + if (init_chans(bay, cpu->ch, chan_fmt_cpu_raw, syscpu->gindex, 1) != 0) { + err("init_chans failed"); + return -1; + } + + extend_set(&syscpu->ext, '6', cpu); + return 0; +} + +static int +init_thread(struct bay *bay, struct thread *systh) +{ + struct nanos6_thread *th = calloc(1, sizeof(struct nanos6_thread)); + if (th == NULL) { + err("calloc failed:"); + return -1; + } + + th->ch = calloc(CH_MAX, sizeof(struct chan)); + if (th->ch == NULL) { + err("calloc failed:"); + return -1; + } + + th->ch_run = calloc(CH_MAX, sizeof(struct chan)); + if (th->ch_run == NULL) { + err("calloc failed:"); + return -1; + } + + th->ch_act = calloc(CH_MAX, sizeof(struct chan)); + if (th->ch_act == NULL) { + err("calloc failed:"); + return -1; + } + + th->ch_out = calloc(CH_MAX, sizeof(struct chan *)); + if (th->ch_out == NULL) { + err("calloc failed:"); + return -1; + } + + th->mux_run = calloc(CH_MAX, sizeof(struct mux)); + if (th->mux_run == NULL) { + err("calloc failed:"); + return -1; + } + + th->mux_act = calloc(CH_MAX, sizeof(struct mux)); + if (th->mux_act == NULL) { + err("calloc failed:"); + return -1; + } + + if (init_chans(bay, th->ch, chan_fmt_th_raw, systh->gindex, 0) != 0) { + err("init_chans failed"); + return -1; + } + + if (init_chans(bay, th->ch_run, chan_fmt_th_run, systh->gindex, 1) != 0) { + err("init_chans failed"); + return -1; + } + + if (init_chans(bay, th->ch_act, chan_fmt_th_act, systh->gindex, 1) != 0) { + err("init_chans failed"); + return -1; + } + + + th->task_stack.thread = systh; + + extend_set(&systh->ext, '6', th); + + return 0; +} + +static int +init_proc(struct proc *sysproc) +{ + struct nanos6_proc *proc = calloc(1, sizeof(struct nanos6_proc)); + if (proc == NULL) { + err("calloc failed:"); + return -1; + } + + extend_set(&sysproc->ext, '6', proc); + + return 0; +} + static int nanos6_create(struct emu *emu) { struct system *sys = &emu->system; struct bay *bay = &emu->bay; - /* Create nanos6 cpu data */ - struct nanos6_cpu *cpus = calloc(sys->ncpus, sizeof(*cpus)); - if (cpus == NULL) { - err("calloc failed:"); - return -1; - } - for (struct cpu *c = sys->cpus; c; c = c->next) { - struct nanos6_cpu *cpu = &cpus[c->gindex]; - if (init_chans(bay, cpu->chans, chan_cpu_fmt, c->gindex, 0) != 0) { - err("init_chans failed"); + if (init_cpu(bay, c) != 0) { + err("init_cpu failed"); return -1; } - extend_set(&c->ext, model_nanos6.model, cpu); - } - - /* Create nanos6 thread data */ - struct nanos6_thread *threads = calloc(sys->nthreads, sizeof(*threads)); - if (threads == NULL) { - err("calloc failed:"); - return -1; } for (struct thread *t = sys->threads; t; t = t->gnext) { - struct nanos6_thread *th = &threads[t->gindex]; - if (init_chans(bay, th->chans, chan_th_fmt, t->gindex, 0) != 0) { - err("init_chans failed"); + if (init_thread(bay, t) != 0) { + err("init_thread failed"); return -1; } - if (init_chans(bay, th->fchans, chan_fth_fmt, t->gindex, 1) != 0) { - err("init_chans failed"); + } + + for (struct proc *p = sys->procs; p; p = p->gnext) { + if (init_proc(p) != 0) { + err("init_proc failed"); return -1; } - extend_set(&t->ext, model_nanos6.model, th); } return 0; @@ -209,42 +364,51 @@ static int connect_thread_mux(struct emu *emu, struct thread *thread) { struct nanos6_thread *th = extend_get(&thread->ext, '6'); - for (int i = 0; i < NANOS6_CHAN_MAX; i++) { - struct mux *mux = &th->muxers[i]; - - const char *tracking = th_track[i]; - mux_select_func_t selfun; - - struct chan *inp = &th->chans[i]; + for (int i = 0; i < CH_MAX; i++) { /* TODO: Let the thread take the select channel * and build the mux as a tracking mode */ + struct chan *inp = &th->ch[i]; struct chan *sel = &thread->chan[TH_CHAN_STATE]; - if (strcmp(tracking, "running") == 0) { - selfun = thread_select_running; - } else if (strcmp(tracking, "active") == 0) { - selfun = thread_select_active; - } else { - th->ochans[i] = inp; - /* No tracking */ - continue; - } - - struct chan *out = &th->fchans[i]; - th->ochans[i] = out; - - if (mux_init(mux, &emu->bay, sel, out, selfun) != 0) { + struct mux *mux_run = &th->mux_run[i]; + mux_select_func_t selrun = thread_select_running; + if (mux_init(mux_run, &emu->bay, sel, &th->ch_run[i], selrun) != 0) { err("mux_init failed"); return -1; } - if (mux_add_input(mux, value_int64(0), inp) != 0) { + if (mux_add_input(mux_run, value_int64(0), inp) != 0) { err("mux_add_input failed"); return -1; } - /* Connect to prv output */ + struct mux *mux_act = &th->mux_act[i]; + mux_select_func_t selact = thread_select_active; + if (mux_init(mux_act, &emu->bay, sel, &th->ch_act[i], selact) != 0) { + err("mux_init failed"); + return -1; + } + + if (mux_add_input(mux_act, value_int64(0), inp) != 0) { + err("mux_add_input failed"); + return -1; + } + + if (mux_act->ninputs != 1) + die("expecting one input only"); + + /* The tracking only sets the ch_out, but we keep both tracking + * updated as the CPU tracking channels may use them. */ + const char *tracking = th_track[i]; + if (strcmp(tracking, "running") == 0) { + th->ch_out[i] = &th->ch_run[i]; + } else if (strcmp(tracking, "active") == 0) { + th->ch_out[i] = &th->ch_act[i]; + } else { + th->ch_out[i] = &th->ch[i]; + } + } return 0; @@ -254,11 +418,11 @@ static int connect_thread_prv(struct emu *emu, struct thread *thread, struct prv *prv) { struct nanos6_thread *th = extend_get(&thread->ext, '6'); - for (int i = 0; i < NANOS6_CHAN_MAX; i++) { - struct chan *out = &th->fchans[i]; + for (int i = 0; i < CH_MAX; i++) { + struct chan *out = th->ch_out[i]; long type = th_type[i]; long row = thread->gindex; - if (prv_register(prv, row, type, &emu->bay, out)) { + if (prv_register(prv, row, type, &emu->bay, out, PRV_DUP)) { err("prv_register failed"); return -1; } @@ -268,10 +432,70 @@ connect_thread_prv(struct emu *emu, struct thread *thread, struct prv *prv) } static int -nanos6_connect(struct emu *emu) +add_inputs_cpu_mux(struct emu *emu, struct mux *mux, int i) +{ + for (struct thread *t = emu->system.threads; t; t = t->gnext) { + struct nanos6_thread *th = extend_get(&t->ext, '6'); + + /* Choose input thread channel based on tracking mode */ + const char *tracking = cpu_track[i]; + struct chan *inp; + if (strcmp(tracking, "running") == 0) { + inp = &th->ch_run[i]; + } else if (strcmp(tracking, "active") == 0) { + inp = &th->ch_act[i]; + } else { + die("cpu tracking must be 'running' or 'active'"); + } + + if (mux_add_input(mux, value_int64(t->gindex), inp) != 0) { + err("mux_add_input failed"); + return -1; + } + } + + return 0; +} + +static int +connect_cpu_mux(struct emu *emu, struct cpu *scpu) +{ + struct nanos6_cpu *cpu = extend_get(&scpu->ext, '6'); + for (int i = 0; i < CH_MAX; i++) { + struct mux *mux = &cpu->mux[i]; + struct chan *out = &cpu->ch[i]; + const char *tracking = cpu_track[i]; + + /* Choose select CPU channel based on tracking mode */ + struct chan *sel; + if (strcmp(tracking, "running") == 0) { + sel = &scpu->chan[CPU_CHAN_THRUN]; + } else if (strcmp(tracking, "active") == 0) { + sel = &scpu->chan[CPU_CHAN_THACT]; + } else { + die("cpu tracking must be 'running' or 'active'"); + } + + if (mux_init(mux, &emu->bay, sel, out, NULL) != 0) { + err("mux_init failed"); + return -1; + } + + if (add_inputs_cpu_mux(emu, mux, i) != 0) { + err("add_inputs_cpu_mux failed"); + return -1; + } + } + + return 0; +} + +static int +connect_threads(struct emu *emu) { struct system *sys = &emu->system; + /* threads */ for (struct thread *t = sys->threads; t; t = t->gnext) { if (connect_thread_mux(emu, t) != 0) { err("connect_thread_mux failed"); @@ -297,6 +521,71 @@ nanos6_connect(struct emu *emu) return 0; } +static int +connect_cpu_prv(struct emu *emu, struct cpu *scpu, struct prv *prv) +{ + struct nanos6_cpu *cpu = extend_get(&scpu->ext, '6'); + for (int i = 0; i < CH_MAX; i++) { + struct chan *out = &cpu->ch[i]; + long type = cpu_type[i]; + long row = scpu->gindex; + if (prv_register(prv, row, type, &emu->bay, out, PRV_DUP)) { + err("prv_register failed"); + return -1; + } + } + + return 0; +} + +static int +connect_cpus(struct emu *emu) +{ + struct system *sys = &emu->system; + + /* cpus */ + for (struct cpu *c = sys->cpus; c; c = c->next) { + if (connect_cpu_mux(emu, c) != 0) { + err("connect_cpu_mux failed"); + return -1; + } + } + + /* Get cpu PRV */ + struct pvt *pvt = recorder_find_pvt(&emu->recorder, "cpu"); + if (pvt == NULL) { + err("cannot find cpu pvt"); + return -1; + } + struct prv *prv = pvt_get_prv(pvt); + + for (struct cpu *c = sys->cpus; c; c = c->next) { + if (connect_cpu_prv(emu, c, prv) != 0) { + err("connect_cpu_prv failed"); + return -1; + } + } + + return 0; +} + + +static int +nanos6_connect(struct emu *emu) +{ + if (connect_threads(emu) != 0) { + err("connect_threads failed"); + return -1; + } + + if (connect_cpus(emu) != 0) { + err("connect_cpus failed"); + return -1; + } + + return 0; +} + static int simple(struct emu *emu) { @@ -306,7 +595,7 @@ simple(struct emu *emu) int st = entry[2]; struct nanos6_thread *th = extend_get(&emu->thread->ext, '6'); - struct chan *ch = &th->chans[chind]; + struct chan *ch = &th->ch[chind]; if (action == PUSH) { return chan_push(ch, value_int64(st)); @@ -322,6 +611,373 @@ simple(struct emu *emu) return 0; } +/* --------------------------- pre ------------------------------- */ + +static int +chan_task_stopped(struct emu *emu) +{ + struct nanos6_thread *th = extend_get(&emu->thread->ext, '6'); + + struct value null = value_null(); + if (chan_set(&th->ch[CH_TASKID], null) != 0) { + err("chan_set taskid failed"); + return -1; + } + + if (chan_set(&th->ch[CH_TYPE], null) != 0) { + err("chan_set type failed"); + return -1; + } + + struct proc *proc = emu->proc; + if (proc->rank >= 0) { + if (chan_set(&th->ch[CH_RANK], null) != 0) { + err("chan_set rank failed"); + return -1; + } + } + + return 0; +} + +static int +chan_task_running(struct emu *emu, struct task *task) +{ + struct nanos6_thread *th = extend_get(&emu->thread->ext, '6'); + struct proc *proc = emu->proc; + + if (task->id == 0) { + err("task id cannot be 0"); + return -1; + } + if (task->type->gid == 0) { + err("task type gid cannot be 0"); + return -1; + } + + if (chan_set(&th->ch[CH_TASKID], value_int64(task->id)) != 0) { + err("chan_set taskid failed"); + return -1; + } + if (chan_set(&th->ch[CH_TYPE], value_int64(task->type->gid)) != 0) { + err("chan_set type failed"); + return -1; + } + if (proc->rank >= 0) { + struct value vrank = value_int64(proc->rank + 1); + if (chan_set(&th->ch[CH_RANK], vrank) != 0) { + err("chan_set rank failed"); + return -1; + } + } + + return 0; +} + +static int +chan_task_switch(struct emu *emu, + struct task *prev, struct task *next) +{ + struct nanos6_thread *th = extend_get(&emu->thread->ext, '6'); + + if (!prev || !next) { + err("cannot switch to or from a NULL task"); + return -1; + } + + if (prev == next) { + err("cannot switch to the same task"); + return -1; + } + + if (next->id == 0) { + err("next task id cannot be 0"); + return -1; + } + + if (next->type->gid == 0) { + err("next task type id cannot be 0"); + return -1; + } + + if (prev->thread != next->thread) { + err("cannot switch to a task of another thread"); + return -1; + } + + /* No need to change the rank as we will switch to tasks from + * same thread */ + if (chan_set(&th->ch[CH_TASKID], value_int64(next->id)) != 0) { + err("chan_set taskid failed"); + return -1; + } + + /* TODO: test when switching to another task with the same type. We + * should emit the same type state value as previous task. */ + if (chan_set(&th->ch[CH_TYPE], value_int64(next->type->gid)) != 0) { + err("chan_set type failed"); + return -1; + } + + return 0; +} + +static int +update_task_state(struct emu *emu) +{ + if (emu->ev->payload_size < 4) { + err("missing task id in payload"); + return -1; + } + + uint32_t task_id = emu->ev->payload->u32[0]; + + struct nanos6_thread *th = extend_get(&emu->thread->ext, '6'); + struct nanos6_proc *proc = extend_get(&emu->proc->ext, '6'); + + struct task_info *info = &proc->task_info; + struct task_stack *stack = &th->task_stack; + + struct task *task = task_find(info->tasks, task_id); + + if (task == NULL) { + err("cannot find task with id %u", task_id); + return -1; + } + + int ret = 0; + switch (emu->ev->v) { + case 'x': + ret = task_execute(stack, task); + break; + case 'e': + ret = task_end(stack, task); + break; + case 'p': + ret = task_pause(stack, task); + break; + case 'r': + ret = task_resume(stack, task); + break; + default: + err("unexpected Nanos6 task event"); + return -1; + } + + if (ret != 0) { + err("cannot change task state"); + return -1; + } + + return 0; +} + +static int +expand_transition_value(struct emu *emu, int was_running, int runs_now, char *tr_p) +{ + char tr = emu->ev->v; + + /* Ensure we don't clobber the value */ + if (tr == 'X' || tr == 'E') { + err("unexpected event value %c", tr); + return -1; + } + + /* Modify the event value to detect nested transitions */ + if (tr == 'x' && was_running) + tr = 'X'; /* Execute a new nested task */ + else if (tr == 'e' && runs_now) + tr = 'E'; /* End a nested task */ + + *tr_p = tr; + return 0; +} + +static int +update_task_channels(struct emu *emu, + char tr, struct task *prev, struct task *next) +{ + int ret = 0; + switch (tr) { + case 'x': + case 'r': + ret = chan_task_running(emu, next); + break; + case 'e': + case 'p': + ret = chan_task_stopped(emu); + break; + /* Additional nested transitions */ + case 'X': + case 'E': + ret = chan_task_switch(emu, prev, next); + break; + default: + err("unexpected transition value %c", tr); + return -1; + } + + if (ret != 0) { + err("cannot update task channels"); + return -1; + } + + return 0; +} + +static int +enforce_task_rules(struct emu *emu, char tr, + struct task *prev, struct task *next) + +{ + UNUSED(prev); + + if (tr != 'x' && tr != 'X') + return 0; + + /* If a task has just entered the running state, it must show + * the running task body subsystem */ + + if (next->state != TASK_ST_RUNNING) { + err("task not in running state on begin"); + return -1; + } + + struct nanos6_thread *th = extend_get(&emu->thread->ext, '6'); + struct value ss; + if (chan_read(&th->ch[CH_SUBSYSTEM], &ss) != 0) { + err("chan_read failed"); + return -1; + } + + if (ss.type == VALUE_INT64 && ss.i != ST_TASK_BODY) { + err("wrong subsystem state on task begin"); + //return -1; + return 0; // FIXME + } + + return 0; +} + +static int +update_task(struct emu *emu) +{ + struct nanos6_thread *th = extend_get(&emu->thread->ext, '6'); + struct task_stack *stack = &th->task_stack; + + struct task *prev = task_get_running(stack); + + /* Update the emulator state, but don't modify the channels */ + if (update_task_state(emu) != 0) { + err("update_task_state failed"); + return -1; + } + + struct task *next = task_get_running(stack); + + int was_running = (prev != NULL); + int runs_now = (next != NULL); + char tr; + if (expand_transition_value(emu, was_running, runs_now, &tr) != 0) { + err("expand_transition_value failed"); + return -1; + } + + /* Update the task related channels now */ + update_task_channels(emu, tr, prev, next); + + if (enforce_task_rules(emu, tr, prev, next) != 0) { + err("enforce_task_rules failed"); + return -1; + } + + return 0; +} + +static int +create_task(struct emu *emu) +{ + if (emu->ev->payload_size != 8) { + err("unexpected payload size"); + return -1; + } + + uint32_t task_id = emu->ev->payload->u32[0]; + uint32_t type_id = emu->ev->payload->u32[1]; + + struct nanos6_proc *proc = extend_get(&emu->proc->ext, '6'); + struct task_info *info = &proc->task_info; + + if (task_create(info, type_id, task_id) != 0) { + err("task_create failed"); + return -1; + } + + return 0; +} + +static int +pre_task(struct emu *emu) +{ + int ret = 0; + switch (emu->ev->v) { + case 'C': + err("warning: got old 6TC event, ignoring"); + break; + case 'c': + ret = create_task(emu); + break; + case 'x': + case 'e': + case 'r': + case 'p': + ret = update_task(emu); + break; + default: + err("unexpected Nanos6 task event value"); + return -1; + } + + if (ret != 0) { + err("cannot update task state"); + return -1; + } + + return 0; +} + +static int +pre_type(struct emu *emu) +{ + uint8_t value = emu->ev->v; + + if (value != 'c') { + err("unexpected event value %c", value); + return -1; + } + + if (!emu->ev->is_jumbo) { + err("expecting a jumbo event"); + return -1; + } + + const uint8_t *data = &emu->ev->payload->jumbo.data[0]; + uint32_t typeid = *(uint32_t *) data; + data += 4; + + const char *label = (const char *) data; + + struct nanos6_proc *proc = extend_get(&emu->proc->ext, '6'); + struct task_info *info = &proc->task_info; + + if (task_type_create(info, typeid, label) != 0) { + err("task_type_create failed"); + return -1; + } + + return 0; +} + static int process_ev(struct emu *emu) { @@ -342,12 +998,10 @@ process_ev(struct emu *emu) case 'B': case 'W': return simple(emu); -// case 'T': -// pre_task(emu); -// break; -// case 'Y': -// pre_type(emu); -// break; + case 'T': + return pre_task(emu); + case 'Y': + return pre_type(emu); default: err("unknown Nanos6 event category"); // return -1; diff --git a/src/emu/model_nanos6.h b/src/emu/model_nanos6.h index a4308fa..a0aa41e 100644 --- a/src/emu/model_nanos6.h +++ b/src/emu/model_nanos6.h @@ -12,65 +12,19 @@ extern struct model_spec model_nanos6; #include "mux.h" #include "task.h" -enum nanos6_chan_type { - NANOS6_CHAN_TASKID = 0, - NANOS6_CHAN_TYPE, - NANOS6_CHAN_SUBSYSTEM, - NANOS6_CHAN_RANK, - NANOS6_CHAN_THREAD, - NANOS6_CHAN_MAX, -}; - -enum nanos6_ss_state { - ST_NANOS6_TASK_BODY = 1, - ST_NANOS6_TASK_CREATING, - ST_NANOS6_TASK_SUBMIT, - ST_NANOS6_TASK_SPAWNING, - ST_NANOS6_TASK_FOR, - ST_NANOS6_SCHED_ADDING, - ST_NANOS6_SCHED_PROCESSING, - ST_NANOS6_SCHED_SERVING, - ST_NANOS6_DEP_REG, - ST_NANOS6_DEP_UNREG, - ST_NANOS6_BLK_TASKWAIT, - ST_NANOS6_BLK_WAITFOR, - ST_NANOS6_BLK_BLOCKING, - ST_NANOS6_BLK_UNBLOCKING, - ST_NANOS6_ALLOCATING, - ST_NANOS6_FREEING, - ST_NANOS6_HANDLING_TASK, - ST_NANOS6_WORKER_LOOP, - ST_NANOS6_SWITCH_TO, - ST_NANOS6_MIGRATE, - ST_NANOS6_SUSPEND, - ST_NANOS6_RESUME, - - /* Value 51 is broken in old Paraver */ - EV_NANOS6_SCHED_RECV = 60, - EV_NANOS6_SCHED_SEND, - EV_NANOS6_SCHED_SELF, - EV_NANOS6_CPU_IDLE, - EV_NANOS6_CPU_ACTIVE, - EV_NANOS6_SIGNAL, -}; - -enum nanos6_thread_type { - ST_NANOS6_TH_LEADER = 1, - ST_NANOS6_TH_MAIN = 2, - ST_NANOS6_TH_WORKER = 3, - ST_NANOS6_TH_EXTERNAL = 4, -}; - struct nanos6_thread { - struct chan chans[NANOS6_CHAN_MAX]; - struct chan fchans[NANOS6_CHAN_MAX]; - struct chan *ochans[NANOS6_CHAN_MAX]; - struct mux muxers[NANOS6_CHAN_MAX]; + struct chan *ch; /* Raw, modified by nanos6 */ + struct chan *ch_run; /* Tracking running thread */ + struct chan *ch_act; /* Tracking active thread */ + struct chan **ch_out; /* Output to PRV */ + struct mux *mux_run; + struct mux *mux_act; struct task_stack task_stack; }; struct nanos6_cpu { - struct chan chans[NANOS6_CHAN_MAX]; + struct chan *ch; + struct mux *mux; }; struct nanos6_proc { diff --git a/src/emu/mux.c b/src/emu/mux.c index 532bbe7..1d77bae 100644 --- a/src/emu/mux.c +++ b/src/emu/mux.c @@ -48,6 +48,8 @@ cb_select(struct chan *sel_chan, void *ptr) { struct mux *mux = ptr; + dbg("selecting input for output chan chan=%s", mux->output->name); + struct value sel_value; if (chan_read(sel_chan, &sel_value) != 0) { err("cb_select: chan_read(select) failed\n"); @@ -104,8 +106,8 @@ cb_input(struct chan *in_chan, void *ptr) /* Nothing to do, the input is not selected */ if (input == NULL || input->chan != in_chan) { - dbg("mux: input channel %s changed but not selected\n", - in_chan->name); + //dbg("input channel %s changed but not selected\n", + // in_chan->name); return 0; } @@ -119,6 +121,11 @@ cb_input(struct chan *in_chan, void *ptr) return -1; } + char buf[128]; + UNUSED(buf); + dbg("setting output chan %s to value %s", + mux->output->name, value_str(out_value, buf)); + if (chan_set(mux->output, out_value) != 0) { err("cb_input: chan_set() failed\n"); return -1; @@ -165,7 +172,7 @@ mux_init(struct mux *mux, * as the last output value, so we allow duplicates too */ chan_prop_set(output, CHAN_DUPLICATES, 1); - memset(mux, 0, sizeof(struct mux_input)); + memset(mux, 0, sizeof(struct mux)); mux->select = select; mux->output = output; diff --git a/src/emu/mux.h b/src/emu/mux.h index 42e3596..e7f2d9d 100644 --- a/src/emu/mux.h +++ b/src/emu/mux.h @@ -39,6 +39,7 @@ USE_RET int mux_init(struct mux *mux, USE_RET struct mux_input *mux_find_input(struct mux *mux, struct value key); +/* TODO: use an index to select the input in O(1) */ USE_RET int mux_add_input(struct mux *mux, struct value key, struct chan *input); diff --git a/src/emu/proc.h b/src/emu/proc.h index ac35e75..f3055dd 100644 --- a/src/emu/proc.h +++ b/src/emu/proc.h @@ -8,6 +8,7 @@ #include "thread.h" #include "parson.h" #include "uthash.h" +#include "extend.h" #include #include #include @@ -36,6 +37,7 @@ struct proc { //struct model_ctx ctx; UT_hash_handle hh; /* procs in the loom */ + struct extend ext; }; int proc_relpath_get_pid(const char *relpath, int *pid); diff --git a/src/emu/prv.c b/src/emu/prv.c index 06f283d..4240ebb 100644 --- a/src/emu/prv.c +++ b/src/emu/prv.c @@ -84,13 +84,16 @@ emit(struct prv *prv, struct prv_chan *rchan) } /* Ensure we don't emit the same value twice */ - if (rchan->last_value_set) { - /* TODO: skip optionally */ - if (value_is_equal(&value, &rchan->last_value)) { - char buf[128]; - err("skipping duplicated value %s for channel %s\n", + if (rchan->last_value_set && value_is_equal(&value, &rchan->last_value)) { + char buf[128]; + if (rchan->flags & PRV_DUP) { + dbg("skipping duplicated value %s for channel %s\n", value_str(value, buf), chan->name); return 0; + } else { + err("error duplicated value %s for channel %s\n", + value_str(value, buf), chan->name); + return -1; } } @@ -139,7 +142,7 @@ cb_prv(struct chan *chan, void *ptr) } int -prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan *chan) +prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan *chan, long flags) { /* FIXME: use the type instead of channel name as key */ struct prv_chan *rchan = find_prv_chan(prv, chan->name); @@ -160,6 +163,7 @@ prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan rchan->prv = prv; rchan->last_value = value_null(); rchan->last_value_set = 0; + rchan->flags = flags; /* Add emit callback */ if (bay_add_cb(bay, BAY_CB_EMIT, chan, cb_prv, rchan) != 0) { diff --git a/src/emu/prv.h b/src/emu/prv.h index 360f6b2..186e917 100644 --- a/src/emu/prv.h +++ b/src/emu/prv.h @@ -11,11 +11,16 @@ struct prv; +enum prv_flags { + PRV_DUP = 1, +}; + struct prv_chan { struct prv *prv; struct chan *chan; long row_base1; long type; + long flags; int last_value_set; struct value last_value; UT_hash_handle hh; /* Indexed by chan->name */ @@ -30,7 +35,7 @@ struct prv { int prv_open(struct prv *prv, long nrows, const char *path); int prv_open_file(struct prv *prv, long nrows, FILE *file); -int prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan *c); +int prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan *chan, long flags); int prv_advance(struct prv *prv, int64_t time); void prv_close(struct prv *prv); diff --git a/src/emu/thread.c b/src/emu/thread.c index 172ae1b..a8bd0ab 100644 --- a/src/emu/thread.c +++ b/src/emu/thread.c @@ -143,7 +143,7 @@ thread_connect(struct thread *th, struct bay *bay, struct recorder *rec) long type = chan_type[i]; long row = th->gindex; - if (prv_register(prv, row, type, bay, c)) { + if (prv_register(prv, row, type, bay, c, PRV_DUP)) { err("prv_register failed"); return -1; } @@ -213,7 +213,7 @@ thread_select_active(struct mux *mux, enum thread_state state = (enum thread_state) value.i; if (mux->ninputs != 1) { - err("expecting NULL or INT64 channel value"); + err("mux doesn't have one input but %d", mux->ninputs); return -1; } diff --git a/src/emu/value.h b/src/emu/value.h index 05b9e6a..871f5c1 100644 --- a/src/emu/value.h +++ b/src/emu/value.h @@ -32,6 +32,8 @@ value_is_equal(struct value *a, struct value *b) return 1; else if (a->type == VALUE_DOUBLE && a->d == b->d) return 1; + else if (a->type == VALUE_NULL && b->type == VALUE_NULL) + return 1; else return 0; } diff --git a/src/include/utlist.h b/src/include/utlist.h index 4155c42..9fdce4e 100644 --- a/src/include/utlist.h +++ b/src/include/utlist.h @@ -861,7 +861,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define DL_FOREACH2(head, el, next) \ for ((el) = (head); el; (el) = (el)->next) -/* this version is safe for deleting the elements during iteration */ +/* this version is safe for deleting the elements during iteration (not for + * appending!) */ #define DL_FOREACH_SAFE(head, el, tmp) \ DL_FOREACH_SAFE2(head, el, tmp, next)