ovni/ovni.c
Rodrigo Arias Mallo 755f2e84f3 Use 64 bit clock to prevent overflows
Large offsets between nodes may overflow the previous 48 bit clock
leading to an incorrect reconstructed clock value after the correction
with the 64 bit offset.
2021-08-11 11:50:07 +02:00

928 lines
16 KiB
C

#define _GNU_SOURCE
#include <unistd.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>
#include <string.h>
#include <linux/limits.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <stdatomic.h>
#include <assert.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include "ovni.h"
#include "ovni_trace.h"
#include "parson.h"
//#define ENABLE_SLOW_CHECKS
//#define USE_RDTSC
/* Data per process */
struct ovni_rproc rproc = {0};
/* Data per thread */
_Thread_local struct ovni_rthread rthread = {0};
static int
create_trace_dirs(char *tracedir, char *loom, int proc)
{
char path[PATH_MAX];
fprintf(stderr, "create trace dirs for loom=%s, proc=%d\n",
loom, proc);
snprintf(path, PATH_MAX, "%s", tracedir);
/* May fail if another loom created the directory already */
mkdir(path, 0755);
snprintf(path, PATH_MAX, "%s/loom.%s", tracedir, loom);
/* Also may fail */
mkdir(path, 0755);
snprintf(rproc.dir, PATH_MAX, "%s/loom.%s/proc.%d", tracedir, loom, proc);
/* But this one shall not fail */
if(mkdir(rproc.dir, 0755))
{
fprintf(stderr, "mkdir %s: %s\n", rproc.dir, strerror(errno));
return -1;
}
return 0;
}
static int
create_trace_stream()
{
int fd;
char path[PATH_MAX];
fprintf(stderr, "create thread stream tid=%d gettid=%d rproc.proc=%d rproc.ready=%d\n",
rthread.tid, syscall(SYS_gettid), rproc.proc, rproc.ready);
snprintf(path, PATH_MAX, "%s/thread.%d", rproc.dir, rthread.tid);
//rthread.streamfd = open(path, O_WRONLY | O_CREAT | O_DSYNC, 0644);
rthread.streamfd = open(path, O_WRONLY | O_CREAT, 0644);
if(rthread.streamfd == -1)
{
fprintf(stderr, "open %s failed: %s\n", path, strerror(errno));
return -1;
}
return 0;
}
static int
proc_metadata_init(struct ovni_rproc *proc)
{
JSON_Value *meta;
proc->meta = json_value_init_object();
if(proc->meta == NULL)
{
err("failed to create json object\n");
abort();
}
return 0;
}
static int
proc_metadata_store(struct ovni_rproc *proc)
{
char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/metadata.json", proc->dir);
assert(proc->meta != NULL);
if(json_serialize_to_file_pretty(proc->meta, path) != JSONSuccess)
{
err("failed to write proc metadata\n");
abort();
}
return 0;
}
void
ovni_add_cpu(int index, int phyid)
{
JSON_Array *cpuarray;
JSON_Object *cpu;
JSON_Object *meta;
JSON_Value *valcpu, *valcpuarray;
int append = 0;
assert(rproc.ready == 1);
assert(rproc.meta != NULL);
meta = json_value_get_object(rproc.meta);
assert(meta);
/* Find the CPU array and create it if needed */
cpuarray = json_object_dotget_array(meta, "cpus");
if(cpuarray == NULL)
{
valcpuarray = json_value_init_array();
assert(valcpuarray);
cpuarray = json_array(valcpuarray);
assert(cpuarray);
append = 1;
}
valcpuarray = json_array_get_wrapping_value(cpuarray);
assert(valcpuarray);
valcpu = json_value_init_object();
assert(valcpu);
cpu = json_object(valcpu);
assert(cpu);
if(json_object_set_number(cpu, "index", index) != 0)
abort();
if(json_object_set_number(cpu, "phyid", phyid) != 0)
abort();
if(json_array_append_value(cpuarray, valcpu) != 0)
abort();
if(append && json_object_set_value(meta, "cpus", valcpuarray) != 0)
abort();
//puts(json_serialize_to_string_pretty(rproc.meta));
}
static void
proc_set_app(int appid)
{
JSON_Object *meta;
assert(rproc.ready == 1);
assert(rproc.meta != NULL);
meta = json_value_get_object(rproc.meta);
assert(meta);
if(json_object_set_number(meta, "app_id", appid) != 0)
abort();
}
int
ovni_proc_init(int app, char *loom, int proc)
{
int i;
assert(rproc.ready == 0);
memset(&rproc, 0, sizeof(rproc));
/* FIXME: strcpy is insecure */
strcpy(rproc.loom, loom);
rproc.proc = proc;
rproc.app = app;
/* By default we use the monotonic clock */
rproc.clockid = CLOCK_MONOTONIC;
if(create_trace_dirs(OVNI_TRACEDIR, loom, proc))
abort();
if(proc_metadata_init(&rproc) != 0)
abort();
rproc.ready = 1;
proc_set_app(app);
return 0;
}
int
ovni_proc_fini()
{
if(proc_metadata_store(&rproc) != 0)
abort();
return 0;
}
int
ovni_thread_init(pid_t tid)
{
int i;
assert(tid != 0);
if(rthread.ready)
{
fprintf(stderr, "warning: thread tid=%d already initialized\n",
tid);
return 0;
}
assert(rthread.ready == 0);
assert(rthread.tid == 0);
assert(rthread.cpu == 0);
assert(rproc.ready == 1);
fprintf(stderr, "ovni thread init tid=%d\n", tid);
memset(&rthread, 0, sizeof(rthread));
rthread.tid = tid;
rthread.cpu = -666;
rthread.evlen = 0;
rthread.evbuf = malloc(OVNI_MAX_EV_BUF);
if(rthread.evbuf == NULL)
{
perror("malloc");
abort();
}
if(create_trace_stream(tid))
abort();
rthread.ready = 1;
return 0;
}
int
ovni_thread_free()
{
assert(rthread.ready);
free(rthread.evbuf);
}
int
ovni_thread_isready()
{
return rthread.ready;
}
void
ovni_cpu_set(int cpu)
{
rthread.cpu = cpu;
}
static inline
uint64_t rdtsc(void)
{
uint32_t lo, hi;
// RDTSC copies contents of 64-bit TSC into EDX:EAX
asm volatile("rdtsc" : "=a" (lo), "=d" (hi));
return (uint64_t) hi << 32 | lo;
}
uint64_t
ovni_get_clock()
{
struct timespec tp;
uint64_t ns = 1000LL * 1000LL * 1000LL;
uint64_t raw;
int ret;
#ifdef USE_RDTSC
raw = rdtsc();
#else
ret = clock_gettime(rproc.clockid, &tp);
#ifdef ENABLE_SLOW_CHECKS
if(ret) abort();
#endif /* ENABLE_SLOW_CHECKS */
raw = tp.tv_sec * ns + tp.tv_nsec;
rthread.clockvalue = (uint64_t) raw;
#endif /* USE_RDTSC */
return raw;
}
/* Sets the current time so that all subsequent events have the new
* timestamp */
void
ovni_clock_update()
{
rthread.clockvalue = ovni_get_clock();
}
static void
hexdump(uint8_t *buf, size_t size)
{
int i, j;
//printf("writing %ld bytes in cpu=%d\n", size, rthread.cpu);
for(i=0; i<size; i+=16)
{
for(j=0; j<16 && i+j < size; j++)
{
dbg("%02x ", buf[i+j]);
}
dbg("\n");
}
}
static int
ovni_write(uint8_t *buf, size_t size)
{
ssize_t written;
do
{
written = write(rthread.streamfd, buf, size);
if(written < 0)
{
perror("write");
return -1;
}
size -= written;
buf += written;
} while(size > 0);
return 0;
}
static int
flush_evbuf()
{
int ret;
assert(rthread.ready);
assert(rproc.ready);
ret = ovni_write(rthread.evbuf, rthread.evlen);
rthread.evlen = 0;
return ret;
}
static void
ovni_ev_set_clock(struct ovni_ev *ev)
{
ev->header.clock = rthread.clockvalue;
}
uint64_t
ovni_ev_get_clock(struct ovni_ev *ev)
{
return ev->header.clock;
}
void
ovni_ev_set_mcv(struct ovni_ev *ev, char *mcv)
{
ev->header.model = mcv[0];
ev->header.class = mcv[1];
ev->header.value = mcv[2];
}
static size_t
get_jumbo_payload_size(struct ovni_ev *ev)
{
return sizeof(ev->payload.jumbo.size) + ev->payload.jumbo.size;
}
int
ovni_payload_size(struct ovni_ev *ev)
{
int size;
if(ev->header.flags & OVNI_EV_JUMBO)
return get_jumbo_payload_size(ev);
size = ev->header.flags & 0x0f;
if(size == 0)
return 0;
/* The minimum size is 2 bytes, so we can encode a length of 16
* bytes using 4 bits (0x0f) */
size++;
return size;
}
void
ovni_payload_add(struct ovni_ev *ev, uint8_t *buf, int size)
{
int payload_size;
assert((ev->header.flags & OVNI_EV_JUMBO) == 0);
assert(size >= 2);
payload_size = ovni_payload_size(ev);
/* Ensure we have room */
assert(payload_size + size <= sizeof(ev->payload));
memcpy(&ev->payload.u8[payload_size], buf, size);
payload_size += size;
ev->header.flags = ev->header.flags & 0xf0 | (payload_size-1) & 0x0f;
}
int
ovni_ev_size(struct ovni_ev *ev)
{
return sizeof(ev->header) + ovni_payload_size(ev);
}
static void
ovni_ev_add(struct ovni_ev *ev);
int
ovni_flush()
{
int ret = 0;
struct ovni_ev pre={0}, post={0};
assert(rthread.ready);
assert(rproc.ready);
ovni_clock_update();
ovni_ev_set_clock(&pre);
ovni_ev_set_mcv(&pre, "OF[");
ret = flush_evbuf();
ovni_clock_update();
ovni_ev_set_clock(&post);
ovni_ev_set_mcv(&post, "OF]");
/* Add the two flush events */
ovni_ev_add(&pre);
ovni_ev_add(&post);
return ret;
}
void
ovni_ev_add_jumbo(struct ovni_ev *ev, uint8_t *buf, uint32_t bufsize)
{
size_t evsize, totalsize;
assert(ovni_payload_size(ev) == 0);
ovni_payload_add(ev, (uint8_t *) &bufsize, sizeof(bufsize));
evsize = ovni_ev_size(ev);
totalsize = evsize + bufsize;
/* Check if the event fits or flush first otherwise */
if(rthread.evlen + totalsize >= OVNI_MAX_EV_BUF)
ovni_flush();
/* Se the jumbo flag here, so we capture the previous evsize
* properly, ignoring the jumbo buffer */
ev->header.flags |= OVNI_EV_JUMBO;
memcpy(&rthread.evbuf[rthread.evlen], ev, evsize);
rthread.evlen += evsize;
memcpy(&rthread.evbuf[rthread.evlen], buf, bufsize);
rthread.evlen += bufsize;
assert(rthread.evlen < OVNI_MAX_EV_BUF);
}
static void
ovni_ev_add(struct ovni_ev *ev)
{
int size;
size = ovni_ev_size(ev);
/* Check if the event fits or flush first otherwise */
if(rthread.evlen + size >= OVNI_MAX_EV_BUF)
ovni_flush();
memcpy(&rthread.evbuf[rthread.evlen], ev, size);
rthread.evlen += size;
}
void
ovni_ev_jumbo(struct ovni_ev *ev, uint8_t *buf, uint32_t bufsize)
{
ovni_ev_set_clock(ev);
ovni_ev_add_jumbo(ev, buf, bufsize);
}
void
ovni_ev(struct ovni_ev *ev)
{
ovni_ev_set_clock(ev);
ovni_ev_add(ev);
}
static int
find_dir_prefix_str(struct dirent *dirent, const char *prefix, const char **str)
{
const char *p;
p = dirent->d_name;
/* Check the prefix */
if(strncmp(p, prefix, strlen(prefix)) != 0)
return -1;
p += strlen(prefix);
/* Find the dot */
if(*p != '.')
return -1;
p++;
*str = p;
return 0;
}
static int
find_dir_prefix_int(struct dirent *dirent, const char *prefix, int *num)
{
const char *p;
if(find_dir_prefix_str(dirent, prefix, &p) != 0)
return -1;
/* Convert the suffix string to a number */
*num = atoi(p);
return 0;
}
static int
load_thread(struct ovni_ethread *thread, struct ovni_eproc *proc, int index, int tid, char *filepath)
{
static int total_threads = 0;
thread->tid = tid;
thread->index = index;
thread->gindex = total_threads++;
thread->state = TH_ST_UNKNOWN;
thread->proc = proc;
thread->stream_fd = open(filepath, O_RDONLY);
if(thread->stream_fd == -1)
{
perror("open");
return -1;
}
return 0;
}
static void
load_proc_metadata(struct ovni_eproc *proc)
{
JSON_Object *meta;
meta = json_value_get_object(proc->meta);
assert(meta);
proc->appid = (int) json_object_get_number(meta, "app_id");
}
static int
load_proc(struct ovni_eproc *proc, int index, int pid, char *procdir)
{
static int total_procs = 0;
struct dirent *dirent;
DIR *dir;
char path[PATH_MAX];
struct ovni_ethread *thread;
int tid;
proc->pid = pid;
proc->index = index;
proc->gindex = total_procs++;
sprintf(path, "%s/%s", procdir, "metadata.json");
proc->meta = json_parse_file_with_comments(path);
assert(proc->meta);
/* The appid is populated from the metadata */
load_proc_metadata(proc);
if((dir = opendir(procdir)) == NULL)
{
fprintf(stderr, "opendir %s failed: %s\n",
procdir, strerror(errno));
return -1;
}
while((dirent = readdir(dir)) != NULL)
{
if(find_dir_prefix_int(dirent, "thread", &tid) != 0)
continue;
sprintf(path, "%s/%s", procdir, dirent->d_name);
if(proc->nthreads >= OVNI_MAX_THR)
{
err("too many thread streams for process %d\n", pid);
abort();
}
thread = &proc->thread[proc->nthreads];
if(load_thread(thread, proc, proc->nthreads, tid, path) != 0)
return -1;
proc->nthreads++;
}
closedir(dir);
return 0;
}
static int
load_loom(struct ovni_loom *loom, int loomid, char *loomdir)
{
int pid;
char path[PATH_MAX];
struct stat st;
DIR *dir;
struct dirent *dirent;
struct ovni_eproc *proc;
if((dir = opendir(loomdir)) == NULL)
{
fprintf(stderr, "opendir %s failed: %s\n",
loomdir, strerror(errno));
return -1;
}
while((dirent = readdir(dir)) != NULL)
{
if(find_dir_prefix_int(dirent, "proc", &pid) != 0)
continue;
sprintf(path, "%s/%s", loomdir, dirent->d_name);
if(loom->nprocs >= OVNI_MAX_PROC)
{
err("too many process streams for loom %d\n",
loomid);
abort();
}
proc = &loom->proc[loom->nprocs];
if(load_proc(proc, loom->nprocs, pid, path) != 0)
return -1;
loom->nprocs++;
}
closedir(dir);
return 0;
}
int
ovni_load_trace(struct ovni_trace *trace, char *tracedir)
{
int i;
char path[PATH_MAX];
const char *loom_name;
struct stat st;
DIR *dir;
struct dirent *dirent;
struct ovni_loom *loom;
trace->nlooms = 0;
if((dir = opendir(tracedir)) == NULL)
{
fprintf(stderr, "opendir %s failed: %s\n",
tracedir, strerror(errno));
return -1;
}
while((dirent = readdir(dir)) != NULL)
{
if(find_dir_prefix_str(dirent, "loom", &loom_name) != 0)
{
/* Ignore other files in tracedir */
continue;
}
if(trace->nlooms >= OVNI_MAX_LOOM)
{
err("too many looms for trace %s\n",
tracedir);
abort();
}
i = trace->nlooms;
loom = &trace->loom[i];
/* FIXME: Unsafe */
strcpy(loom->hostname, loom_name);
sprintf(path, "%s/%s", tracedir, dirent->d_name);
if(load_loom(&trace->loom[i], i, path) != 0)
return -1;
trace->nlooms++;
}
closedir(dir);
return 0;
}
static int
load_stream_buf(struct ovni_stream *stream, struct ovni_ethread *thread)
{
struct stat st;
if(fstat(thread->stream_fd, &st) < 0)
{
perror("fstat");
return -1;
}
if(st.st_size == 0)
{
err("warning: stream %s is empty\n", stream->tid);
stream->size = 0;
stream->buf = NULL;
stream->active = 0;
return 0;
}
stream->size = st.st_size;
stream->buf = mmap(NULL, stream->size, PROT_READ, MAP_SHARED,
thread->stream_fd, 0);
if(stream->buf == MAP_FAILED)
{
perror("mmap");
return -1;
}
stream->active = 1;
return 0;
}
/* Populates the streams in a single array */
int
ovni_load_streams(struct ovni_trace *trace)
{
int i, j, k, s;
struct ovni_loom *loom;
struct ovni_eproc *proc;
struct ovni_ethread *thread;
struct ovni_stream *stream;
trace->nstreams = 0;
for(i=0; i<trace->nlooms; i++)
{
loom = &trace->loom[i];
for(j=0; j<loom->nprocs; j++)
{
proc = &loom->proc[j];
for(k=0; k<proc->nthreads; k++)
{
trace->nstreams++;
}
}
}
trace->stream = calloc(trace->nstreams, sizeof(struct ovni_stream));
if(trace->stream == NULL)
{
perror("calloc");
return -1;
}
err("loaded %d streams\n", trace->nstreams);
for(s=0, i=0; i<trace->nlooms; i++)
{
loom = &trace->loom[i];
for(j=0; j<loom->nprocs; j++)
{
proc = &loom->proc[j];
for(k=0; k<proc->nthreads; k++)
{
thread = &proc->thread[k];
stream = &trace->stream[s++];
stream->tid = thread->tid;
stream->thread = k;
stream->proc = j;
stream->loom = i;
stream->lastclock = 0;
stream->offset = 0;
stream->cur_ev = NULL;
if(load_stream_buf(stream, thread) != 0)
{
err("load_stream_buf failed\n");
return -1;
}
}
}
}
return 0;
}
void
ovni_free_streams(struct ovni_trace *trace)
{
free(trace->stream);
}
int
ovni_load_next_event(struct ovni_stream *stream)
{
int i;
size_t n, size;
uint8_t flags;
if(stream->active == 0)
{
dbg("stream is inactive, cannot load more events\n");
return -1;
}
if(stream->cur_ev == NULL)
{
stream->cur_ev = (struct ovni_ev *) stream->buf;
stream->offset = 0;
goto out;
}
//printf("advancing offset %ld bytes\n", ovni_ev_size(stream->cur_ev));
stream->offset += ovni_ev_size(stream->cur_ev);
/* We have reached the end */
if(stream->offset == stream->size)
{
stream->active = 0;
dbg("stream %d runs out of events\n", stream->tid);
return -1;
}
/* It cannot overflow, otherwise we are reading garbage */
assert(stream->offset < stream->size);
stream->cur_ev = (struct ovni_ev *) &stream->buf[stream->offset];
out:
//dbg("---------\n");
//dbg("ev size = %d\n", ovni_ev_size(stream->cur_ev));
//dbg("ev flags = %02x\n", stream->cur_ev->header.flags);
//dbg("loaded next event:\n");
//hexdump((uint8_t *) stream->cur_ev, ovni_ev_size(stream->cur_ev));
//dbg("---------\n");
return 0;
}