ovni/src/ovni.c

653 lines
14 KiB
C
Raw Normal View History

2022-09-19 12:39:02 +02:00
/* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: MIT */
2021-10-26 18:42:41 +02:00
#define _GNU_SOURCE
#include <dirent.h>
2021-07-19 15:11:41 +02:00
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
2021-07-30 20:08:40 +02:00
#include <sys/mman.h>
2021-07-22 12:35:02 +02:00
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
2021-07-19 15:11:41 +02:00
#include "common.h"
2022-01-11 15:47:17 +01:00
#include "compat.h"
#include "ovni.h"
#include "parson.h"
2021-09-28 19:21:22 +02:00
2021-07-19 15:11:41 +02:00
/* Data per process */
struct ovni_rproc rproc = {0};
2021-07-19 15:11:41 +02:00
/* Data per thread */
_Thread_local struct ovni_rthread rthread = {0};
2021-07-19 15:11:41 +02:00
static void
create_trace_stream(void)
2021-07-19 15:11:41 +02:00
{
char path[PATH_MAX];
int written = snprintf(path, PATH_MAX, "%s/thread.%d",
2022-09-30 10:25:09 +02:00
rproc.procdir, rthread.tid);
if (written >= PATH_MAX)
die("thread trace path too long: %s/thread.%d\n",
2022-09-30 10:25:09 +02:00
rproc.procdir, rthread.tid);
2021-07-22 12:35:02 +02:00
rthread.streamfd = open(path, O_WRONLY | O_CREAT, 0644);
if (rthread.streamfd == -1)
die("open %s failed: %s\n", path, strerror(errno));
2021-07-19 15:11:41 +02:00
}
static void
proc_metadata_init(struct ovni_rproc *proc)
{
proc->meta = json_value_init_object();
if (proc->meta == NULL)
die("failed to create metadata JSON object\n");
}
static void
proc_metadata_store(JSON_Value *meta, const char *procdir)
{
char path[PATH_MAX];
if (meta == NULL)
die("process metadata not initialized\n");
if (snprintf(path, PATH_MAX, "%s/metadata.json", procdir) >= PATH_MAX)
die("metadata path too long: %s/metadata.json\n",
2022-09-30 10:25:09 +02:00
procdir);
if (json_serialize_to_file_pretty(meta, path) != JSONSuccess)
die("failed to write process metadata\n");
}
void
ovni_add_cpu(int index, int phyid)
{
if (index < 0)
die("ovni_add_cpu: cannot use negative index %d\n", index);
if (phyid < 0)
die("ovni_add_cpu: cannot use negative CPU id %d\n", phyid);
if (!rproc.ready)
die("ovni_add_cpu: process not yet initialized\n");
if (rproc.meta == NULL)
die("ovni_add_cpu: metadata not initialized\n");
JSON_Object *meta = json_value_get_object(rproc.meta);
if (meta == NULL)
die("ovni_add_cpu: json_value_get_object() failed\n");
int first_time = 0;
/* Find the CPU array and create it if needed */
JSON_Array *cpuarray = json_object_dotget_array(meta, "cpus");
if (cpuarray == NULL) {
JSON_Value *value = json_value_init_array();
if (value == NULL)
die("ovni_add_cpu: json_value_init_array() failed\n");
cpuarray = json_array(value);
if (cpuarray == NULL)
die("ovni_add_cpu: json_array() failed\n");
first_time = 1;
}
JSON_Value *valcpu = json_value_init_object();
if (valcpu == NULL)
die("ovni_add_cpu: json_value_init_object() failed\n");
JSON_Object *cpu = json_object(valcpu);
if (cpu == NULL)
die("ovni_add_cpu: json_object() failed\n");
if (json_object_set_number(cpu, "index", index) != 0)
die("ovni_add_cpu: json_object_set_number() failed\n");
if (json_object_set_number(cpu, "phyid", phyid) != 0)
die("ovni_add_cpu: json_object_set_number() failed\n");
if (json_array_append_value(cpuarray, valcpu) != 0)
die("ovni_add_cpu: json_array_append_value() failed\n");
if (first_time) {
JSON_Value *value = json_array_get_wrapping_value(cpuarray);
if (value == NULL)
die("ovni_add_cpu: json_array_get_wrapping_value() failed\n");
if (json_object_set_value(meta, "cpus", value) != 0)
die("ovni_add_cpu: json_object_set_value failed\n");
}
}
static void
proc_set_app(int appid)
{
JSON_Object *meta = json_value_get_object(rproc.meta);
if (meta == NULL)
die("json_value_get_object failed\n");
if (json_object_set_number(meta, "app_id", appid) != 0)
die("json_object_set_number for app_id failed\n");
}
2022-06-07 11:00:15 +02:00
static void
proc_set_version(void)
{
JSON_Object *meta = json_value_get_object(rproc.meta);
if (meta == NULL)
2022-06-07 11:00:15 +02:00
die("json_value_get_object failed\n");
if (json_object_set_number(meta, "version", OVNI_METADATA_VERSION) != 0)
2022-07-27 18:22:13 +02:00
die("json_object_set_number for version failed\n");
if (json_object_set_string(meta, "model_version", OVNI_MODEL_VERSION) != 0)
2022-07-27 18:22:13 +02:00
die("json_object_set_string for model_version failed\n");
2022-06-07 11:00:15 +02:00
}
2021-12-10 18:20:31 +01:00
void
ovni_proc_set_rank(int rank, int nranks)
{
if (!rproc.ready)
2021-12-10 18:20:31 +01:00
die("ovni_proc_set_rank: process not yet initialized\n");
JSON_Object *meta = json_value_get_object(rproc.meta);
if (meta == NULL)
2021-12-10 18:20:31 +01:00
die("json_value_get_object failed\n");
if (json_object_set_number(meta, "rank", rank) != 0)
2021-12-10 18:20:31 +01:00
die("json_object_set_number for rank failed\n");
if (json_object_set_number(meta, "nranks", nranks) != 0)
2021-12-10 18:20:31 +01:00
die("json_object_set_number for nranks failed\n");
}
/* Create $tracedir/loom.$loom/proc.$pid and return it in path. */
static void
mkdir_proc(char *path, const char *tracedir, const char *loom, int pid)
{
snprintf(path, PATH_MAX, "%s", tracedir);
/* May fail if another loom created the directory already */
mkdir(path, 0755);
snprintf(path, PATH_MAX, "%s/loom.%s", tracedir, loom);
/* Also may fail */
mkdir(path, 0755);
snprintf(path, PATH_MAX, "%s/loom.%s/proc.%d", tracedir, loom, pid);
/* But this one shall not fail */
if (mkdir(path, 0755))
die("mkdir %s failed: %s\n", path, strerror(errno));
}
static void
create_proc_dir(const char *loom, int pid)
{
char *tmpdir = getenv("OVNI_TMPDIR");
if (tmpdir != NULL) {
rproc.move_to_final = 1;
mkdir_proc(rproc.procdir, tmpdir, loom, pid);
mkdir_proc(rproc.procdir_final, OVNI_TRACEDIR, loom, pid);
} else {
rproc.move_to_final = 0;
mkdir_proc(rproc.procdir, OVNI_TRACEDIR, loom, pid);
}
}
void
ovni_proc_init(int app, const char *loom, int pid)
2021-07-19 15:11:41 +02:00
{
if (rproc.ready)
die("ovni_proc_init: pid %d already initialized\n", pid);
2021-07-19 19:05:26 +02:00
memset(&rproc, 0, sizeof(rproc));
2021-07-19 15:11:41 +02:00
if (strlen(loom) >= OVNI_MAX_HOSTNAME)
die("ovni_proc_init: loom name too long: %s\n", loom);
strcpy(rproc.loom, loom);
rproc.pid = pid;
rproc.app = app;
2021-07-19 19:05:26 +02:00
rproc.clockid = CLOCK_MONOTONIC;
2021-07-19 15:11:41 +02:00
create_proc_dir(loom, pid);
proc_metadata_init(&rproc);
rproc.ready = 1;
2021-07-19 15:11:41 +02:00
2022-06-07 11:00:15 +02:00
proc_set_version();
proc_set_app(app);
}
static int
move_thread_to_final(const char *src, const char *dst)
{
char buffer[1024];
FILE *infile = fopen(src, "r");
if (infile == NULL) {
err("fopen(%s) failed: %s\n", src, strerror(errno));
return -1;
}
FILE *outfile = fopen(dst, "w");
if (outfile == NULL) {
err("fopen(%s) failed: %s\n", src, strerror(errno));
return -1;
}
size_t bytes;
while ((bytes = fread(buffer, 1, sizeof(buffer), infile)) > 0)
fwrite(buffer, 1, bytes, outfile);
fclose(outfile);
fclose(infile);
if (remove(src) != 0) {
err("remove(%s) failed: %s\n", src, strerror(errno));
return -1;
}
return 0;
}
static void
move_procdir_to_final(const char *procdir, const char *procdir_final)
{
DIR *dir;
int err = 0;
if ((dir = opendir(procdir)) == NULL) {
err("opendir %s failed: %s\n", procdir, strerror(errno));
return;
}
struct dirent *dirent;
const char *prefix = "thread.";
while ((dirent = readdir(dir)) != NULL) {
/* It should only contain thread.* directories, skip others */
if (strncmp(dirent->d_name, prefix, strlen(prefix)) != 0)
continue;
char thread[PATH_MAX];
if (snprintf(thread, PATH_MAX, "%s/%s", procdir,
2022-09-30 10:25:09 +02:00
dirent->d_name)
>= PATH_MAX) {
err("snprintf: path too large: %s/%s\n", procdir,
2022-09-30 10:25:09 +02:00
dirent->d_name);
err = 1;
continue;
}
char thread_final[PATH_MAX];
if (snprintf(thread_final, PATH_MAX, "%s/%s", procdir_final,
2022-09-30 10:25:09 +02:00
dirent->d_name)
>= PATH_MAX) {
err("snprintf: path too large: %s/%s\n", procdir_final,
2022-09-30 10:25:09 +02:00
dirent->d_name);
err = 1;
continue;
}
if (move_thread_to_final(thread, thread_final) != 0)
err = 1;
}
closedir(dir);
if (rmdir(procdir) != 0) {
err("rmdir(%s) failed: %s\n", procdir, strerror(errno));
err = 1;
}
/* Warn the user, but we cannot do much at this point */
if (err)
err("errors occurred when moving the trace to %s\n", procdir_final);
}
void
ovni_proc_fini(void)
{
if (!rproc.ready)
die("ovni_proc_fini: process not initialized\n");
/* Mark the process no longer ready */
rproc.ready = 0;
if (rproc.move_to_final) {
proc_metadata_store(rproc.meta, rproc.procdir_final);
move_procdir_to_final(rproc.procdir, rproc.procdir_final);
} else {
proc_metadata_store(rproc.meta, rproc.procdir);
}
2021-07-19 19:05:26 +02:00
}
2022-07-26 19:04:08 +02:00
static void
write_evbuf(uint8_t *buf, size_t size)
{
do {
2022-07-26 19:04:08 +02:00
ssize_t written = write(rthread.streamfd, buf, size);
if (written < 0)
2022-07-26 19:04:08 +02:00
die("failed to write buffer to disk: %s\n", strerror(errno));
size -= written;
buf += written;
} while (size > 0);
2022-07-26 19:04:08 +02:00
}
static void
flush_evbuf(void)
{
write_evbuf(rthread.evbuf, rthread.evlen);
rthread.evlen = 0;
}
static void
write_stream_header(void)
{
struct ovni_stream_header *h =
2022-09-30 10:25:09 +02:00
(struct ovni_stream_header *) rthread.evbuf;
2022-07-26 19:04:08 +02:00
memcpy(h->magic, OVNI_STREAM_MAGIC, 4);
h->version = OVNI_STREAM_VERSION;
rthread.evlen = sizeof(struct ovni_stream_header);
flush_evbuf();
}
void
2021-07-19 19:05:26 +02:00
ovni_thread_init(pid_t tid)
{
if (rthread.ready) {
err("warning: thread %d already initialized, ignored\n", tid);
return;
}
if (tid == 0)
die("ovni_thread_init: cannot use tid=%d\n", tid);
if (!rproc.ready)
die("ovni_thread_init: process not yet initialized\n");
2021-07-19 19:05:26 +02:00
memset(&rthread, 0, sizeof(rthread));
rthread.tid = tid;
rthread.evlen = 0;
rthread.evbuf = malloc(OVNI_MAX_EV_BUF);
if (rthread.evbuf == NULL)
die("ovni_thread_init: malloc failed: %s", strerror(errno));
2021-07-19 19:05:26 +02:00
create_trace_stream();
2022-07-26 19:04:08 +02:00
write_stream_header();
rthread.ready = 1;
2021-07-19 15:11:41 +02:00
}
void
ovni_thread_free(void)
{
if (!rthread.ready)
die("ovni_thread_free: thread not initialized\n");
free(rthread.evbuf);
}
int
ovni_thread_isready(void)
{
return rthread.ready;
}
#ifdef USE_TSC
static inline uint64_t
clock_tsc_now(void)
{
uint32_t lo, hi;
/* RDTSC copies contents of 64-bit TSC into EDX:EAX */
__asm__ volatile("rdtsc"
: "=a"(lo), "=d"(hi));
return (uint64_t) hi << 32 | lo;
}
#endif
static uint64_t
clock_monotonic_now(void)
2021-07-19 15:11:41 +02:00
{
uint64_t ns = 1000ULL * 1000ULL * 1000ULL;
2021-07-19 15:11:41 +02:00
struct timespec tp;
if (clock_gettime(rproc.clockid, &tp))
die("clock_gettime() failed: %s\n", strerror(errno));
2021-07-19 15:11:41 +02:00
return tp.tv_sec * ns + tp.tv_nsec;
}
uint64_t
ovni_clock_now(void)
2021-07-19 19:05:26 +02:00
{
#ifdef USE_TSC
return clock_tsc_now();
#else
return clock_monotonic_now();
#endif
}
void
ovni_ev_set_clock(struct ovni_ev *ev, uint64_t clock)
{
ev->header.clock = clock;
}
uint64_t
ovni_ev_get_clock(const struct ovni_ev *ev)
{
return ev->header.clock;
}
void
ovni_ev_set_mcv(struct ovni_ev *ev, const char *mcv)
{
2021-07-30 20:08:40 +02:00
ev->header.model = mcv[0];
ev->header.category = mcv[1];
2021-07-30 20:08:40 +02:00
ev->header.value = mcv[2];
}
static size_t
get_jumbo_payload_size(const struct ovni_ev *ev)
2021-07-30 20:08:40 +02:00
{
return sizeof(ev->payload.jumbo.size) + ev->payload.jumbo.size;
}
2021-07-22 12:35:02 +02:00
int
ovni_payload_size(const struct ovni_ev *ev)
{
if (ev->header.flags & OVNI_EV_JUMBO)
2021-07-30 20:08:40 +02:00
return get_jumbo_payload_size(ev);
int size = ev->header.flags & 0x0f;
2021-07-28 11:56:35 +02:00
if (size == 0)
2021-07-28 11:56:35 +02:00
return 0;
/* The minimum size is 2 bytes, so we can encode a length of 16
* bytes using 4 bits (0x0f) */
size++;
return size;
}
void
ovni_payload_add(struct ovni_ev *ev, const uint8_t *buf, int size)
{
if (ev->header.flags & OVNI_EV_JUMBO)
die("ovni_payload_add: event is marked as jumbo\n");
2021-07-28 11:56:35 +02:00
if (size < 2)
die("ovni_payload_add: payload size %d too small\n", size);
2021-07-30 20:08:40 +02:00
size_t payload_size = ovni_payload_size(ev);
2021-07-28 11:56:35 +02:00
/* Ensure we have room */
if (payload_size + size > sizeof(ev->payload))
die("ovni_payload_add: no space left for %d bytes\n", size);
2021-07-28 11:56:35 +02:00
memcpy(&ev->payload.u8[payload_size], buf, size);
payload_size += size;
ev->header.flags = (ev->header.flags & 0xf0) | ((payload_size - 1) & 0x0f);
}
int
ovni_ev_size(const struct ovni_ev *ev)
{
2021-07-30 20:08:40 +02:00
return sizeof(ev->header) + ovni_payload_size(ev);
}
static void
2021-07-30 20:08:40 +02:00
ovni_ev_add(struct ovni_ev *ev);
void
ovni_flush(void)
2021-07-22 12:35:02 +02:00
{
struct ovni_ev pre = {0}, post = {0};
2021-07-22 12:35:02 +02:00
if (!rthread.ready)
die("ovni_flush: thread is not initialized\n");
if (!rproc.ready)
die("ovni_flush: process is not initialized\n");
2021-07-22 12:35:02 +02:00
ovni_ev_set_clock(&pre, ovni_clock_now());
ovni_ev_set_mcv(&pre, "OF[");
2021-07-22 12:35:02 +02:00
flush_evbuf();
2021-07-22 12:35:02 +02:00
ovni_ev_set_clock(&post, ovni_clock_now());
ovni_ev_set_mcv(&post, "OF]");
2021-07-22 12:35:02 +02:00
/* Add the two flush events */
ovni_ev_add(&pre);
ovni_ev_add(&post);
2021-07-22 12:35:02 +02:00
}
static void
add_flush_events(uint64_t t0, uint64_t t1)
{
struct ovni_ev pre = {0}, post = {0};
pre.header.clock = t0;
ovni_ev_set_mcv(&pre, "OF[");
post.header.clock = t1;
ovni_ev_set_mcv(&post, "OF]");
/* Add the two flush events */
ovni_ev_add(&pre);
ovni_ev_add(&post);
}
static void
ovni_ev_add_jumbo(struct ovni_ev *ev, const uint8_t *buf, uint32_t bufsize)
2021-07-30 20:08:40 +02:00
{
int flushed = 0;
uint64_t t0, t1;
2021-07-30 20:08:40 +02:00
if (ovni_payload_size(ev) != 0)
die("ovni_ev_add_jumbo: the event payload must be empty\n");
2021-07-30 20:08:40 +02:00
ovni_payload_add(ev, (uint8_t *) &bufsize, sizeof(bufsize));
size_t evsize = ovni_ev_size(ev);
size_t totalsize = evsize + bufsize;
2021-07-30 20:08:40 +02:00
if (totalsize >= OVNI_MAX_EV_BUF)
die("ovni_ev_add_jumbo: event too large\n");
2021-07-30 20:08:40 +02:00
/* Check if the event fits or flush first otherwise */
if (rthread.evlen + totalsize >= OVNI_MAX_EV_BUF) {
/* Measure the flush times */
t0 = ovni_clock_now();
flush_evbuf();
t1 = ovni_clock_now();
flushed = 1;
}
2021-07-30 20:08:40 +02:00
2021-12-09 16:42:12 +01:00
/* Set the jumbo flag here, so we capture the previous evsize
2021-07-30 20:08:40 +02:00
* properly, ignoring the jumbo buffer */
ev->header.flags |= OVNI_EV_JUMBO;
memcpy(&rthread.evbuf[rthread.evlen], ev, evsize);
rthread.evlen += evsize;
memcpy(&rthread.evbuf[rthread.evlen], buf, bufsize);
rthread.evlen += bufsize;
if (flushed) {
/* Emit the flush events *after* the user event */
add_flush_events(t0, t1);
}
}
2021-07-30 20:08:40 +02:00
static void
ovni_ev_add(struct ovni_ev *ev)
{
int flushed = 0;
uint64_t t0, t1;
2021-07-30 20:08:40 +02:00
int size = ovni_ev_size(ev);
2021-07-30 20:08:40 +02:00
/* Check if the event fits or flush first otherwise */
if (rthread.evlen + size >= OVNI_MAX_EV_BUF) {
/* Measure the flush times */
t0 = ovni_clock_now();
flush_evbuf();
t1 = ovni_clock_now();
flushed = 1;
}
2021-07-30 20:08:40 +02:00
memcpy(&rthread.evbuf[rthread.evlen], ev, size);
rthread.evlen += size;
if (flushed) {
/* Emit the flush events *after* the user event */
add_flush_events(t0, t1);
}
2021-07-30 20:08:40 +02:00
}
void
ovni_ev_jumbo_emit(struct ovni_ev *ev, const uint8_t *buf, uint32_t bufsize)
2021-07-30 20:08:40 +02:00
{
ovni_ev_add_jumbo(ev, buf, bufsize);
}
void
ovni_ev_emit(struct ovni_ev *ev)
2021-07-30 20:08:40 +02:00
{
ovni_ev_add(ev);
}