Begin porting the ovni model

This commit is contained in:
Rodrigo Arias 2023-01-27 18:51:18 +01:00 committed by Rodrigo Arias Mallo
parent 12bfd3fe26
commit 04029995be
41 changed files with 2084 additions and 2359 deletions

View File

@ -15,15 +15,15 @@ void vdie(const char *func, const char *errstr, ...);
/* clang-format off */ /* clang-format off */
#define err(...) verr(__func__, __VA_ARGS__);
#define die(...) vdie(__func__, __VA_ARGS__);
#ifdef ENABLE_DEBUG #ifdef ENABLE_DEBUG
# define dbg(...) fprintf(stderr, __VA_ARGS__); # define dbg(...) verr(__func__, __VA_ARGS__);
#else #else
# define dbg(...) # define dbg(...)
#endif #endif
#define err(...) verr(__func__, __VA_ARGS__);
#define die(...) vdie(__func__, __VA_ARGS__);
#define likely(x) __builtin_expect(!!(x), 1) #define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0) #define unlikely(x) __builtin_expect(!!(x), 0)
#define UNUSED(x) (void)(x) #define UNUSED(x) (void)(x)

View File

@ -20,7 +20,7 @@ add_library(emu STATIC
path.c path.c
metadata.c metadata.c
emu.c emu.c
#emu_system.c system.c
#emu_system_thread.c #emu_system_thread.c
emu_args.c emu_args.c
emu_stream.c emu_stream.c
@ -33,14 +33,9 @@ add_library(emu STATIC
mux.c mux.c
prv.c prv.c
clkoff.c clkoff.c
#ovni/probe.c model_ust.c
#ovni/create.c
#ovni/connect.c
#ovni/event.c
) )
add_subdirectory(ovni)
#add_library(emu STATIC #add_library(emu STATIC
# chan.c # chan.c
# emu.c # emu.c

View File

@ -1,4 +1,4 @@
#define ENABLE_DEBUG //#define ENABLE_DEBUG
#include "bay.h" #include "bay.h"
@ -6,10 +6,10 @@
#include "uthash.h" #include "uthash.h"
#include "utlist.h" #include "utlist.h"
static char *propname[BAY_CB_MAX] = { //static char *propname[BAY_CB_MAX] = {
[BAY_CB_DIRTY] = "dirty", // [BAY_CB_DIRTY] = "dirty",
[BAY_CB_EMIT] = "emit" // [BAY_CB_EMIT] = "emit"
}; //};
/* Called from the channel when it becomes dirty */ /* Called from the channel when it becomes dirty */
static int static int
@ -77,6 +77,8 @@ bay_register(struct bay *bay, struct chan *chan)
/* Add to hash table */ /* Add to hash table */
HASH_ADD_STR(bay->channels, chan->name, bchan); HASH_ADD_STR(bay->channels, chan->name, bchan);
dbg("registered %s", chan->name);
return 0; return 0;
} }

View File

@ -1,6 +1,8 @@
/* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC) /* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */ * SPDX-License-Identifier: GPL-3.0-or-later */
//#define ENABLE_DEBUG
#include "chan.h" #include "chan.h"
#include "common.h" #include "common.h"
#include <string.h> #include <string.h>
@ -44,7 +46,7 @@ set_dirty(struct chan *chan)
if (chan->prop[CHAN_DIRTY_WRITE]) if (chan->prop[CHAN_DIRTY_WRITE])
return 0; return 0;
err("channel %s already dirty\n", chan->name); err("%s: already dirty\n", chan->name);
return -1; return -1;
} }
@ -52,8 +54,7 @@ set_dirty(struct chan *chan)
if (chan->dirty_cb != NULL) { if (chan->dirty_cb != NULL) {
if (chan->dirty_cb(chan, chan->dirty_arg) != 0) { if (chan->dirty_cb(chan, chan->dirty_arg) != 0) {
err("set_dirty %s: dirty callback failed\n", err("%s: dirty callback failed", chan->name);
chan->name);
return -1; return -1;
} }
} }
@ -69,8 +70,7 @@ check_duplicates(struct chan *chan, struct value *v)
return 0; return 0;
if (value_is_equal(&chan->last_value, v)) { if (value_is_equal(&chan->last_value, v)) {
err("check_duplicates %s: same value as last_value\n", err("%s: same value as last_value", chan->name);
chan->name);
return -1; return -1;
} }
@ -81,30 +81,28 @@ int
chan_set(struct chan *chan, struct value value) chan_set(struct chan *chan, struct value value)
{ {
if (chan->type != CHAN_SINGLE) { if (chan->type != CHAN_SINGLE) {
err("chan_set %s: cannot set on non-single channel\n", err("%s: cannot set on non-single channel", chan->name);
chan->name);
return -1; return -1;
} }
if (chan->is_dirty && !chan->prop[CHAN_DIRTY_WRITE]) { if (chan->is_dirty && !chan->prop[CHAN_DIRTY_WRITE]) {
err("chan_set %s: cannot modify dirty channel\n", err("%s: cannot modify dirty channel", chan->name);
chan->name);
return -1; return -1;
} }
if (check_duplicates(chan, &value) != 0) { if (check_duplicates(chan, &value) != 0) {
err("chan_set %s: cannot set a duplicated value\n", err("%s: cannot set a duplicated value", chan->name);
chan->name);
return -1; return -1;
} }
//char buf[128]; char buf[128];
//dbg("chan_set %s: sets value to %s\n", UNUSED(buf);
// chan->name, value_str(value, buf)); dbg("chan_set %s: sets value to %s\n",
chan->name, value_str(value, buf));
chan->data.value = value; chan->data.value = value;
if (set_dirty(chan) != 0) { if (set_dirty(chan) != 0) {
err("chan_set %s: set_dirty failed\n", chan->name); err("%s: set_dirty failed", chan->name);
return -1; return -1;
} }
@ -121,34 +119,31 @@ int
chan_push(struct chan *chan, struct value value) chan_push(struct chan *chan, struct value value)
{ {
if (chan->type != CHAN_STACK) { if (chan->type != CHAN_STACK) {
err("chan_push %s: cannot push on non-stack channel\n", err("%s: cannot push on non-stack channel", chan->name);
chan->name);
return -1; return -1;
} }
if (chan->is_dirty && !chan->prop[CHAN_DIRTY_WRITE]) { if (chan->is_dirty && !chan->prop[CHAN_DIRTY_WRITE]) {
err("chan_push %s: cannot modify dirty channel\n", err("%s: cannot modify dirty channel", chan->name);
chan->name);
return -1; return -1;
} }
if (check_duplicates(chan, &value) != 0) { if (check_duplicates(chan, &value) != 0) {
err("chan_push %s: cannot push a duplicated value\n", err("%s: cannot push a duplicated value", chan->name);
chan->name);
return -1; return -1;
} }
struct chan_stack *stack = &chan->data.stack; struct chan_stack *stack = &chan->data.stack;
if (stack->n >= MAX_CHAN_STACK) { if (stack->n >= MAX_CHAN_STACK) {
err("chan_push %s: channel stack full\n", chan->name); err("%s: channel stack full", chan->name);
return -1; return -1;
} }
stack->values[stack->n++] = value; stack->values[stack->n++] = value;
if (set_dirty(chan) != 0) { if (set_dirty(chan) != 0) {
err("chan_push %s: set_dirty failed\n", chan->name); err("%s: set_dirty failed", chan->name);
return -1; return -1;
} }
@ -166,28 +161,26 @@ int
chan_pop(struct chan *chan, struct value evalue) chan_pop(struct chan *chan, struct value evalue)
{ {
if (chan->type != CHAN_STACK) { if (chan->type != CHAN_STACK) {
err("chan_pop %s: cannot pop on non-stack channel\n", err("%s: cannot pop on non-stack channel", chan->name);
chan->name);
return -1; return -1;
} }
if (chan->is_dirty && !chan->prop[CHAN_DIRTY_WRITE]) { if (chan->is_dirty && !chan->prop[CHAN_DIRTY_WRITE]) {
err("chan_pop %s: cannot modify dirty channel\n", err("%s: cannot modify dirty channel", chan->name);
chan->name);
return -1; return -1;
} }
struct chan_stack *stack = &chan->data.stack; struct chan_stack *stack = &chan->data.stack;
if (stack->n <= 0) { if (stack->n <= 0) {
err("chan_pop %s: channel stack empty\n", chan->name); err("%s: channel stack empty", chan->name);
return -1; return -1;
} }
struct value *value = &stack->values[stack->n - 1]; struct value *value = &stack->values[stack->n - 1];
if (!value_is_equal(value, &evalue)) { if (!value_is_equal(value, &evalue)) {
err("chan_pop %s: unexpected value %ld (expected %ld)\n", err("%s: unexpected value %ld (expected %ld)",
chan->name, value->i, evalue.i); chan->name, value->i, evalue.i);
return -1; return -1;
} }
@ -195,7 +188,7 @@ chan_pop(struct chan *chan, struct value evalue)
stack->n--; stack->n--;
if (set_dirty(chan) != 0) { if (set_dirty(chan) != 0) {
err("chan_pop %s: set_dirty failed\n", chan->name); err("%s: set_dirty failed\n", chan->name);
return -1; return -1;
} }
@ -230,7 +223,7 @@ int
chan_flush(struct chan *chan) chan_flush(struct chan *chan)
{ {
if (!chan->is_dirty) { if (!chan->is_dirty) {
err("chan_flush %s: channel is not dirty\n", chan->name); err("%s: channel is not dirty", chan->name);
return -1; return -1;
} }

View File

@ -23,6 +23,12 @@ enum chan_prop {
CHAN_MAXPROP, CHAN_MAXPROP,
}; };
struct chan_spec {
enum chan_type type;
const char *suffix;
const char *desc;
};
struct chan_stack { struct chan_stack {
int n; int n;
struct value values[MAX_CHAN_STACK]; struct value values[MAX_CHAN_STACK];

View File

@ -7,12 +7,23 @@
#include "value.h" #include "value.h"
#include "utlist.h" #include "utlist.h"
static const char chan_fmt[] = "cpu%ld.%s";
static const char *chan_name[] = {
[CPU_CHAN_NRUN] = "nrunning",
[CPU_CHAN_PID] = "pid_running",
[CPU_CHAN_TID] = "tid_running",
[CPU_CHAN_APPID] = "appid_running",
[CPU_CHAN_FLUSH] = "flush_running",
};
void void
cpu_init(struct cpu *cpu, int phyid) cpu_init_begin(struct cpu *cpu, int phyid)
{ {
memset(cpu, 0, sizeof(struct cpu)); memset(cpu, 0, sizeof(struct cpu));
cpu->phyid = phyid; cpu->phyid = phyid;
err("cpu init %d", phyid);
} }
int int
@ -34,12 +45,56 @@ cpu_set_name(struct cpu *cpu, const char *name)
die("cpu name too long"); die("cpu name too long");
} }
int
cpu_init_end(struct cpu *cpu)
{
if (strlen(cpu->name) == 0) {
err("cpu name not set");
return -1;
}
if (cpu->gindex < 0) {
err("cpu gindex not set");
return -1;
}
for (int i = 0; i < CPU_CHAN_MAX; i++) {
chan_init(&cpu->chan[i], CHAN_SINGLE,
chan_fmt, cpu->gindex, chan_name[i]);
}
chan_prop_set(&cpu->chan[CPU_CHAN_NRUN], CHAN_DUPLICATES, 1);
cpu->is_init = 1;
return 0;
}
int
cpu_connect(struct cpu *cpu, struct bay *bay)
{
if (!cpu->is_init) {
err("cpu not initialized");
return -1;
}
for (int i = 0; i < CPU_CHAN_MAX; i++) {
if (bay_register(bay, &cpu->chan[i]) != 0) {
err("bay_register failed");
return -1;
}
}
return 0;
}
static struct thread * static struct thread *
find_thread(struct cpu *cpu, struct thread *thread) find_thread(struct cpu *cpu, struct thread *thread)
{ {
struct thread *p = NULL; struct thread *p = NULL;
DL_FOREACH2(cpu->thread, p, cpu_next) /* TODO use hash table */
DL_FOREACH2(cpu->threads, p, cpu_next)
{ {
if (p == thread) if (p == thread)
return p; return p;
@ -48,15 +103,15 @@ find_thread(struct cpu *cpu, struct thread *thread)
return NULL; return NULL;
} }
static int int
update_cpu(struct cpu *cpu) cpu_update(struct cpu *cpu)
{ {
struct thread *th = NULL; struct thread *th = NULL;
struct thread *th_running = NULL; struct thread *th_running = NULL;
struct thread *th_active = NULL; struct thread *th_active = NULL;
int active = 0, running = 0; int active = 0, running = 0;
DL_FOREACH2(cpu->thread, th, cpu_next) DL_FOREACH2(cpu->threads, th, cpu_next)
{ {
if (th->state == TH_ST_RUNNING) { if (th->state == TH_ST_RUNNING) {
th_running = th; th_running = th;
@ -71,13 +126,21 @@ update_cpu(struct cpu *cpu)
cpu->nth_running = running; cpu->nth_running = running;
cpu->nth_active = active; cpu->nth_active = active;
if (running == 1)
cpu->th_running = th_running; cpu->th_running = th_running;
else
cpu->th_running = NULL;
if (active == 1)
cpu->th_active = th_active; cpu->th_active = th_active;
else
cpu->th_active = NULL;
/* Update nth_running number in the channel */ /* Update nth_running number in the channel */
struct cpu_chan *chan = &cpu->chan; struct chan *nrun = &cpu->chan[CPU_CHAN_NRUN];
if (chan_set(&chan->nth_running, value_int64(cpu->nth_running)) != 0) { if (chan_set(nrun, value_int64(cpu->nth_running)) != 0) {
err("update_cpu: chan_set failed\n"); err("chan_set nth_running failed");
return -1; return -1;
} }
@ -89,14 +152,59 @@ int
cpu_add_thread(struct cpu *cpu, struct thread *thread) cpu_add_thread(struct cpu *cpu, struct thread *thread)
{ {
if (find_thread(cpu, thread) != NULL) { if (find_thread(cpu, thread) != NULL) {
err("cpu_add_thread: thread %d already assigned to %s\n", err("thread %d already assigned to %s",
thread->tid, cpu->name); thread->tid, cpu->name);
return -1; return -1;
} }
DL_APPEND2(cpu->thread, thread, cpu_prev, cpu_next); DL_APPEND2(cpu->threads, thread, cpu_prev, cpu_next);
cpu->nthreads++; cpu->nthreads++;
update_cpu(cpu); if (cpu_update(cpu) != 0) {
err("cpu_update failed");
return -1;
}
return 0;
}
int
cpu_remove_thread(struct cpu *cpu, struct thread *thread)
{
struct thread *t = find_thread(cpu, thread);
/* Not found, abort */
if (t == NULL) {
err("cannot remove missing thread %d from cpu %s\n",
thread->tid, cpu->name);
return -1;
}
DL_DELETE2(cpu->threads, thread, cpu_prev, cpu_next);
cpu->nthreads--;
if (cpu_update(cpu) != 0) {
err("cpu_update failed");
return -1;
}
return 0;
}
int
cpu_migrate_thread(struct cpu *cpu, struct thread *thread, struct cpu *newcpu)
{
if (cpu_remove_thread(cpu, thread) != 0) {
err("cannot remove thread %d from %s",
thread->tid, cpu->name);
return -1;
}
if (cpu_add_thread(newcpu, thread) != 0) {
err("cannot add thread %d to %s",
thread->tid, cpu->name);
return -1;
}
return 0; return 0;
} }

View File

@ -4,22 +4,27 @@
#ifndef CPU_H #ifndef CPU_H
#define CPU_H #define CPU_H
struct cpu; struct cpu; /* Needed for thread */
#include "thread.h" #include "thread.h"
#include "chan.h" #include "chan.h"
#include "bay.h"
#include "uthash.h" #include "uthash.h"
#include <linux/limits.h> #include <linux/limits.h>
struct cpu_chan { enum cpu_chan {
struct chan pid_running; CPU_CHAN_NRUN = 0,
struct chan tid_running; CPU_CHAN_PID,
struct chan nth_running; CPU_CHAN_TID,
CPU_CHAN_APPID,
CPU_CHAN_FLUSH,
CPU_CHAN_MAX,
}; };
struct cpu { struct cpu {
size_t gindex; /* In the system */ int64_t gindex; /* In the system */
char name[PATH_MAX]; char name[PATH_MAX];
int is_init;
/* Logical index: 0 to ncpus - 1 */ /* Logical index: 0 to ncpus - 1 */
//int index; //int index;
@ -30,9 +35,9 @@ struct cpu {
size_t nthreads; size_t nthreads;
size_t nth_running; size_t nth_running;
size_t nth_active; size_t nth_active;
struct thread *thread; /* List of threads assigned to this CPU */ struct thread *threads; /* List of threads assigned to this CPU */
struct thread *th_running; /* One */ struct thread *th_running; /* Unique thread or NULL */
struct thread *th_active; struct thread *th_active; /* Unique thread or NULL */
int is_virtual; int is_virtual;
@ -45,18 +50,24 @@ struct cpu {
struct cpu *prev; struct cpu *prev;
/* Channels */ /* Channels */
struct cpu_chan chan; struct chan chan[CPU_CHAN_MAX];
//struct model_ctx ctx; //struct model_ctx ctx;
UT_hash_handle hh; /* CPUs in the loom */ UT_hash_handle hh; /* CPUs in the loom */
}; };
void cpu_init(struct cpu *cpu, int phyid); void cpu_init_begin(struct cpu *cpu, int phyid);
int cpu_get_phyid(struct cpu *cpu); int cpu_get_phyid(struct cpu *cpu);
//int cpu_get_index(struct cpu *cpu); //int cpu_get_index(struct cpu *cpu);
void cpu_set_gindex(struct cpu *cpu, int64_t gindex); void cpu_set_gindex(struct cpu *cpu, int64_t gindex);
void cpu_set_name(struct cpu *cpu, const char *name); void cpu_set_name(struct cpu *cpu, const char *name);
int cpu_init_end(struct cpu *cpu);
int cpu_connect(struct cpu *cpu, struct bay *bay);
int cpu_update(struct cpu *cpu);
int cpu_add_thread(struct cpu *cpu, struct thread *thread); int cpu_add_thread(struct cpu *cpu, struct thread *thread);
int cpu_remove_thread(struct cpu *cpu, struct thread *thread);
int cpu_migrate_thread(struct cpu *cpu, struct thread *thread, struct cpu *newcpu);
#endif /* CPU_H */ #endif /* CPU_H */

View File

@ -6,7 +6,7 @@
#include "emu.h" #include "emu.h"
#include <unistd.h> #include <unistd.h>
#include "ovni/ovni_model.h" #include "model_ust.h"
int int
emu_init(struct emu *emu, int argc, char *argv[]) emu_init(struct emu *emu, int argc, char *argv[])
@ -22,43 +22,71 @@ emu_init(struct emu *emu, int argc, char *argv[])
return -1; return -1;
} }
// /* Parse the trace and build the emu_system */ /* Parse the trace and build the emu_system */
// if (emu_system_init(&emu->system, &emu->args, &emu->trace) != 0) { if (system_init(&emu->system, &emu->args, &emu->trace) != 0) {
// err("emu_init: cannot init system for trace '%s'\n", err("cannot init system for trace '%s'\n",
// emu->args.tracedir); emu->args.tracedir);
// return -1; return -1;
// } }
//
// if (emu_player_init(&emu->player, &emu->trace) != 0) { /* Initialize the bay */
// err("emu_init: cannot init player for trace '%s'\n", bay_init(&emu->bay);
// emu->args.tracedir);
// return -1; /* Connect system channels to bay */
// } if (system_connect(&emu->system, &emu->bay) != 0) {
// err("system_connect failed");
// /* Initialize the bay */ return -1;
// bay_init(&emu->bay); }
//
if (emu_player_init(&emu->player, &emu->trace) != 0) {
err("emu_init: cannot init player for trace '%s'\n",
emu->args.tracedir);
return -1;
}
// /* Register all the models */ // /* Register all the models */
// emu_model_register(&emu->model, &ovni_model_spec, emu); // emu_model_register(&emu->model, &ovni_model_spec, emu);
// //
// if (ovni_model_spec.probe(emu) != 0) {
// err("emu_init: ovni probe failed\n");
// return -1;
// }
//
// if (ovni_model_spec.create(emu) != 0) {
// err("emu_init: ovni create failed\n");
// return -1;
// }
//
// if (ovni_model_spec.connect(emu) != 0) {
// err("emu_init: ovni connect failed\n");
// return -1;
// }
return 0; return 0;
} }
static void
set_current(struct emu *emu)
{
emu->ev = emu_player_ev(&emu->player);
emu->stream = emu_player_stream(&emu->player);
struct lpt *lpt = system_get_lpt(emu->stream);
emu->loom = lpt->loom;
emu->proc = lpt->proc;
emu->thread = lpt->thread;
}
static void
panic(struct emu *emu)
{
err("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
err("@@@@@@@@@@@@@@ EMULATOR PANIC @@@@@@@@@@@@@");
err("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
if (emu->ev != NULL) {
err("event: ");
err(" mcv=%s", emu->ev->mcv);
err(" rclock=%ld", emu->ev->rclock);
err(" sclock=%ld", emu->ev->sclock);
err(" dclock=%ld", emu->ev->dclock);
err(" payload_size=%ld", emu->ev->payload_size);
err(" is_jumbo=%d", emu->ev->is_jumbo);
}
if (emu->stream != NULL) {
err("stream: ");
err(" relpath=%s", emu->stream->relpath);
err(" offset=%ld", emu->stream->offset);
err(" clock_offset=%ld", emu->stream->clock_offset);
}
err("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
}
int int
emu_step(struct emu *emu) emu_step(struct emu *emu)
{ {
@ -74,17 +102,19 @@ emu_step(struct emu *emu)
return -1; return -1;
} }
// emu->ev = emu_player_ev(&emu->player); set_current(emu);
// emu->stream = emu_player_stream(&emu->player);
// emu->thread = emu_system_get_thread(emu->stream); /* Otherwise progress */
// emu->proc = emu->thread->proc; if (emu->ev->m == 'O' && model_ust.event(emu) != 0) {
// emu->loom = emu->proc->loom; err("ovni event failed");
// panic(emu);
// /* Otherwise progress */ return -1;
// if (ovni_model_spec.event(emu) != 0) { }
// err("emu_init: ovni event failed\n");
// return -1; if (bay_propagate(&emu->bay) != 0) {
// } err("bay_propagate failed");
return -1;
}
return 0; return 0;
} }

View File

@ -10,7 +10,7 @@ struct emu;
#include "pvtrace.h" #include "pvtrace.h"
#include "emu_trace.h" #include "emu_trace.h"
#include "emu_args.h" #include "emu_args.h"
#include "emu_system.h" #include "system.h"
#include "emu_player.h" #include "emu_player.h"
#include "emu_model.h" #include "emu_model.h"
#include "emu_ev.h" #include "emu_ev.h"
@ -26,16 +26,16 @@ struct emu {
struct emu_args args; struct emu_args args;
struct emu_trace trace; struct emu_trace trace;
struct emu_system system; struct system system;
struct emu_player player; struct emu_player player;
struct emu_model model; struct emu_model model;
/* Quick access */ /* Quick access */
struct emu_stream *stream; struct emu_stream *stream;
struct emu_ev *ev; struct emu_ev *ev;
struct emu_thread *thread; struct thread *thread;
struct emu_proc *proc; struct proc *proc;
struct emu_loom *loom; struct loom *loom;
}; };
int emu_init(struct emu *emu, int argc, char *argv[]); int emu_init(struct emu *emu, int argc, char *argv[]);

View File

@ -1,6 +0,0 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "emu_system.h"
#include "chan.h"

View File

@ -81,6 +81,12 @@ cb_nftw(const char *fpath, const struct stat *sb,
return load_stream(cur_trace, fpath); return load_stream(cur_trace, fpath);
} }
static int
cmp_streams(struct emu_stream *a, struct emu_stream *b)
{
return strcmp(a->relpath, b->relpath);
}
int int
emu_trace_load(struct emu_trace *trace, const char *tracedir) emu_trace_load(struct emu_trace *trace, const char *tracedir)
{ {
@ -101,6 +107,9 @@ emu_trace_load(struct emu_trace *trace, const char *tracedir)
cur_trace = NULL; cur_trace = NULL;
/* Sort the streams */
DL_SORT(trace->streams, cmp_streams);
err("emu_trace_load: loaded %ld streams\n", trace->nstreams); err("emu_trace_load: loaded %ld streams\n", trace->nstreams);
return 0; return 0;

View File

@ -6,6 +6,7 @@
#include <string.h> #include <string.h>
#include "cpu.h" #include "cpu.h"
#include "proc.h" #include "proc.h"
#include "thread.h"
#include "uthash.h" #include "uthash.h"
#include "utlist.h" #include "utlist.h"
@ -62,7 +63,10 @@ loom_init_begin(struct loom *loom, const char *name)
set_hostname(loom->hostname, loom->name); set_hostname(loom->hostname, loom->name);
loom->id = loom->name; loom->id = loom->name;
loom->is_ready = 0;
cpu_init_begin(&loom->vcpu, -1);
err("creating new loom %s", loom->id);
return 0; return 0;
} }
@ -73,11 +77,17 @@ loom_set_gindex(struct loom *loom, int64_t gindex)
loom->gindex = gindex; loom->gindex = gindex;
} }
int64_t
loom_get_gindex(struct loom *loom)
{
return loom->gindex;
}
struct cpu * struct cpu *
loom_find_cpu(struct loom *loom, int phyid) loom_find_cpu(struct loom *loom, int phyid)
{ {
if (phyid == -1) if (phyid == -1)
return loom->vcpu; return &loom->vcpu;
struct cpu *cpu = NULL; struct cpu *cpu = NULL;
HASH_FIND_INT(loom->cpus, &phyid, cpu); HASH_FIND_INT(loom->cpus, &phyid, cpu);
@ -99,8 +109,8 @@ loom_add_cpu(struct loom *loom, struct cpu *cpu)
return -1; return -1;
} }
if (loom->is_ready) { if (loom->is_init) {
err("cannot modify CPUs of ready loom"); err("cannot modify CPUs of initialized loom");
return -1; return -1;
} }
@ -111,14 +121,28 @@ loom_add_cpu(struct loom *loom, struct cpu *cpu)
return 0; return 0;
} }
void struct cpu *
loom_set_vcpu(struct loom *loom, struct cpu *vcpu) loom_get_vcpu(struct loom *loom)
{ {
loom->vcpu = vcpu; return &loom->vcpu;
} }
static int static int
cmp_cpus(struct cpu *c1, struct cpu *c2) by_pid(struct proc *p1, struct proc *p2)
{
int id1 = proc_get_pid(p1);
int id2 = proc_get_pid(p2);
if (id1 < id2)
return -1;
if (id1 > id2)
return +1;
else
return 0;
}
static int
by_phyid(struct cpu *c1, struct cpu *c2)
{ {
int id1 = cpu_get_phyid(c1); int id1 = cpu_get_phyid(c1);
int id2 = cpu_get_phyid(c2); int id2 = cpu_get_phyid(c2);
@ -131,12 +155,20 @@ cmp_cpus(struct cpu *c1, struct cpu *c2)
return 0; return 0;
} }
void
loom_sort(struct loom *loom)
{
HASH_SORT(loom->procs, by_pid);
HASH_SORT(loom->cpus, by_phyid);
for (struct proc *p = loom->procs; p; p = p->hh.next) {
proc_sort(p);
}
}
int int
loom_init_end(struct loom *loom) loom_init_end(struct loom *loom)
{ {
/* Sort CPUs by phyid */
DL_SORT2(loom->scpus, cmp_cpus, lprev, lnext);
/* Set rank enabled */ /* Set rank enabled */
for (struct proc *p = loom->procs; p; p = p->hh.next) { for (struct proc *p = loom->procs; p; p = p->hh.next) {
if (p->rank >= 0) { if (p->rank >= 0) {
@ -145,7 +177,7 @@ loom_init_end(struct loom *loom)
} }
} }
loom->is_ready = 1; loom->is_init = 1;
return 0; return 0;
} }
@ -158,6 +190,18 @@ loom_find_proc(struct loom *loom, int pid)
return proc; return proc;
} }
struct thread *
loom_find_thread(struct loom *loom, int tid)
{
for (struct proc *p = loom->procs; p; p = p->hh.next) {
struct thread *thread = proc_find_thread(p, tid);
if (thread != NULL)
return thread;
}
return NULL;
}
int int
loom_add_proc(struct loom *loom, struct proc *proc) loom_add_proc(struct loom *loom, struct proc *proc)
{ {
@ -168,8 +212,8 @@ loom_add_proc(struct loom *loom, struct proc *proc)
return -1; return -1;
} }
if (loom->is_ready) { if (loom->is_init) {
err("cannot modify procs of ready loom"); err("cannot modify procs of loom initialized");
return -1; return -1;
} }

View File

@ -4,16 +4,19 @@
#ifndef LOOM_H #ifndef LOOM_H
#define LOOM_H #define LOOM_H
struct loom; //struct loom;
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <linux/limits.h> #include <linux/limits.h>
#include <sys/types.h> #include <sys/types.h>
#include "cpu.h"
#include "proc.h"
#include "thread.h"
struct loom { struct loom {
size_t gindex; size_t gindex;
int is_ready; int is_init;
char name[PATH_MAX]; char name[PATH_MAX];
char hostname[PATH_MAX]; char hostname[PATH_MAX];
@ -27,14 +30,11 @@ struct loom {
int64_t clock_offset; int64_t clock_offset;
/* Sorted double linked list of CPUs by phyid */
struct cpu *scpus;
/* Physical CPUs hash table by phyid */ /* Physical CPUs hash table by phyid */
struct cpu *cpus; struct cpu *cpus;
/* Virtual CPU */ /* Virtual CPU */
struct cpu *vcpu; struct cpu vcpu;
/* Local list */ /* Local list */
size_t nprocs; size_t nprocs;
@ -51,10 +51,14 @@ int loom_matches(const char *relpath);
int loom_init_begin(struct loom *loom, const char *name); int loom_init_begin(struct loom *loom, const char *name);
int loom_init_end(struct loom *loom); int loom_init_end(struct loom *loom);
int loom_add_cpu(struct loom *loom, struct cpu *cpu); int loom_add_cpu(struct loom *loom, struct cpu *cpu);
int64_t loom_get_gindex(struct loom *loom);
void loom_set_gindex(struct loom *loom, int64_t gindex); void loom_set_gindex(struct loom *loom, int64_t gindex);
struct cpu *loom_find_cpu(struct loom *loom, int phyid); struct cpu *loom_find_cpu(struct loom *loom, int phyid);
void loom_set_vcpu(struct loom *loom, struct cpu *vcpu); void loom_set_vcpu(struct loom *loom, struct cpu *vcpu);
struct cpu *loom_get_vcpu(struct loom *loom);
struct proc *loom_find_proc(struct loom *loom, pid_t pid); struct proc *loom_find_proc(struct loom *loom, pid_t pid);
struct thread *loom_find_thread(struct loom *loom, int tid);
int loom_add_proc(struct loom *loom, struct proc *proc); int loom_add_proc(struct loom *loom, struct proc *proc);
void loom_sort(struct loom *loom);
#endif /* LOOM_H */ #endif /* LOOM_H */

View File

@ -3,6 +3,7 @@
#include "metadata.h" #include "metadata.h"
#include "ovni.h"
#include "parson.h" #include "parson.h"
static JSON_Object * static JSON_Object *
@ -20,7 +21,7 @@ load_json(const char *path)
return NULL; return NULL;
} }
return 0; return meta;
} }
static int static int
@ -104,7 +105,7 @@ load_cpus(struct loom *loom, JSON_Object *meta)
return -1; return -1;
} }
cpu_init(cpu, phyid); cpu_init_begin(cpu, phyid);
if (loom_add_cpu(loom, cpu) != 0) { if (loom_add_cpu(loom, cpu) != 0) {
err("loom_add_cpu() failed"); err("loom_add_cpu() failed");

445
src/emu/model_ust.c Normal file
View File

@ -0,0 +1,445 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#define ENABLE_DEBUG
#include "model_ust.h"
#include "emu.h"
#include "loom.h"
#include "common.h"
static int
pre_thread_execute(struct emu *emu, struct thread *th)
{
/* The thread cannot be already running */
if (th->state == TH_ST_RUNNING) {
err("cannot execute thread %d, is already running", th->tid);
return -1;
}
int cpuid = emu->ev->payload->i32[0];
struct cpu *cpu = loom_find_cpu(emu->loom, cpuid);
if (cpu == NULL) {
err("cannot find CPU with phyid %d in loom %s",
cpuid, emu->loom->id)
return -1;
}
dbg("thread %d runs in %s", th->tid, cpu->name);
/* First set the CPU in the thread */
thread_set_cpu(th, cpu);
/* Then set the thread to running state */
thread_set_state(th, TH_ST_RUNNING);
/* And then add the thread to the CPU, so tracking channels see the
* updated thread state */
cpu_add_thread(cpu, th);
return 0;
}
static int
pre_thread_end(struct thread *th)
{
if (th->state != TH_ST_RUNNING && th->state != TH_ST_COOLING) {
err("cannot end thread %d: state not running or cooling",
th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_DEAD) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_remove_thread(th->cpu, th) != 0) {
err("cannot remove thread %d from %s",
th->tid, th->cpu->name);
return -1;
}
if (thread_unset_cpu(th) != 0) {
err("cannot unset cpu from thread %d", th->tid);
return -1;
}
return 0;
}
static int
pre_thread_pause(struct thread *th)
{
if (th->state != TH_ST_RUNNING && th->state != TH_ST_COOLING) {
err("cannot pause thread %d: state not running or cooling\n",
th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_PAUSED) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_update(th->cpu) != 0) {
err("cpu_update failed for %s", th->cpu->name);
return -1;
}
return 0;
}
static int
pre_thread_resume(struct thread *th)
{
if (th->state != TH_ST_PAUSED && th->state != TH_ST_WARMING) {
err("cannot resume thread %d: state not paused or warming",
th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_RUNNING) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_update(th->cpu) != 0) {
err("cpu_update failed for %s", th->cpu->name);
return -1;
}
return 0;
}
static int
pre_thread_cool(struct thread *th)
{
if (th->state != TH_ST_RUNNING) {
err("cannot cool thread %d: state not running", th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_COOLING) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_update(th->cpu) != 0) {
err("cpu_update failed for %s", th->cpu->name);
return -1;
}
return 0;
}
static int
pre_thread_warm(struct thread *th)
{
if (th->state != TH_ST_PAUSED) {
err("cannot warm thread %d: state not paused\n", th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_WARMING) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_update(th->cpu) != 0) {
err("cpu_update failed for %s", th->cpu->name);
return -1;
}
return 0;
}
static int
pre_thread(struct emu *emu)
{
struct thread *th = emu->thread;
struct emu_ev *ev = emu->ev;
switch (ev->v) {
case 'C': /* create */
dbg("thread %d creates a new thread at cpu=%d with args=%x %x\n",
th->tid,
ev->payload->u32[0],
ev->payload->u32[1],
ev->payload->u32[2]);
break;
case 'x':
return pre_thread_execute(emu, th);
case 'e':
return pre_thread_end(th);
case 'p':
return pre_thread_pause(th);
case 'r':
return pre_thread_resume(th);
case 'c':
return pre_thread_cool(th);
case 'w':
return pre_thread_warm(th);
default:
err("unknown thread event value %c\n", ev->v);
return -1;
}
return 0;
}
static int
pre_affinity_set(struct emu *emu)
{
struct thread *th = emu->thread;
if (th->cpu == NULL) {
err("thread %d doesn't have CPU set", th->tid);
return -1;
}
if (!th->is_active) {
err("thread %d is not active", th->tid);
return -1;
}
/* Migrate current cpu to the one at phyid */
int phyid = emu->ev->payload->i32[0];
struct cpu *newcpu = loom_find_cpu(emu->loom, phyid);
if (newcpu == NULL) {
err("cannot find cpu with phyid %d", phyid);
return -1;
}
/* The CPU is already properly set, return */
if (th->cpu == newcpu)
return 0;
if (cpu_migrate_thread(th->cpu, th, newcpu) != 0) {
err("cpu_migrate_thread() failed");
return -1;
}
if (thread_migrate_cpu(th, newcpu) != 0) {
err("thread_migrate_cpu() failed");
return -1;
}
dbg("thread %d now runs in %s\n", th->tid, newcpu->name);
return 0;
}
static int
pre_affinity_remote(struct emu *emu)
{
int32_t phyid = emu->ev->payload->i32[0];
int32_t tid = emu->ev->payload->i32[1];
struct thread *remote_th = proc_find_thread(emu->proc, tid);
/* Search the thread in other processes of the loom if
* not found in the current one */
if (remote_th == NULL)
remote_th = loom_find_thread(emu->loom, tid);
if (remote_th == NULL) {
err("thread %d not found", tid);
return -1;
}
/* The remote_th cannot be in states dead or unknown */
if (remote_th->state == TH_ST_DEAD) {
err("thread %d is dead", tid);
return -1;
}
if (remote_th->state == TH_ST_UNKNOWN) {
err("thread %d in state unknown", tid);
return -1;
}
/* It must have an assigned CPU */
if (remote_th->cpu == NULL) {
err("thread %d has no CPU", tid);
return -1;
}
/* Migrate current cpu to the one at phyid */
struct cpu *newcpu = loom_find_cpu(emu->loom, phyid);
if (newcpu == NULL) {
err("cannot find CPU with phyid %d", phyid);
return -1;
}
if (cpu_migrate_thread(remote_th->cpu, remote_th, newcpu) != 0) {
err("cpu_migrate_thread() failed");
return -1;
}
if (thread_migrate_cpu(remote_th, newcpu) != 0) {
err("thread_migrate_cpu() failed");
return -1;
}
dbg("remote_th %d remotely switches to cpu %d", tid, phyid);
return 0;
}
static int
pre_affinity(struct emu *emu)
{
switch (emu->ev->v) {
case 's':
return pre_affinity_set(emu);
case 'r':
return pre_affinity_remote(emu);
default:
err("unknown affinity event value %c\n",
emu->ev->v);
// return -1
}
return 0;
}
//static int
//compare_int64(const void *a, const void *b)
//{
// int64_t aa = *(const int64_t *) a;
// int64_t bb = *(const int64_t *) b;
//
// if (aa < bb)
// return -1;
// else if (aa > bb)
// return +1;
// else
// return 0;
//}
//static int
//pre_burst(struct emu *emu)
//{
// struct thread *th = emu->thread;
//
// if (th->nbursts >= MAX_BURSTS) {
// err("too many bursts");
// return -1;
// }
//
// th->burst_time[th->nbursts++] = emu->delta_time;
// if (th->nbursts == MAX_BURSTS) {
// int n = MAX_BURSTS - 1;
// int64_t deltas[MAX_BURSTS - 1];
// for (int i = 0; i < n; i++) {
// deltas[i] = th->burst_time[i + 1] - th->burst_time[i];
// }
//
// qsort(deltas, n, sizeof(int64_t), compare_int64);
//
// double avg = 0.0;
// double maxdelta = 0;
// for (int i = 0; i < n; i++) {
// if (deltas[i] > maxdelta)
// maxdelta = deltas[i];
// avg += deltas[i];
// }
//
// avg /= (double) n;
// double median = deltas[n / 2];
//
// err("%s burst stats: median %.0f ns, avg %.1f ns, max %.0f ns\n",
// emu->cur_loom->dname, median, avg, maxdelta);
//
// th->nbursts = 0;
// }
//}
static int
pre_flush(struct emu *emu)
{
struct thread *th = emu->thread;
struct chan *flush = &th->chan[TH_CHAN_FLUSH];
switch (emu->ev->v) {
case '[':
if (chan_push(flush, value_int64(1)) != 0) {
err("chan_push failed");
return -1;
}
break;
case ']':
if (chan_pop(flush, value_int64(1)) != 0) {
err("chan_pop failed");
return -1;
}
break;
default:
err("unexpected value '%c' (expecting '[' or ']')\n",
emu->ev->v);
return -1;
}
return 0;
}
static int
process_ev(struct emu *emu)
{
if (emu->ev->m != 'O')
return -1;
switch (emu->ev->c) {
case 'H':
return pre_thread(emu);
case 'A':
return pre_affinity(emu);
// case 'B':
// pre_burst(emu);
// break;
case 'F':
return pre_flush(emu);
default:
err("unknown ovni event category %c\n",
emu->ev->c);
// return -1;
}
return 0;
}
static int
ust_probe(void *p)
{
struct emu *emu = emu_get(p);
if (emu->system.nthreads == 0)
return -1;
return 0;
}
static int
ust_event(void *ptr)
{
struct emu *emu = emu_get(ptr);
if (emu->ev->m != model_ust.model) {
err("unexpected event model %c\n", emu->ev->m);
return -1;
}
return process_ev(emu);
}
struct model_spec model_ust = {
.name = "ust",
.model = 'O',
.create = NULL,
.connect = NULL,
.event = ust_event,
.probe = ust_probe,
};

28
src/emu/model_ust.h Normal file
View File

@ -0,0 +1,28 @@
/* Copyright (c) 2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef MODEL_UST_H
#define MODEL_UST_H
/* The user-space thread "ust" execution model tracks the state of processes and
* threads running in the CPUs by instrumenting the threads before and after
* they are going to sleep. It just provides an approximate view of the real
* execution by the kernel. */
#include "emu_model.h"
extern struct model_spec model_ust;
#include "chan.h"
enum ust_chan_type {
UST_CHAN_FLUSH = 0,
UST_CHAN_BURST,
UST_CHAN_MAX,
};
struct ust_thread {
struct chan chan[UST_CHAN_MAX];
};
#endif /* MODEL_UST_H */

View File

@ -1,95 +0,0 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "ovni_model.h"
#include "emu.h"
#include "mux.h"
static int
cb_is_running(struct chan *in, void *ptr)
{
struct chan *out = ptr;
struct value value;
if (chan_read(in, &value) != 0) {
err("cb_is_running: chan_read %s failed\n", in->name);
return -1;
}
if (value.type != VALUE_INT64)
die("wrong value type\n");
int st = value.i;
if (st == TH_ST_RUNNING)
value = value_int64(1);
else
value = value_int64(0);
if (chan_set(out, value) != 0) {
err("cb_is_running: chan_set %s failed\n", out->name);
return -1;
}
return 0;
}
static int
cb_is_active(struct chan *in, void *ptr)
{
struct chan *out = ptr;
struct value value;
if (chan_read(in, &value) != 0) {
err("cb_is_running: chan_read %s failed\n", in->name);
return -1;
}
if (value.type != VALUE_INT64)
die("wrong value type\n");
int st = value.i;
if (st == TH_ST_RUNNING || st == TH_ST_COOLING || st == TH_ST_WARMING)
value = value_int64(1);
else
value = value_int64(0);
if (chan_set(out, value) != 0) {
err("cb_is_running: chan_set %s failed\n", out->name);
return -1;
}
return 0;
}
static struct chan *
find_thread_chan(struct bay *bay, long th_gindex, char *name)
{
char fullname[MAX_CHAN_NAME];
sprintf(fullname, "ovni.thread%ld.%s", th_gindex, name);
return bay_find(bay, fullname);
}
static void
track_thread_state(struct bay *bay, long th_gindex)
{
struct chan *state = find_thread_chan(bay, th_gindex, "state");
struct chan *is_running = find_thread_chan(bay, th_gindex, "is_running");
struct chan *is_active = find_thread_chan(bay, th_gindex, "is_active");
if (bay_add_cb(bay, BAY_CB_DIRTY, state, cb_is_running, is_running) != 0)
die("bay_add_cb failed\n");
if (bay_add_cb(bay, BAY_CB_DIRTY, state, cb_is_active, is_active) != 0)
die("bay_add_cb failed\n");
}
int
ovni_model_connect(void *ptr)
{
struct emu *emu = emu_get(ptr);
for (size_t i = 0; i < emu->system.nthreads; i++)
track_thread_state(&emu->bay, i);
return 0;
}

View File

@ -1,110 +0,0 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "ovni_model.h"
#include "emu.h"
#include "bay.h"
struct model_spec ovni_model_spec = {
.name = "ovni",
.model = 'O',
.create = ovni_model_create,
.connect = ovni_model_connect,
.event = ovni_model_event,
.probe = ovni_model_probe,
};
static char *th_channels[] = {
"state",
"is_running",
"is_active",
"tid_active",
"pid_active",
"cpu",
"is_flushing"
};
static char *cpu_channels[] = {
"tid_running",
"pid_running",
"nthreads_running",
"is_flushing"
};
struct pcf_value_label ovni_state_values[] = {
{ TH_ST_UNKNOWN, "Unknown" },
{ TH_ST_RUNNING, "Running" },
{ TH_ST_PAUSED, "Paused" },
{ TH_ST_DEAD, "Dead" },
{ TH_ST_COOLING, "Cooling" },
{ TH_ST_WARMING, "Warming" },
{ -1, NULL },
};
struct pcf_value_label ovni_flush_values[] = {
{ 0, "None" },
{ ST_OVNI_FLUSHING, "Flushing" },
{ ST_TOO_MANY_TH, "Unknown flushing state: Multiple threads running" },
{ -1, NULL },
};
// [CHAN_OVNI_PID] = "PID",
// [CHAN_OVNI_TID] = "TID",
// [CHAN_OVNI_NRTHREADS] = "Number of RUNNING threads",
// [CHAN_OVNI_STATE] = "Execution state",
// [CHAN_OVNI_APPID] = "AppID",
// [CHAN_OVNI_CPU] = "CPU affinity",
// [CHAN_OVNI_FLUSH] = "Flushing state",
static void
create_chan(struct bay *bay, char *group, int64_t gid, char *item)
{
char name[MAX_CHAN_NAME];
sprintf(name, "%s.%s%ld.%s", ovni_model_spec.name, group, gid, item);
struct chan *c = calloc(1, sizeof(struct chan));
if (c == NULL)
die("calloc failed\n");
chan_init(c, CHAN_SINGLE, name);
if (bay_register(bay, c) != 0)
die("bay_register failed\n");
}
static void
create_thread(struct bay *bay, int64_t gid)
{
for (size_t i = 0; i < ARRAYLEN(th_channels); i++)
create_chan(bay, "thread", gid, th_channels[i]);
}
static void
create_cpu(struct bay *bay, int64_t gid)
{
for (size_t i = 0; i < ARRAYLEN(cpu_channels); i++)
create_chan(bay, "cpu", gid, cpu_channels[i]);
}
static void
create_channels(struct emu_system *sys, struct bay *bay)
{
for (size_t i = 0; i < sys->nthreads; i++)
create_thread(bay, i);
for (size_t i = 0; i < sys->ncpus; i++)
create_cpu(bay, i);
}
int
ovni_model_create(void *p)
{
struct emu *emu = emu_get(p);
/* Get paraver traces */
//oemu->pvt_thread = pvman_new(emu->pvman, "thread");
//oemu->pvt_cpu = pvman_new(emu->pvman, "cpu");
create_channels(&emu->system, &emu->bay);
return 0;
}

View File

@ -1,516 +0,0 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "ovni_model.h"
#include "emu.h"
/* Sets the state of the thread and updates the thread tracking channels */
static void
thread_set_state(struct emu_thread *th, enum ethread_state state)
{
/* The state must be updated when a cpu is set */
if (th->cpu == NULL)
die("thread_set_state: thread %d doesn't have a CPU\n",
th->tid);
dbg("thread_set_state: setting thread %d state %d\n",
th->tid, state);
th->state = state;
th->is_running = (state == TH_ST_RUNNING) ? 1 : 0;
th->is_active = (state == TH_ST_RUNNING
|| state == TH_ST_COOLING
|| state == TH_ST_WARMING)
? 1
: 0;
chan_set(&th->chan[CHAN_OVNI_STATE], th->state);
/* Enable or disable the thread channels that track the thread state */
for (int i = 0; i < CHAN_MAX; i++)
chan_tracking_update(&th->chan[i], th);
dbg("thread_set_state: done\n");
}
static void
cpu_update_th_stats(struct emu_cpu *cpu)
{
struct emu_thread *th = NULL;
struct emu_thread *th_running = NULL;
struct emu_thread *th_active = NULL;
int active = 0, running = 0;
DL_FOREACH(cpu->thread, th)
{
if (th->state == TH_ST_RUNNING) {
th_running = th;
running++;
th_active = th;
active++;
} else if (th->state == TH_ST_COOLING || th->state == TH_ST_WARMING) {
th_active = th;
active++;
}
}
cpu->nrunning_threads = running;
cpu->nactive_threads = active;
cpu->th_running = th_running;
cpu->th_active = th_active;
}
static void
update_cpu(struct emu_cpu *cpu)
{
dbg("updating cpu %s\n", cpu->name);
/* Update the running and active threads first */
cpu_update_th_stats(cpu);
/* From the CPU channels we only need to manually update the number of
* threads running in the CPU */
if (chan_get_st(&cpu->chan[CHAN_OVNI_NRTHREADS]) != (int) cpu->nrunning_threads)
chan_set(&cpu->chan[CHAN_OVNI_NRTHREADS], (int) cpu->nrunning_threads);
/* Update all tracking channels */
for (int i = 0; i < CHAN_MAX; i++)
emu_cpu_update_chan(cpu, &cpu->chan[i]);
dbg("updating cpu %s complete!\n", cpu->name);
}
struct emu_cpu *
emu_get_cpu(struct emu_loom *loom, int cpuid)
{
if (cpuid >= (int) loom->ncpus)
die("emu_get_cpu: CPU index out of bounds\n");
if (cpuid < 0)
return &loom->vcpu;
return &loom->cpu[cpuid];
}
static struct emu_thread *
emu_cpu_find_thread(struct emu_cpu *cpu, struct emu_thread *thread)
{
struct emu_thread *p = NULL;
DL_FOREACH(cpu->thread, p)
{
if (p == thread)
return p;
}
return NULL;
}
/* Add the given thread to the list of threads assigned to the CPU */
static void
cpu_add_thread(struct emu_cpu *cpu, struct emu_thread *thread)
{
/* Found, abort */
if (emu_cpu_find_thread(cpu, thread) != NULL) {
err("The thread %d is already assigned to %s\n",
thread->tid, cpu->name);
abort();
}
DL_APPEND(cpu->thread, thread);
cpu->nthreads++;
update_cpu(cpu);
}
static void
cpu_remove_thread(struct emu_cpu *cpu, struct emu_thread *thread)
{
struct emu_thread *p = emu_cpu_find_thread(cpu, thread);
/* Not found, abort */
if (p == NULL) {
err("cannot remove missing thread %d from cpu %s\n",
thread->tid, cpu->name);
abort();
}
DL_DELETE(cpu->thread, thread);
cpu->nthreads--;
update_cpu(cpu);
}
static void
cpu_migrate_thread(struct emu_cpu *cpu,
struct emu_thread *thread,
struct emu_cpu *newcpu)
{
cpu_remove_thread(cpu, thread);
cpu_add_thread(newcpu, thread);
}
static int
pre_thread_execute(struct emu *emu, struct emu_thread *th)
{
/* The thread cannot be already running */
if (th->state == TH_ST_RUNNING) {
err("pre_thread_execute: thread %d already running\n", th->tid);
return -1;
}
int cpuid = emu->ev->payload.i32[0];
struct emu_cpu *cpu = emu_system_get_cpu(emu->loom, cpuid);
dbg("pre_thread_execute: thread %d runs in CPU %s\n", th->tid, cpu->name);
/* First set the CPU in the thread */
thread_set_cpu(th, cpu);
/* Then set the thread to running state */
thread_set_state(th, TH_ST_RUNNING);
/* And then add the thread to the CPU, so tracking channels see the
* updated thread state */
cpu_add_thread(cpu, th);
}
//static void
//pre_thread_end(struct emu_thread *th)
//{
// if (th->state != TH_ST_RUNNING && th->state != TH_ST_COOLING)
// die("pre_thread_end: thread %d not running or cooling\n",
// th->tid);
//
// if (th->cpu == NULL)
// die("pre_thread_end: thread %d doesn't have a CPU\n",
// th->tid);
//
// /* First update the thread state */
// thread_set_state(th, TH_ST_DEAD);
//
// /* Then remove it from the cpu, so channels are properly updated */
// cpu_remove_thread(th->cpu, th);
//
// thread_unset_cpu(th);
//}
//
//static void
//pre_thread_pause(struct emu_thread *th)
//{
// if (th->state != TH_ST_RUNNING && th->state != TH_ST_COOLING)
// die("pre_thread_pause: thread %d not running or cooling\n",
// th->tid);
//
// if (th->cpu == NULL)
// die("pre_thread_pause: thread %d doesn't have a CPU\n",
// th->tid);
//
// thread_set_state(th, TH_ST_PAUSED);
// update_cpu(th->cpu);
//}
//
//static void
//pre_thread_resume(struct emu_thread *th)
//{
// if (th->state != TH_ST_PAUSED && th->state != TH_ST_WARMING)
// die("pre_thread_resume: thread %d not paused or warming\n",
// th->tid);
//
// if (th->cpu == NULL)
// die("pre_thread_resume: thread %d doesn't have a CPU\n",
// th->tid);
//
// thread_set_state(th, TH_ST_RUNNING);
// update_cpu(th->cpu);
//}
//
//static void
//pre_thread_cool(struct emu_thread *th)
//{
// if (th->state != TH_ST_RUNNING)
// die("pre_thread_cool: thread %d not running\n",
// th->tid);
//
// if (th->cpu == NULL)
// die("pre_thread_cool: thread %d doesn't have a CPU\n",
// th->tid);
//
// thread_set_state(th, TH_ST_COOLING);
// update_cpu(th->cpu);
//}
//
//static void
//pre_thread_warm(struct emu_thread *th)
//{
// if (th->state != TH_ST_PAUSED)
// die("pre_thread_warm: thread %d not paused\n",
// th->tid);
//
// if (th->cpu == NULL)
// die("pre_thread_warm: thread %d doesn't have a CPU\n",
// th->tid);
//
// thread_set_state(th, TH_ST_WARMING);
// update_cpu(th->cpu);
//}
static int
pre_thread(struct emu *emu)
{
struct emu_thread *th = emu->thread;
struct emu_ev *ev = emu->ev;
switch (ev->v) {
case 'C': /* create */
dbg("thread %d creates a new thread at cpu=%d with args=%x %x\n",
th->tid,
ev->payload->u32[0],
ev->payload->u32[1],
ev->payload->u32[2]);
break;
case 'x':
return pre_thread_execute(emu, th);
// case 'e':
// return pre_thread_end(th);
// case 'p':
// return pre_thread_pause(th);
// case 'r':
// return pre_thread_resume(th);
// case 'c':
// return pre_thread_cool(th);
// case 'w':
// return pre_thread_warm(th);
default:
err("unknown thread event value %c\n", ev->v);
// return -1;
}
return 0;
}
//static void
//pre_affinity_set(struct emu *emu)
//{
// struct emu_thread *th = emu->cur_thread;
// int cpuid = emu->ev->payload.i32[0];
//
// if (th->cpu == NULL)
// edie(emu, "pre_affinity_set: thread %d doesn't have a CPU\n",
// th->tid);
//
// if (!th->is_active)
// edie(emu, "pre_affinity_set: thread %d is not active\n",
// th->tid);
//
// /* Migrate current cpu to the one at cpuid */
// struct emu_cpu *newcpu = emu_get_cpu(emu->cur_loom, cpuid);
//
// /* The CPU is already properly set, return */
// if (th->cpu == newcpu)
// return;
//
// cpu_migrate_thread(th->cpu, th, newcpu);
// thread_migrate_cpu(th, newcpu);
//
// // dbg("cpu %d now runs %d\n", cpuid, th->tid);
//}
//
//static void
//pre_affinity_remote(struct emu *emu)
//{
// int32_t cpuid = emu->ev->payload.i32[0];
// int32_t tid = emu->ev->payload.i32[1];
//
// struct emu_thread *remote_th = emu_get_thread(emu->cur_proc, tid);
//
// if (remote_th == NULL) {
// /* Search the thread in other processes of the loom if
// * not found in the current one */
// struct emu_loom *loom = emu->cur_loom;
//
// for (size_t i = 0; i < loom->nprocs; i++) {
// struct ovni_eproc *proc = &loom->proc[i];
//
// /* Skip the current process */
// if (proc == emu->cur_proc)
// continue;
//
// remote_th = emu_get_thread(proc, tid);
//
// if (remote_th)
// break;
// }
//
// if (remote_th == NULL) {
// err("thread tid %d not found: cannot set affinity remotely\n",
// tid);
// abort();
// }
// }
//
// /* The remote_th cannot be in states dead or unknown */
// if (remote_th->state == TH_ST_DEAD)
// edie(emu, "pre_affinity_remote: remote thread %d in state DEAD\n",
// remote_th->tid);
//
// if (remote_th->state == TH_ST_UNKNOWN)
// edie(emu, "pre_affinity_remote: remote thread %d in state UNKNOWN\n",
// remote_th->tid);
//
// /* It must have an assigned CPU */
// if (remote_th->cpu == NULL)
// edie(emu, "pre_affinity_remote: remote thread %d has no CPU\n",
// remote_th->tid);
//
// /* Migrate current cpu to the one at cpuid */
// struct emu_cpu *newcpu = emu_get_cpu(emu->cur_loom, cpuid);
//
// cpu_migrate_thread(remote_th->cpu, remote_th, newcpu);
// thread_migrate_cpu(remote_th, newcpu);
//
// // dbg("remote_th %d switches to cpu %d by remote petition\n", tid,
// // cpuid);
//}
//
//static void
//pre_affinity(struct emu *emu)
//{
// // emu_emit(emu);
// switch (emu->ev->v) {
// case 's':
// pre_affinity_set(emu);
// break;
// case 'r':
// pre_affinity_remote(emu);
// break;
// default:
// dbg("unknown affinity event value %c\n",
// emu->ev->v);
// break;
// }
//}
//
//static int
//compare_int64(const void *a, const void *b)
//{
// int64_t aa = *(const int64_t *) a;
// int64_t bb = *(const int64_t *) b;
//
// if (aa < bb)
// return -1;
// else if (aa > bb)
// return +1;
// else
// return 0;
//}
//
//static void
//pre_burst(struct emu *emu)
//{
// int64_t dt = 0;
//
// UNUSED(dt);
//
// struct emu_thread *th = emu->cur_thread;
//
// if (th->nbursts >= MAX_BURSTS) {
// err("too many bursts: ignored\n");
// return;
// }
//
// th->burst_time[th->nbursts++] = emu->delta_time;
// if (th->nbursts == MAX_BURSTS) {
// int n = MAX_BURSTS - 1;
// int64_t deltas[MAX_BURSTS - 1];
// for (int i = 0; i < n; i++) {
// deltas[i] = th->burst_time[i + 1] - th->burst_time[i];
// }
//
// qsort(deltas, n, sizeof(int64_t), compare_int64);
//
// double avg = 0.0;
// double maxdelta = 0;
// for (int i = 0; i < n; i++) {
// if (deltas[i] > maxdelta)
// maxdelta = deltas[i];
// avg += deltas[i];
// }
//
// avg /= (double) n;
// double median = deltas[n / 2];
//
// err("%s burst stats: median %.0f ns, avg %.1f ns, max %.0f ns\n",
// emu->cur_loom->dname, median, avg, maxdelta);
//
// th->nbursts = 0;
// }
//}
//
//static void
//pre_flush(struct emu *emu)
//{
// struct emu_thread *th = emu->cur_thread;
// struct ovni_chan *chan_th = &th->chan[CHAN_OVNI_FLUSH];
//
// switch (emu->ev->v) {
// case '[':
// chan_push(chan_th, ST_OVNI_FLUSHING);
// break;
// case ']':
// chan_pop(chan_th, ST_OVNI_FLUSHING);
// break;
// default:
// err("unexpected value '%c' (expecting '[' or ']')\n",
// emu->ev->v);
// abort();
// }
//}
static int
hook_pre_ovni(struct emu *emu)
{
if (emu->ev->m != 'O')
return -1;
switch (emu->ev->c) {
case 'H':
return pre_thread(emu);
// case 'A':
// pre_affinity(emu);
// break;
// case 'B':
// pre_burst(emu);
// break;
// case 'F':
// pre_flush(emu);
// break;
default:
err("unknown ovni event category %c\n",
emu->ev->c);
return -1;
}
return 0;
}
int
ovni_model_event(void *ptr)
{
struct emu *emu = emu_get(ptr);
if (emu->ev->m != 'O') {
err("unexpected event model %c\n", emu->ev->m);
return -1;
}
err("got ovni event '%s'\n", emu->ev->mcv);
if (hook_pre_ovni(emu) != 0) {
err("ovni_model_event: failed to process event\n");
return -1;
}
return 0;
}

View File

@ -1,59 +0,0 @@
/* Copyright (c) 2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef OVNI_MODEL_H
#define OVNI_MODEL_H
/* The emulator ovni module provides the execution model by
* tracking the thread state and which threads run in each
* CPU */
/* Cannot depend on emu.h */
#include "emu_model.h"
#include "chan.h"
#include <stdint.h>
enum ovni_flushing_state {
ST_OVNI_FLUSHING = 1,
};
enum ovni_chan {
CHAN_OVNI_PID = 0,
CHAN_OVNI_TID,
CHAN_OVNI_NRTHREADS,
CHAN_OVNI_STATE,
CHAN_OVNI_APPID,
CHAN_OVNI_CPU,
CHAN_OVNI_FLUSH,
CHAN_OVNI_MAX,
};
#define MAX_BURSTS 100
struct ovni_cpu {
/* CPU channels */
struct chan chan[CHAN_OVNI_MAX];
struct chan pid_running;
struct chan tid_running;
struct chan nthreads_running;
};
struct ovni_thread {
struct chan chan[CHAN_OVNI_MAX];
struct chan cpu;
struct chan flush;
/* Burst times */
int nbursts;
int64_t burst_time[MAX_BURSTS];
};
extern struct model_spec ovni_model_spec;
int ovni_model_probe(void *ptr);
int ovni_model_create(void *ptr);
int ovni_model_connect(void *ptr);
int ovni_model_event(void *ptr);
#endif /* OVNI_MODEL_H */

View File

@ -1,672 +0,0 @@
/* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#define _GNU_SOURCE
#include "trace.h"
#include "ovni.h"
#include "parson.h"
#include <unistd.h>
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <linux/limits.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
static int
find_dir_prefix_str(const char *dirname, const char *prefix, const char **str)
{
const char *p = dirname;
/* Check the prefix */
if (strncmp(p, prefix, strlen(prefix)) != 0)
return -1;
p += strlen(prefix);
/* Find the dot */
if (*p != '.')
return -1;
p++;
if (str)
*str = p;
return 0;
}
static int
find_dir_prefix_int(const char *dirname, const char *prefix, int *num)
{
const char *p = NULL;
if (find_dir_prefix_str(dirname, prefix, &p) != 0)
return -1;
/* Convert the suffix string to a number */
*num = atoi(p);
return 0;
}
static size_t
count_dir_prefix(DIR *dir, const char *prefix)
{
struct dirent *dirent;
size_t n = 0;
while ((dirent = readdir(dir)) != NULL) {
if (find_dir_prefix_str(dirent->d_name, prefix, NULL) != 0)
continue;
n++;
}
return n;
}
static int
load_thread(struct ovni_thread *thread, struct ovni_proc *proc, int index, int tid, char *filepath)
{
static int total_threads = 0;
thread->tid = tid;
thread->index = index;
thread->gindex = total_threads++;
thread->state = TH_ST_UNKNOWN;
thread->proc = proc;
if (strlen(filepath) >= PATH_MAX) {
err("filepath too large: %s\n", filepath);
return -1;
}
strcpy(thread->tracefile, filepath);
return 0;
}
static void
load_proc_metadata(struct ovni_proc *proc, int *rank_enabled)
{
JSON_Object *meta = json_value_get_object(proc->meta);
if (meta == NULL)
die("load_proc_metadata: json_value_get_object() failed\n");
JSON_Value *appid_val = json_object_get_value(meta, "app_id");
if (appid_val == NULL)
die("process %d is missing app_id in metadata\n", proc->pid);
proc->appid = (int) json_number(appid_val);
JSON_Value *rank_val = json_object_get_value(meta, "rank");
if (rank_val != NULL) {
proc->rank = (int) json_number(rank_val);
*rank_enabled = 1;
} else {
proc->rank = -1;
}
}
static void
check_metadata_version(struct ovni_proc *proc)
{
JSON_Object *meta = json_value_get_object(proc->meta);
if (meta == NULL)
die("check_metadata_version: json_value_get_object() failed\n");
JSON_Value *version_val = json_object_get_value(meta, "version");
if (version_val == NULL) {
die("process %d is missing attribute \"version\" in metadata\n",
proc->pid);
}
int version = (int) json_number(version_val);
if (version != OVNI_METADATA_VERSION) {
die("pid %d: metadata version mismatch %d (expected %d)\n",
proc->pid, version,
OVNI_METADATA_VERSION);
}
JSON_Value *mversion_val = json_object_get_value(meta, "model_version");
if (mversion_val == NULL) {
die("process %d is missing attribute \"model_version\" in metadata\n",
proc->pid);
}
const char *mversion = json_string(mversion_val);
if (strcmp(mversion, OVNI_MODEL_VERSION) != 0) {
die("pid %d: metadata model version mismatch '%s' (expected '%s')\n",
proc->pid, mversion,
OVNI_MODEL_VERSION);
}
}
static int
compare_int(const void *a, const void *b)
{
int aa = *(const int *) a;
int bb = *(const int *) b;
if (aa < bb)
return -1;
else if (aa > bb)
return +1;
else
return 0;
}
static int
load_proc(struct ovni_proc *proc, struct ovni_loom *loom, int index, int pid, char *procdir)
{
static int total_procs = 0;
proc->pid = pid;
proc->index = index;
proc->gindex = total_procs++;
proc->loom = loom;
char path[PATH_MAX];
if (snprintf(path, PATH_MAX, "%s/%s", procdir, "metadata.json") >= PATH_MAX) {
err("snprintf: path too large: %s\n", procdir);
abort();
}
proc->meta = json_parse_file_with_comments(path);
if (proc->meta == NULL) {
err("error loading metadata from %s\n", path);
return -1;
}
check_metadata_version(proc);
/* The appid is populated from the metadata */
load_proc_metadata(proc, &loom->rank_enabled);
DIR *dir;
if ((dir = opendir(procdir)) == NULL) {
fprintf(stderr, "opendir %s failed: %s\n",
procdir, strerror(errno));
return -1;
}
proc->nthreads = count_dir_prefix(dir, "thread");
if (proc->nthreads <= 0) {
err("cannot find any thread for process %d\n",
proc->pid);
return -1;
}
proc->thread = calloc(proc->nthreads, sizeof(struct ovni_thread));
if (proc->thread == NULL) {
perror("calloc failed");
return -1;
}
int *tids;
if ((tids = calloc(proc->nthreads, sizeof(int))) == NULL) {
perror("calloc failed\n");
return -1;
}
rewinddir(dir);
for (size_t i = 0; i < proc->nthreads;) {
struct dirent *dirent = readdir(dir);
if (dirent == NULL) {
err("inconsistent: readdir returned NULL\n");
return -1;
}
if (find_dir_prefix_int(dirent->d_name, "thread", &tids[i]) != 0)
continue;
i++;
}
closedir(dir);
/* Sort threads by ascending TID */
qsort(tids, proc->nthreads, sizeof(int), compare_int);
for (size_t i = 0; i < proc->nthreads; i++) {
int tid = tids[i];
if (snprintf(path, PATH_MAX, "%s/thread.%d", procdir, tid) >= PATH_MAX) {
err("snprintf: path too large: %s\n", procdir);
abort();
}
struct ovni_thread *thread = &proc->thread[i];
if (load_thread(thread, proc, i, tid, path) != 0)
return -1;
}
free(tids);
return 0;
}
static int
load_loom(struct ovni_loom *loom, char *loomdir)
{
DIR *dir = NULL;
if ((dir = opendir(loomdir)) == NULL) {
fprintf(stderr, "opendir %s failed: %s\n",
loomdir, strerror(errno));
return -1;
}
loom->rank_enabled = 0;
loom->nprocs = count_dir_prefix(dir, "proc");
if (loom->nprocs <= 0) {
err("cannot find any process directory in loom %s\n",
loom->hostname);
return -1;
}
loom->proc = calloc(loom->nprocs, sizeof(struct ovni_proc));
if (loom->proc == NULL) {
perror("calloc failed");
return -1;
}
rewinddir(dir);
size_t i = 0;
struct dirent *dirent = NULL;
while ((dirent = readdir(dir)) != NULL) {
int pid;
if (find_dir_prefix_int(dirent->d_name, "proc", &pid) != 0)
continue;
if (i >= loom->nprocs) {
err("more process than expected\n");
abort();
}
struct ovni_proc *proc = &loom->proc[i];
if (snprintf(proc->dir, PATH_MAX, "%s/%s", loomdir, dirent->d_name) >= PATH_MAX) {
err("error: process dir name %s too long\n", dirent->d_name);
return -1;
}
if (load_proc(&loom->proc[i], loom, i, pid, proc->dir) != 0)
return -1;
i++;
}
if (i != loom->nprocs) {
err("unexpected number of processes\n");
abort();
}
closedir(dir);
/* Ensure all process have the rank, if enabled in any */
if (loom->rank_enabled) {
for (i = 0; i < loom->nprocs; i++) {
struct ovni_proc *proc = &loom->proc[i];
if (proc->rank < 0) {
die("process %d is missing the rank\n",
proc->pid);
}
}
}
return 0;
}
static int
compare_looms(const void *a, const void *b)
{
struct ovni_loom *la = (struct ovni_loom *) a;
struct ovni_loom *lb = (struct ovni_loom *) b;
return strcmp(la->dname, lb->dname);
}
static void
loom_to_host(const char *loom_name, char *host, int n)
{
int i = 0;
for (i = 0; i < n; i++) {
/* Copy until dot or end */
if (loom_name[i] != '.' && loom_name[i] != '\0')
host[i] = loom_name[i];
else
break;
}
if (i == n)
die("loom host name %s too long\n", loom_name);
host[i] = '\0';
}
int
ovni_load_trace(struct ovni_trace *trace, char *tracedir)
{
DIR *dir = NULL;
if ((dir = opendir(tracedir)) == NULL) {
err("opendir %s failed: %s\n", tracedir, strerror(errno));
return -1;
}
trace->nlooms = count_dir_prefix(dir, "loom");
if (trace->nlooms == 0) {
err("cannot find any loom in %s\n", tracedir);
return -1;
}
trace->loom = calloc(trace->nlooms, sizeof(struct ovni_loom));
if (trace->loom == NULL) {
perror("calloc failed\n");
return -1;
}
rewinddir(dir);
size_t l = 0;
struct dirent *dirent = NULL;
while ((dirent = readdir(dir)) != NULL) {
struct ovni_loom *loom = &trace->loom[l];
const char *loom_name;
if (find_dir_prefix_str(dirent->d_name, "loom", &loom_name) != 0) {
/* Ignore other files in tracedir */
continue;
}
if (l >= trace->nlooms) {
err("extra loom detected\n");
return -1;
}
/* Copy the complete loom directory name to looms */
if (snprintf(loom->dname, PATH_MAX, "%s", dirent->d_name) >= PATH_MAX) {
err("error: loom name %s too long\n", dirent->d_name);
return -1;
}
l++;
}
closedir(dir);
/* Sort the looms, so we get the hostnames in alphanumeric order */
qsort(trace->loom, trace->nlooms, sizeof(struct ovni_loom),
compare_looms);
for (size_t i = 0; i < trace->nlooms; i++) {
struct ovni_loom *loom = &trace->loom[i];
const char *name = NULL;
if (find_dir_prefix_str(loom->dname, "loom", &name) != 0) {
err("error: mismatch for loom %s\n", loom->dname);
return -1;
}
loom_to_host(name, loom->hostname, sizeof(loom->hostname));
if (snprintf(loom->path, PATH_MAX, "%s/%s",
tracedir, loom->dname)
>= PATH_MAX) {
err("error: loom path %s/%s too long\n",
tracedir, loom->dname);
return -1;
}
if (load_loom(loom, loom->path) != 0)
return -1;
}
return 0;
}
static int
check_stream_header(struct ovni_stream *stream)
{
int ret = 0;
if (stream->size < sizeof(struct ovni_stream_header)) {
err("stream %d: incomplete stream header\n",
stream->tid);
return -1;
}
struct ovni_stream_header *h =
(struct ovni_stream_header *) stream->buf;
if (memcmp(h->magic, OVNI_STREAM_MAGIC, 4) != 0) {
char magic[5];
memcpy(magic, h->magic, 4);
magic[4] = '\0';
err("stream %d: wrong stream magic '%s' (expected '%s')\n",
stream->tid, magic, OVNI_STREAM_MAGIC);
ret = -1;
}
if (h->version != OVNI_STREAM_VERSION) {
err("stream %d: stream version mismatch %u (expected %u)\n",
stream->tid, h->version, OVNI_STREAM_VERSION);
ret = -1;
}
return ret;
}
static int
load_stream_fd(struct ovni_stream *stream, int fd)
{
struct stat st;
if (fstat(fd, &st) < 0) {
perror("fstat failed");
return -1;
}
/* Error because it doesn't have the header */
if (st.st_size == 0) {
err("stream %d is empty\n", stream->tid);
return -1;
}
int prot = PROT_READ | PROT_WRITE;
stream->buf = mmap(NULL, st.st_size, prot, MAP_PRIVATE, fd, 0);
if (stream->buf == MAP_FAILED) {
perror("mmap failed");
return -1;
}
stream->size = st.st_size;
return 0;
}
static int
load_stream_buf(struct ovni_stream *stream, struct ovni_thread *thread)
{
int fd;
if ((fd = open(thread->tracefile, O_RDWR)) == -1) {
perror("open failed");
return -1;
}
if (load_stream_fd(stream, fd) != 0)
return -1;
if (check_stream_header(stream) != 0) {
err("stream %d: bad header\n", stream->tid);
return -1;
}
stream->offset = sizeof(struct ovni_stream_header);
if (stream->offset == stream->size)
stream->active = 0;
else
stream->active = 1;
/* No need to keep the fd open */
if (close(fd)) {
perror("close failed");
return -1;
}
return 0;
}
/* Populates the streams in a single array */
int
ovni_load_streams(struct ovni_trace *trace)
{
trace->nstreams = 0;
for (size_t i = 0; i < trace->nlooms; i++) {
struct ovni_loom *loom = &trace->loom[i];
for (size_t j = 0; j < loom->nprocs; j++) {
struct ovni_proc *proc = &loom->proc[j];
for (size_t k = 0; k < proc->nthreads; k++) {
trace->nstreams++;
}
}
}
trace->stream = calloc(trace->nstreams, sizeof(struct ovni_stream));
if (trace->stream == NULL) {
perror("calloc failed");
return -1;
}
err("loaded %ld streams\n", trace->nstreams);
size_t s = 0;
for (size_t i = 0; i < trace->nlooms; i++) {
struct ovni_loom *loom = &trace->loom[i];
for (size_t j = 0; j < loom->nprocs; j++) {
struct ovni_proc *proc = &loom->proc[j];
for (size_t k = 0; k < proc->nthreads; k++) {
struct ovni_thread *thread = &proc->thread[k];
struct ovni_stream *stream = &trace->stream[s++];
stream->tid = thread->tid;
stream->thread = thread;
stream->proc = proc;
stream->loom = loom;
stream->lastclock = 0;
stream->offset = 0;
stream->cur_ev = NULL;
if (load_stream_buf(stream, thread) != 0) {
err("load_stream_buf failed\n");
return -1;
}
}
}
}
return 0;
}
void
ovni_free_streams(struct ovni_trace *trace)
{
for (size_t i = 0; i < trace->nstreams; i++) {
struct ovni_stream *stream = &trace->stream[i];
if (munmap(stream->buf, stream->size) != 0)
die("munmap stream failed: %s\n", strerror(errno));
}
free(trace->stream);
}
void
ovni_free_trace(struct ovni_trace *trace)
{
for (size_t i = 0; i < trace->nlooms; i++) {
for (size_t j = 0; j < trace->loom[i].nprocs; j++) {
free(trace->loom[i].proc[j].thread);
}
free(trace->loom[i].proc);
}
free(trace->loom);
}
int
ovni_load_next_event(struct ovni_stream *stream)
{
if (stream->active == 0) {
dbg("stream is inactive, cannot load more events\n");
return -1;
}
/* Only step the offset if we have load an event */
if (stream->cur_ev != NULL)
stream->offset += ovni_ev_size(stream->cur_ev);
/* It cannot overflow, otherwise we are reading garbage */
if (stream->offset > stream->size)
die("ovni_load_next_event: stream offset exceeds size\n");
/* We have reached the end */
if (stream->offset == stream->size) {
stream->active = 0;
stream->cur_ev = NULL;
dbg("stream %d runs out of events\n", stream->tid);
return -1;
}
stream->cur_ev = (struct ovni_ev *) &stream->buf[stream->offset];
return 0;
}
int
ovni_trace_probe(const char *tracedir)
{
DIR *dir = NULL;
if ((dir = opendir(tracedir)) == NULL) {
err("trace_probe: opendir %s failed: %s\n",
tracedir, strerror(errno));
return -1;
}
int nlooms = count_dir_prefix(dir, "loom");
closedir(dir);
/* At least one loom required */
if (nlooms == 0)
return -1;
else
return 0;
}

View File

@ -1,16 +0,0 @@
/* Copyright (c) 2021 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef OVNI_TRACE_H
#define OVNI_TRACE_H
#include "ovni_model.h"
int ovni_load_next_event(struct ovni_stream *stream);
int ovni_load_trace(struct ovni_trace *trace, char *tracedir);
int ovni_load_streams(struct ovni_trace *trace);
void ovni_free_streams(struct ovni_trace *trace);
void ovni_free_trace(struct ovni_trace *trace);
int ovni_trace_probe(const char *tracedir);
#endif /* OVNI_TRACE_H */

View File

@ -3,6 +3,7 @@
#include "path.h" #include "path.h"
#include "common.h"
#include <string.h> #include <string.h>
int int
@ -13,3 +14,80 @@ path_has_prefix(const char *path, const char *prefix)
return 1; return 1;
} }
/* Counts occurences of c in path */
int
path_count(const char *path, char c)
{
int n = 0;
for (int i = 0; path[i] != '\0'; i++) {
if (path[i] == c)
n++;
}
return n;
}
/* Given the "a/b/c" path and '/' as sep, returns "b/c" */
int
path_next(const char *path, char sep, const char (**next))
{
const char *p = strchr(path, sep);
if (p == NULL) {
err("missing '%c' in path %s", sep, path);
return -1;
}
p++; /* Skip sep */
*next = p;
return 0;
}
/* Removes n components from the beginning.
*
* Examples:
*
* path="a/b/c/d" and n=2 -> "c/d"
* path="a/b/c/d" and n=3 -> "d"
*/
int
path_strip(const char *path, int n, const char (**next))
{
const char *p = path;
for (; n>0; n--) {
const char *q;
if (path_next(p, '/', &q) != 0) {
err("missing %d '/' in path %s", n, path);
return -1;
}
p = q;
}
*next = p;
return 0;
}
/* Given the "a/b/c" path 2 as n, trims the path as "a/b" */
int
path_keep(char *path, int n)
{
for (int i = 0; path[i] != '\0'; i++) {
if (path[i] == '/')
n--;
if (n == 0) {
path[i] = '\0';
return 0;
}
}
/* if we ask path=a/b n=2, is ok */
if (n == 1)
return 0;
err("missing %d components", n);
return -1;
}

View File

@ -5,5 +5,9 @@
#define PATH_H #define PATH_H
int path_has_prefix(const char *path, const char *prefix); int path_has_prefix(const char *path, const char *prefix);
int path_count(const char *path, char c);
int path_next(const char *path, char sep, const char (**next));
int path_keep(char *path, int n);
int path_strip(const char *path, int n, const char (**next));
#endif /* PATH_H */ #endif /* PATH_H */

View File

@ -7,90 +7,102 @@
#include "path.h" #include "path.h"
#include <errno.h> #include <errno.h>
static const char prefix[] = "proc.";
static int static int
set_id(struct proc *proc, const char *id) get_pid(const char *id, int *pid)
{ {
/* The id must be like "loom.123/proc.345" */ /* TODO: Store the PID the metadata.json instead */
const char *p = strchr(id, '/'); /* The id must be like "loom.host01.123/proc.345" */
if (p == NULL) { if (path_count(id, '/') != 1) {
err("proc relpath missing '/': %s", id); err("proc id can only contain one '/': %s", id);
return -1; return -1;
} }
p++; /* Skip slash */ /* Get the proc.345 part */
if (strchr(p, '/') != NULL) { const char *procname;
err("proc id contains multiple '/': %s", id); if (path_next(id, '/', &procname) != 0) {
err("cannot get proc name");
return -1; return -1;
} }
/* Ensure the prefix is ok */ /* Ensure the prefix is ok */
if (!path_has_prefix(p, prefix)) { const char prefix[] = "proc.";
if (!path_has_prefix(procname, prefix)) {
err("proc name must start with '%s': %s", prefix, id); err("proc name must start with '%s': %s", prefix, id);
return -1; return -1;
} }
if (snprintf(proc->id, PATH_MAX, "%s", id) >= PATH_MAX) { /* Get the 345 part */
err("proc id too long: %s", id); const char *pidstr;
if (path_next(procname, '.', &pidstr) != 0) {
err("cannot find proc dot in '%s'", id);
return -1; return -1;
} }
*pid = atoi(pidstr);
return 0; return 0;
} }
int int
proc_init(struct proc *proc, const char *id, int pid) proc_relpath_get_pid(const char *relpath, int *pid)
{
char id[PATH_MAX];
if (snprintf(id, PATH_MAX, "%s", relpath) >= PATH_MAX) {
err("path too long");
return -1;
}
if (path_keep(id, 2) != 0) {
err("cannot delimite proc dir");
return -1;
}
return get_pid(id, pid);
}
int
proc_init_begin(struct proc *proc, const char *relpath)
{ {
memset(proc, 0, sizeof(struct proc)); memset(proc, 0, sizeof(struct proc));
if (set_id(proc, id) != 0) { proc->gindex = -1;
err("cannot set process id");
if (snprintf(proc->id, PATH_MAX, "%s", relpath) >= PATH_MAX) {
err("path too long");
return -1; return -1;
} }
proc->pid = pid; if (path_keep(proc->id, 2) != 0) {
err("cannot delimite proc dir");
return -1;
}
if (get_pid(proc->id, &proc->pid) != 0) {
err("cannot parse proc pid");
return -1;
}
err("created proc %s", proc->id);
return 0; return 0;
} }
static int void
check_metadata_version(JSON_Object *meta) proc_set_gindex(struct proc *proc, int64_t gindex)
{ {
JSON_Value *version_val = json_object_get_value(meta, "version"); proc->gindex = gindex;
if (version_val == NULL) {
err("missing attribute \"version\"");
return -1;
} }
int version = (int) json_number(version_val); int
proc_load_metadata(struct proc *proc, JSON_Object *meta)
if (version != OVNI_METADATA_VERSION) {
err("metadata version mismatch %d (expected %d)",
version, OVNI_METADATA_VERSION);
return -1;
}
JSON_Value *mversion_val = json_object_get_value(meta, "model_version");
if (mversion_val == NULL) {
err("missing attribute \"model_version\"");
return -1;
}
const char *mversion = json_string(mversion_val);
if (strcmp(mversion, OVNI_MODEL_VERSION) != 0) {
err("model version mismatch '%s' (expected '%s')",
mversion, OVNI_MODEL_VERSION);
return -1;
}
return 0;
}
static int
load_attributes(struct proc *proc, JSON_Object *meta)
{ {
if (proc->metadata_loaded) {
err("process %s already loaded metadata", proc->id);
return -1;
}
JSON_Value *appid_val = json_object_get_value(meta, "app_id"); JSON_Value *appid_val = json_object_get_value(meta, "app_id");
if (appid_val == NULL) { if (appid_val == NULL) {
err("missing attribute 'app_id' in metadata\n"); err("missing attribute 'app_id' in metadata\n");
@ -106,22 +118,6 @@ load_attributes(struct proc *proc, JSON_Object *meta)
else else
proc->rank = -1; proc->rank = -1;
return 0;
}
int
proc_load_metadata(struct proc *proc, JSON_Object *meta)
{
if (proc->metadata_loaded) {
err("process %s already loaded metadata", proc->id);
return -1;
}
if (load_attributes(proc, meta) != 0) {
err("cannot load attributes for process %s", proc->id);
return -1;
}
proc->metadata_loaded = 1; proc->metadata_loaded = 1;
return 0; return 0;
@ -130,12 +126,67 @@ proc_load_metadata(struct proc *proc, JSON_Object *meta)
struct thread * struct thread *
proc_find_thread(struct proc *proc, int tid) proc_find_thread(struct proc *proc, int tid)
{ {
struct thread *th; struct thread *thread = NULL;
DL_FOREACH2(proc->threads, th, lnext) { HASH_FIND_INT(proc->threads, &tid, thread);
if (th->tid == tid) return thread;
return th;
} }
return NULL;
int
proc_add_thread(struct proc *proc, struct thread *thread)
{
if (proc->is_init) {
err("cannot modify threads of initialized proc");
return -1;
}
int tid = thread_get_tid(thread);
if (proc_find_thread(proc, tid) != NULL) {
err("thread with tid %d already in proc %s", tid, proc->id);
return -1;
}
HASH_ADD_INT(proc->threads, tid, thread);
proc->nthreads++;
return 0;
}
static int
by_tid(struct thread *t1, struct thread *t2)
{
int id1 = thread_get_tid(t1);
int id2 = thread_get_tid(t2);
if (id1 < id2)
return -1;
if (id1 > id2)
return +1;
else
return 0;
}
void
proc_sort(struct proc *proc)
{
HASH_SORT(proc->threads, by_tid);
}
int
proc_init_end(struct proc *proc)
{
if (proc->gindex < 0) {
err("gindex not set");
return -1;
}
if (!proc->metadata_loaded) {
err("metadata not loaded");
return -1;
}
proc->is_init = 1;
return 0;
} }
int int

View File

@ -4,15 +4,15 @@
#ifndef PROC_H #ifndef PROC_H
#define PROC_H #define PROC_H
struct proc; /* No loom dependency here */
#include "thread.h" #include "thread.h"
#include "parson.h" #include "parson.h"
#include <stddef.h> #include <stddef.h>
struct proc { struct proc {
size_t gindex; int64_t gindex;
char id[PATH_MAX]; char id[PATH_MAX];
int is_init;
int metadata_loaded; int metadata_loaded;
int pid; int pid;
@ -35,9 +35,15 @@ struct proc {
UT_hash_handle hh; /* procs in the loom */ UT_hash_handle hh; /* procs in the loom */
}; };
int proc_init(struct proc *proc, const char *id, int pid); int proc_relpath_get_pid(const char *relpath, int *pid);
int proc_init_begin(struct proc *proc, const char *id);
int proc_init_end(struct proc *proc);
int proc_get_pid(struct proc *proc); int proc_get_pid(struct proc *proc);
void proc_set_gindex(struct proc *proc, int64_t gindex);
void proc_sort(struct proc *proc);
int proc_load_metadata(struct proc *proc, JSON_Object *meta); int proc_load_metadata(struct proc *proc, JSON_Object *meta);
struct thread *proc_find_thread(struct proc *proc, int tid); struct thread *proc_find_thread(struct proc *proc, int tid);
int proc_add_thread(struct proc *proc, struct thread *thread);
void proc_sort(struct proc *proc);
#endif /* PROC_H */ #endif /* PROC_H */

File diff suppressed because it is too large Load Diff

View File

@ -14,6 +14,14 @@
#include "clkoff.h" #include "clkoff.h"
#include <stddef.h> #include <stddef.h>
/* Map from stream to lpt */
struct lpt {
struct emu_stream *stream; /* Redundancy */
struct loom *loom;
struct proc *proc;
struct thread *thread;
};
struct system { struct system {
/* Total counters */ /* Total counters */
size_t nlooms; size_t nlooms;
@ -29,11 +37,14 @@ struct system {
struct clkoff clkoff; struct clkoff clkoff;
struct emu_args *args; struct emu_args *args;
struct lpt *lpt;
//struct model_ctx ctx; //struct model_ctx ctx;
}; };
int system_init(struct system *sys, struct emu_args *args, struct emu_trace *trace); int system_init(struct system *sys, struct emu_args *args, struct emu_trace *trace);
//struct emu_thread *system_get_thread(struct emu_stream *stream); int system_connect(struct system *sys, struct bay *bay);
struct lpt *system_get_lpt(struct emu_stream *stream);
//struct emu_cpu *system_find_cpu(struct emu_loom *loom, int cpuid); //struct emu_cpu *system_find_cpu(struct emu_loom *loom, int cpuid);
//int model_ctx_set(struct model_ctx *ctx, int model, void *data); //int model_ctx_set(struct model_ctx *ctx, int model, void *data);
//int model_ctx_get(struct model_ctx *ctx, int model, void *data); //int model_ctx_get(struct model_ctx *ctx, int model, void *data);

View File

@ -3,30 +3,135 @@
#include "thread.h" #include "thread.h"
static void #include "path.h"
init_chans(struct thread *th) #include "bay.h"
static const char chan_fmt[] = "thread%lu.%s";
static const char *chan_name[] = {
[TH_CHAN_CPU] = "cpu_gindex",
[TH_CHAN_TID] = "tid_active",
[TH_CHAN_STATE] = "state",
[TH_CHAN_FLUSH] = "flush",
};
static const int chan_stack[] = {
[TH_CHAN_FLUSH] = 1,
};
static int
get_tid(const char *id, int *tid)
{ {
struct thread_chan *c = &th->chan; /* The id must be like "loom.host01.123/proc.345/thread.567" */
char prefix[128]; if (path_count(id, '/') != 2) {
err("proc id can only contain two '/': %s", id);
if (snprintf(prefix, 128, "sys.thread%lu", th->gindex) >= 128) return -1;
die("snprintf too long");
chan_init(&c->cpu_gindex, CHAN_SINGLE, "%s.cpu_gindex", prefix);
chan_init(&c->tid_active, CHAN_SINGLE, "%s.tid_active", prefix);
chan_init(&c->nth_active, CHAN_SINGLE, "%s.nth_active", prefix);
chan_init(&c->state, CHAN_SINGLE, "%s.state", prefix);
} }
void /* Get the thread.567 part */
thread_init(struct thread *thread, struct proc *proc) const char *thname;
if (path_strip(id, 2, &thname) != 0) {
err("cannot get thread name");
return -1;
}
/* Ensure the prefix is ok */
const char prefix[] = "thread.";
if (!path_has_prefix(thname, prefix)) {
err("thread name must start with '%s': %s", prefix, thname);
return -1;
}
/* Get the 567 part */
const char *tidstr;
if (path_next(thname, '.', &tidstr) != 0) {
err("cannot find thread dot in '%s'", id);
return -1;
}
*tid = atoi(tidstr);
return 0;
}
int
thread_relpath_get_tid(const char *relpath, int *tid)
{
return get_tid(relpath, tid);
}
int
thread_init_begin(struct thread *thread, const char *relpath)
{ {
memset(thread, 0, sizeof(struct thread)); memset(thread, 0, sizeof(struct thread));
thread->state = TH_ST_UNKNOWN; thread->state = TH_ST_UNKNOWN;
thread->proc = proc; thread->gindex = -1;
init_chans(thread); if (snprintf(thread->id, PATH_MAX, "%s", relpath) >= PATH_MAX) {
err("relpath too long");
return -1;
}
if (get_tid(thread->id, &thread->tid) != 0) {
err("cannot parse thread tid");
return -1;
}
return 0;
}
void
thread_set_gindex(struct thread *th, int64_t gindex)
{
th->gindex = gindex;
}
int
thread_init_end(struct thread *th)
{
if (th->gindex < 0) {
err("gindex not set");
return -1;
}
for (int i = 0; i < TH_CHAN_MAX; i++) {
enum chan_type type = CHAN_SINGLE;
if (chan_stack[i])
type = CHAN_STACK;
chan_init(&th->chan[i], type,
chan_fmt, th->gindex, chan_name[i]);
}
chan_prop_set(&th->chan[TH_CHAN_TID], CHAN_DUPLICATES, 1);
th->is_init = 1;
return 0;
}
int
thread_connect(struct thread *th, struct bay *bay)
{
if (!th->is_init) {
err("thread is not initialized");
return -1;
}
for (int i = 0; i < TH_CHAN_MAX; i++) {
if (bay_register(bay, &th->chan[i]) != 0) {
err("bay_register failed");
return -1;
}
}
return 0;
}
int
thread_get_tid(struct thread *thread)
{
return thread->tid;
} }
/* Sets the state of the thread and updates the thread tracking channels */ /* Sets the state of the thread and updates the thread tracking channels */
@ -35,7 +140,7 @@ thread_set_state(struct thread *th, enum thread_state state)
{ {
/* The state must be updated when a cpu is set */ /* The state must be updated when a cpu is set */
if (th->cpu == NULL) { if (th->cpu == NULL) {
die("thread_set_state: thread %d doesn't have a CPU\n", th->tid); die("thread %d doesn't have a CPU", th->tid);
return -1; return -1;
} }
@ -47,12 +152,22 @@ thread_set_state(struct thread *th, enum thread_state state)
? 1 ? 1
: 0; : 0;
struct thread_chan *chan = &th->chan; struct chan *st = &th->chan[TH_CHAN_STATE];
if (chan_set(&chan->state, value_int64(th->state)) != 0) { if (chan_set(st, value_int64(th->state)) != 0) {
err("thread_set_cpu: chan_set() failed"); err("thread_set_cpu: chan_set() failed");
return -1; return -1;
} }
struct value tid_active = value_null();
if (th->is_active)
tid_active = value_int64(th->tid);
if (chan_set(&th->chan[TH_CHAN_TID], tid_active) != 0) {
err("chan_set() failed");
return -1;
}
return 0; return 0;
} }
@ -72,8 +187,8 @@ thread_set_cpu(struct thread *th, struct cpu *cpu)
th->cpu = cpu; th->cpu = cpu;
/* Update cpu channel */ /* Update cpu channel */
struct thread_chan *chan = &th->chan; struct chan *c = &th->chan[TH_CHAN_CPU];
if (chan_set(&chan->cpu_gindex, value_int64(cpu->gindex)) != 0) { if (chan_set(c, value_int64(cpu->gindex)) != 0) {
err("thread_set_cpu: chan_set failed\n"); err("thread_set_cpu: chan_set failed\n");
return -1; return -1;
} }
@ -91,9 +206,9 @@ thread_unset_cpu(struct thread *th)
th->cpu = NULL; th->cpu = NULL;
struct thread_chan *chan = &th->chan; struct chan *c = &th->chan[TH_CHAN_CPU];
if (chan_set(&chan->cpu_gindex, value_null()) != 0) { if (chan_set(c, value_null()) != 0) {
err("thread_unset_cpu: chan_set failed\n"); err("thread_set_cpu: chan_set failed\n");
return -1; return -1;
} }
@ -110,9 +225,9 @@ thread_migrate_cpu(struct thread *th, struct cpu *cpu)
th->cpu = cpu; th->cpu = cpu;
struct thread_chan *chan = &th->chan; struct chan *c = &th->chan[TH_CHAN_CPU];
if (chan_set(&chan->cpu_gindex, value_int64(cpu->gindex)) != 0) { if (chan_set(c, value_int64(cpu->gindex)) != 0) {
err("thread_migrate_cpu: chan_set failed\n"); err("thread_set_cpu: chan_set failed\n");
return -1; return -1;
} }

View File

@ -4,12 +4,14 @@
#ifndef THREAD_H #ifndef THREAD_H
#define THREAD_H #define THREAD_H
struct thread; struct thread; /* Needed for cpu */
#include "cpu.h" #include "cpu.h"
#include "chan.h" #include "chan.h"
#include "emu_stream.h" #include "bay.h"
#include "uthash.h"
#include <stddef.h> #include <stddef.h>
#include <linux/limits.h>
/* Emulated thread runtime status */ /* Emulated thread runtime status */
enum thread_state { enum thread_state {
@ -21,19 +23,19 @@ enum thread_state {
TH_ST_WARMING, TH_ST_WARMING,
}; };
struct thread_chan { enum thread_chan {
struct chan cpu_gindex; TH_CHAN_CPU = 0,
struct chan tid_active; TH_CHAN_TID,
struct chan nth_active; TH_CHAN_STATE,
struct chan state; TH_CHAN_FLUSH,
TH_CHAN_MAX,
}; };
struct thread { struct thread {
size_t gindex; /* In the system */ int64_t gindex; /* In the system */
char id[PATH_MAX];
char name[PATH_MAX]; int is_init;
char path[PATH_MAX];
char relpath[PATH_MAX];
int tid; int tid;
size_t index; /* In loom */ size_t index; /* In loom */
@ -45,9 +47,6 @@ struct thread {
int is_running; int is_running;
int is_active; int is_active;
/* Stream linked to this thread */
struct emu_stream *stream;
/* Current cpu, NULL if not unique affinity */ /* Current cpu, NULL if not unique affinity */
struct cpu *cpu; struct cpu *cpu;
@ -63,15 +62,21 @@ struct thread {
struct thread *gnext; struct thread *gnext;
struct thread *gprev; struct thread *gprev;
struct thread_chan chan; struct chan chan[TH_CHAN_MAX];
//struct model_ctx ctx; //struct model_ctx ctx;
UT_hash_handle hh; /* threads in the process */
}; };
void thread_init(struct thread *thread, struct proc *proc); int thread_relpath_get_tid(const char *relpath, int *tid);
int thread_init_begin(struct thread *thread, const char *relpath);
int thread_init_end(struct thread *thread);
int thread_set_state(struct thread *th, enum thread_state state); int thread_set_state(struct thread *th, enum thread_state state);
int thread_set_cpu(struct thread *th, struct cpu *cpu); int thread_set_cpu(struct thread *th, struct cpu *cpu);
int thread_unset_cpu(struct thread *th); int thread_unset_cpu(struct thread *th);
int thread_migrate_cpu(struct thread *th, struct cpu *cpu); int thread_migrate_cpu(struct thread *th, struct cpu *cpu);
int thread_get_tid(struct thread *thread);
void thread_set_gindex(struct thread *th, int64_t gindex);
int thread_connect(struct thread *th, struct bay *bay);
#endif /* THREAD_H */ #endif /* THREAD_H */

95
src/emu/ust/connect.c Normal file
View File

@ -0,0 +1,95 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "ust_model.h"
#include "emu.h"
//#include "mux.h"
//static int
//cb_is_running(struct chan *in, void *ptr)
//{
// struct chan *out = ptr;
// struct value value;
//
// if (chan_read(in, &value) != 0) {
// err("cb_is_running: chan_read %s failed\n", in->name);
// return -1;
// }
//
// if (value.type != VALUE_INT64)
// die("wrong value type\n");
//
// int st = value.i;
// if (st == TH_ST_RUNNING)
// value = value_int64(1);
// else
// value = value_int64(0);
//
// if (chan_set(out, value) != 0) {
// err("cb_is_running: chan_set %s failed\n", out->name);
// return -1;
// }
//
// return 0;
//}
//
//static int
//cb_is_active(struct chan *in, void *ptr)
//{
// struct chan *out = ptr;
// struct value value;
//
// if (chan_read(in, &value) != 0) {
// err("cb_is_running: chan_read %s failed\n", in->name);
// return -1;
// }
//
// if (value.type != VALUE_INT64)
// die("wrong value type\n");
//
// int st = value.i;
// if (st == TH_ST_RUNNING || st == TH_ST_COOLING || st == TH_ST_WARMING)
// value = value_int64(1);
// else
// value = value_int64(0);
//
// if (chan_set(out, value) != 0) {
// err("cb_is_running: chan_set %s failed\n", out->name);
// return -1;
// }
//
// return 0;
//}
//
//static struct chan *
//find_thread_chan(struct bay *bay, long th_gindex, char *name)
//{
// char fullname[MAX_CHAN_NAME];
// sprintf(fullname, "ovni.thread%ld.%s", th_gindex, name);
// return bay_find(bay, fullname);
//}
//
//static void
//track_thread_state(struct bay *bay, long th_gindex)
//{
// struct chan *state = find_thread_chan(bay, th_gindex, "state");
// struct chan *is_running = find_thread_chan(bay, th_gindex, "is_running");
// struct chan *is_active = find_thread_chan(bay, th_gindex, "is_active");
//
// if (bay_add_cb(bay, BAY_CB_DIRTY, state, cb_is_running, is_running) != 0)
// die("bay_add_cb failed\n");
// if (bay_add_cb(bay, BAY_CB_DIRTY, state, cb_is_active, is_active) != 0)
// die("bay_add_cb failed\n");
//}
int
ust_model_connect(void *ptr)
{
UNUSED(ptr);
//struct emu *emu = emu_get(ptr);
//for (size_t i = 0; i < emu->system.nthreads; i++)
// track_thread_state(&emu->bay, i);
return 0;
}

28
src/emu/ust/create.c Normal file
View File

@ -0,0 +1,28 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "ust_model.h"
#include "emu.h"
struct model_spec ust_model_spec = {
.name = "ust",
.model = 'O',
.create = ust_model_create,
.connect = ust_model_connect,
.event = ust_model_event,
.probe = ust_model_probe,
};
int
ust_model_create(void *p)
{
struct emu *emu = emu_get(p);
UNUSED(emu);
/* Get paraver traces */
//oemu->pvt_thread = pvman_new(emu->pvman, "thread");
//oemu->pvt_cpu = pvman_new(emu->pvman, "cpu");
return 0;
}

447
src/emu/ust/event.c Normal file
View File

@ -0,0 +1,447 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#define ENABLE_DEBUG
#include "ovni_model.h"
#include "emu.h"
#include "loom.h"
#include "common.h"
static int
pre_thread_execute(struct emu *emu, struct thread *th)
{
/* The thread cannot be already running */
if (th->state == TH_ST_RUNNING) {
err("cannot execute thread %d, is already running", th->tid);
return -1;
}
int cpuid = emu->ev->payload->i32[0];
struct cpu *cpu = loom_find_cpu(emu->loom, cpuid);
if (cpu == NULL) {
err("cannot find CPU with phyid %d in loom %s",
cpuid, emu->loom->id)
return -1;
}
dbg("thread %d runs in %s", th->tid, cpu->name);
/* First set the CPU in the thread */
thread_set_cpu(th, cpu);
/* Then set the thread to running state */
thread_set_state(th, TH_ST_RUNNING);
/* And then add the thread to the CPU, so tracking channels see the
* updated thread state */
cpu_add_thread(cpu, th);
return 0;
}
static int
pre_thread_end(struct thread *th)
{
if (th->state != TH_ST_RUNNING && th->state != TH_ST_COOLING) {
err("cannot end thread %d: state not running or cooling",
th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_DEAD) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_remove_thread(th->cpu, th) != 0) {
err("cannot remove thread %d from %s",
th->tid, th->cpu->name);
return -1;
}
if (thread_unset_cpu(th) != 0) {
err("cannot unset cpu from thread %d", th->tid);
return -1;
}
return 0;
}
static int
pre_thread_pause(struct thread *th)
{
if (th->state != TH_ST_RUNNING && th->state != TH_ST_COOLING) {
err("cannot pause thread %d: state not running or cooling\n",
th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_PAUSED) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_update(th->cpu) != 0) {
err("cpu_update failed for %s", th->cpu->name);
return -1;
}
return 0;
}
static int
pre_thread_resume(struct thread *th)
{
if (th->state != TH_ST_PAUSED && th->state != TH_ST_WARMING) {
err("cannot resume thread %d: state not paused or warming",
th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_RUNNING) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_update(th->cpu) != 0) {
err("cpu_update failed for %s", th->cpu->name);
return -1;
}
return 0;
}
static int
pre_thread_cool(struct thread *th)
{
if (th->state != TH_ST_RUNNING) {
err("cannot cool thread %d: state not running", th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_COOLING) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_update(th->cpu) != 0) {
err("cpu_update failed for %s", th->cpu->name);
return -1;
}
return 0;
}
static int
pre_thread_warm(struct thread *th)
{
if (th->state != TH_ST_PAUSED) {
err("cannot warm thread %d: state not paused\n", th->tid);
return -1;
}
if (thread_set_state(th, TH_ST_WARMING) != 0) {
err("cannot set thread %d state", th->tid);
return -1;
}
if (cpu_update(th->cpu) != 0) {
err("cpu_update failed for %s", th->cpu->name);
return -1;
}
return 0;
}
static int
pre_thread(struct emu *emu)
{
struct thread *th = emu->thread;
struct emu_ev *ev = emu->ev;
switch (ev->v) {
case 'C': /* create */
dbg("thread %d creates a new thread at cpu=%d with args=%x %x\n",
th->tid,
ev->payload->u32[0],
ev->payload->u32[1],
ev->payload->u32[2]);
break;
case 'x':
return pre_thread_execute(emu, th);
case 'e':
return pre_thread_end(th);
case 'p':
return pre_thread_pause(th);
case 'r':
return pre_thread_resume(th);
case 'c':
return pre_thread_cool(th);
case 'w':
return pre_thread_warm(th);
default:
err("unknown thread event value %c\n", ev->v);
return -1;
}
return 0;
}
static int
pre_affinity_set(struct emu *emu)
{
struct thread *th = emu->thread;
if (th->cpu == NULL) {
err("thread %d doesn't have CPU set", th->tid);
return -1;
}
if (!th->is_active) {
err("thread %d is not active", th->tid);
return -1;
}
/* Migrate current cpu to the one at phyid */
int phyid = emu->ev->payload->i32[0];
struct cpu *newcpu = loom_find_cpu(emu->loom, phyid);
if (newcpu == NULL) {
err("cannot find cpu with phyid %d", phyid);
return -1;
}
/* The CPU is already properly set, return */
if (th->cpu == newcpu)
return 0;
if (cpu_migrate_thread(th->cpu, th, newcpu) != 0) {
err("cpu_migrate_thread() failed");
return -1;
}
if (thread_migrate_cpu(th, newcpu) != 0) {
err("thread_migrate_cpu() failed");
return -1;
}
dbg("thread %d now runs in %s\n", th->tid, newcpu->name);
return 0;
}
static int
pre_affinity_remote(struct emu *emu)
{
int32_t phyid = emu->ev->payload->i32[0];
int32_t tid = emu->ev->payload->i32[1];
struct thread *remote_th = proc_find_thread(emu->proc, tid);
/* Search the thread in other processes of the loom if
* not found in the current one */
if (remote_th == NULL)
remote_th = loom_find_thread(emu->loom, tid);
if (remote_th == NULL) {
err("thread %d not found", tid);
return -1;
}
/* The remote_th cannot be in states dead or unknown */
if (remote_th->state == TH_ST_DEAD) {
err("thread %d is dead", tid);
return -1;
}
if (remote_th->state == TH_ST_UNKNOWN) {
err("thread %d in state unknown", tid);
return -1;
}
/* It must have an assigned CPU */
if (remote_th->cpu == NULL) {
err("thread %d has no CPU", tid);
return -1;
}
/* Migrate current cpu to the one at phyid */
struct cpu *newcpu = loom_find_cpu(emu->loom, phyid);
if (newcpu == NULL) {
err("cannot find CPU with phyid %d", phyid);
return -1;
}
if (cpu_migrate_thread(remote_th->cpu, remote_th, newcpu) != 0) {
err("cpu_migrate_thread() failed");
return -1;
}
if (thread_migrate_cpu(remote_th, newcpu) != 0) {
err("thread_migrate_cpu() failed");
return -1;
}
dbg("remote_th %d remotely switches to cpu %d", tid, phyid);
return 0;
}
static int
pre_affinity(struct emu *emu)
{
switch (emu->ev->v) {
case 's':
return pre_affinity_set(emu);
case 'r':
return pre_affinity_remote(emu);
default:
err("unknown affinity event value %c\n",
emu->ev->v);
// return -1
}
return 0;
}
//static int
//compare_int64(const void *a, const void *b)
//{
// int64_t aa = *(const int64_t *) a;
// int64_t bb = *(const int64_t *) b;
//
// if (aa < bb)
// return -1;
// else if (aa > bb)
// return +1;
// else
// return 0;
//}
//
//static void
//pre_burst(struct emu *emu)
//{
// int64_t dt = 0;
//
// UNUSED(dt);
//
// struct emu_thread *th = emu->cur_thread;
//
// if (th->nbursts >= MAX_BURSTS) {
// err("too many bursts: ignored\n");
// return;
// }
//
// th->burst_time[th->nbursts++] = emu->delta_time;
// if (th->nbursts == MAX_BURSTS) {
// int n = MAX_BURSTS - 1;
// int64_t deltas[MAX_BURSTS - 1];
// for (int i = 0; i < n; i++) {
// deltas[i] = th->burst_time[i + 1] - th->burst_time[i];
// }
//
// qsort(deltas, n, sizeof(int64_t), compare_int64);
//
// double avg = 0.0;
// double maxdelta = 0;
// for (int i = 0; i < n; i++) {
// if (deltas[i] > maxdelta)
// maxdelta = deltas[i];
// avg += deltas[i];
// }
//
// avg /= (double) n;
// double median = deltas[n / 2];
//
// err("%s burst stats: median %.0f ns, avg %.1f ns, max %.0f ns\n",
// emu->cur_loom->dname, median, avg, maxdelta);
//
// th->nbursts = 0;
// }
//}
//
//static void
//pre_flush(struct emu *emu)
//{
// struct emu_thread *th = emu->cur_thread;
// struct ovni_chan *chan_th = &th->chan[CHAN_OVNI_FLUSH];
//
// switch (emu->ev->v) {
// case '[':
// chan_push(chan_th, ST_OVNI_FLUSHING);
// break;
// case ']':
// chan_pop(chan_th, ST_OVNI_FLUSHING);
// break;
// default:
// err("unexpected value '%c' (expecting '[' or ']')\n",
// emu->ev->v);
// abort();
// }
//}
static int
hook_pre_ovni(struct emu *emu)
{
if (emu->ev->m != 'O')
return -1;
switch (emu->ev->c) {
case 'H':
return pre_thread(emu);
case 'A':
return pre_affinity(emu);
// case 'B':
// pre_burst(emu);
// break;
// case 'F':
// pre_flush(emu);
// break;
default:
err("unknown ovni event category %c\n",
emu->ev->c);
// return -1;
}
return 0;
}
int
ust_model_probe(void *p)
{
struct emu *emu = emu_get(p);
if (emu->system.nthreads == 0)
return -1;
return 0;
}
int
ust_model_event(void *ptr)
{
struct emu *emu = emu_get(ptr);
if (emu->ev->m != 'O') {
err("unexpected event model %c\n", emu->ev->m);
return -1;
}
err("got ovni event '%s'\n", emu->ev->mcv);
if (hook_pre_ovni(emu) != 0) {
err("ovni_model_event: failed to process event\n");
return -1;
}
return 0;
}
struct model_spec ust_model_spec = {
.name = "ust",
.model = 'O',
.create = NULL,
.connect = NULL,
.event = ust_model_event,
.probe = ust_model_probe,
};

View File

@ -10,5 +10,8 @@ ovni_model_probe(void *ptr)
{ {
struct emu *emu = emu_get(ptr); struct emu *emu = emu_get(ptr);
return emu->system.nthreads > 0; if (emu->system.nthreads == 0)
return -1;
return 0;
} }

16
src/emu/ust/ust_model.h Normal file
View File

@ -0,0 +1,16 @@
/* Copyright (c) 2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef UST_MODEL_H
#define UST_MODEL_H
/* The user-space thread "ust" execution model tracks the state of processes and
* threads running in the CPUs by instrumenting the threads before and after
* they are going to sleep. It just provides an approximate view of the real
* execution by the kernel. */
#include "emu_model.h"
extern struct model_spec ust_model_spec;
#endif /* UST_MODEL_H */

View File

@ -15,12 +15,12 @@ version_parse(const char *version, int tuple[3])
char buf[64]; char buf[64];
if (version == NULL) { if (version == NULL) {
err("parse_version: version is NULL\n"); err("version is NULL");
return -1; return -1;
} }
if (strlen(version) >= 64) { if (strlen(version) >= 64) {
err("parse_version: version too long: %s\n", version); err("version too long: %s", version);
return -1; return -1;
} }
@ -38,7 +38,7 @@ version_parse(const char *version, int tuple[3])
str = NULL; str = NULL;
if (num == NULL) { if (num == NULL) {
err("parse_version: missing %s number: %s\n", err("missing %s number: %s",
which[i], version); which[i], version);
return -1; return -1;
} }
@ -48,13 +48,13 @@ version_parse(const char *version, int tuple[3])
int v = (int) strtol(num, &endptr, 10); int v = (int) strtol(num, &endptr, 10);
if (errno != 0 || endptr == num || endptr[0] != '\0') { if (errno != 0 || endptr == num || endptr[0] != '\0') {
err("parse_version: failed to parse %s number: %s\n", err("failed to parse %s number: %s",
which[i], version); which[i], version);
return -1; return -1;
} }
if (v < 0) { if (v < 0) {
err("parse_version: invalid negative %s number: %s\n", err("invalid negative %s number: %s",
which[i], version); which[i], version);
return -1; return -1;
} }

View File

@ -1,7 +1,7 @@
# Copyright (c) 2022 Barcelona Supercomputing Center (BSC) # Copyright (c) 2022 Barcelona Supercomputing Center (BSC)
# SPDX-License-Identifier: GPL-3.0-or-later # SPDX-License-Identifier: GPL-3.0-or-later
#unit_test(version.c) unit_test(version.c)
#unit_test(task.c) #unit_test(task.c)
#unit_test(taskstack.c) #unit_test(taskstack.c)
#unit_test(taskstack-bad.c) #unit_test(taskstack-bad.c)

View File

@ -19,7 +19,7 @@ int main(void)
int ret = 0; int ret = 0;
while ((ret = emu_step(&emu)) == 0) { while ((ret = emu_step(&emu)) == 0) {
err("event clock %ld\n", emu.player.deltaclock); //err("event clock %ld\n", emu.player.deltaclock);
} }
if (ret < 0) if (ret < 0)

View File

@ -41,7 +41,7 @@ test_negative_cpu(struct loom *loom)
die("loom_init_begin failed"); die("loom_init_begin failed");
struct cpu cpu; struct cpu cpu;
cpu_init(&cpu, -1); cpu_init_begin(&cpu, -1);
if (loom_add_cpu(loom, &cpu) == 0) if (loom_add_cpu(loom, &cpu) == 0)
die("loom_add_cpu didn't fail"); die("loom_add_cpu didn't fail");
@ -56,7 +56,7 @@ test_duplicate_cpus(struct loom *loom)
die("loom_init_begin failed"); die("loom_init_begin failed");
struct cpu cpu; struct cpu cpu;
cpu_init(&cpu, 123); cpu_init_begin(&cpu, 123);
if (loom_add_cpu(loom, &cpu) != 0) if (loom_add_cpu(loom, &cpu) != 0)
die("loom_add_cpu failed"); die("loom_add_cpu failed");
@ -66,42 +66,42 @@ test_duplicate_cpus(struct loom *loom)
err("ok"); err("ok");
} }
static void //static void
test_sort_cpus(struct loom *loom) //test_sort_cpus(struct loom *loom)
{ //{
int ncpus = 10; // int ncpus = 10;
//
if (loom_init_begin(loom, testloom) != 0) // if (loom_init_begin(loom, testloom) != 0)
die("loom_init_begin failed"); // die("loom_init_begin failed");
//
for (int i = 0; i < ncpus; i++) { // for (int i = 0; i < ncpus; i++) {
int phyid = 1000 - i * i; // int phyid = 1000 - i * i;
struct cpu *cpu = malloc(sizeof(struct cpu)); // struct cpu *cpu = malloc(sizeof(struct cpu));
if (cpu == NULL) // if (cpu == NULL)
die("malloc failed:"); // die("malloc failed:");
//
cpu_init(cpu, phyid); // cpu_init(cpu, phyid);
if (loom_add_cpu(loom, cpu) != 0) // if (loom_add_cpu(loom, cpu) != 0)
die("loom_add_cpu failed"); // die("loom_add_cpu failed");
} // }
//
if (loom_init_end(loom) != 0) // if (loom_init_end(loom) != 0)
die("loom_init_end failed"); // die("loom_init_end failed");
//
if (loom->ncpus != (size_t) ncpus) // if (loom->ncpus != (size_t) ncpus)
die("ncpus mismatch"); // die("ncpus mismatch");
//
struct cpu *cpu = NULL; // struct cpu *cpu = NULL;
int lastphyid = -1; // int lastphyid = -1;
DL_FOREACH2(loom->scpus, cpu, lnext) { // DL_FOREACH2(loom->scpus, cpu, lnext) {
int phyid = cpu_get_phyid(cpu); // int phyid = cpu_get_phyid(cpu);
if (lastphyid >= phyid) // if (lastphyid >= phyid)
die("unsorted scpus"); // die("unsorted scpus");
lastphyid = phyid; // lastphyid = phyid;
} // }
//
err("ok"); // err("ok");
} //}
static void static void
test_duplicate_procs(struct loom *loom) test_duplicate_procs(struct loom *loom)
@ -110,7 +110,7 @@ test_duplicate_procs(struct loom *loom)
die("loom_init_begin failed"); die("loom_init_begin failed");
struct proc proc; struct proc proc;
proc_init(&proc, testproc, 1); proc_init_begin(&proc, testproc);
if (loom_add_proc(loom, &proc) != 0) if (loom_add_proc(loom, &proc) != 0)
die("loom_add_proc failed"); die("loom_add_proc failed");
@ -127,7 +127,7 @@ int main(void)
test_bad_name(&loom); test_bad_name(&loom);
test_negative_cpu(&loom); test_negative_cpu(&loom);
test_duplicate_cpus(&loom); test_duplicate_cpus(&loom);
test_sort_cpus(&loom); //test_sort_cpus(&loom);
test_duplicate_procs(&loom); test_duplicate_procs(&loom);
return 0; return 0;

View File

@ -42,5 +42,7 @@ int main(void)
} }
} }
err("ok");
return 0; return 0;
} }