Add binary stream header with version

This commit is contained in:
Rodrigo Arias 2022-07-26 19:04:08 +02:00
parent 9820315ccb
commit 610516840e
3 changed files with 126 additions and 70 deletions

60
ovni.c
View File

@ -378,6 +378,42 @@ ovni_proc_fini(void)
} }
} }
static void
write_evbuf(uint8_t *buf, size_t size)
{
do
{
ssize_t written = write(rthread.streamfd, buf, size);
if(written < 0)
die("failed to write buffer to disk: %s\n", strerror(errno));
size -= written;
buf += written;
} while(size > 0);
}
static void
flush_evbuf(void)
{
write_evbuf(rthread.evbuf, rthread.evlen);
rthread.evlen = 0;
}
static void
write_stream_header(void)
{
struct ovni_stream_header *h =
(struct ovni_stream_header *) rthread.evbuf;
memcpy(h->magic, OVNI_STREAM_MAGIC, 4);
h->version = OVNI_STREAM_VERSION;
rthread.evlen = sizeof(struct ovni_stream_header);
flush_evbuf();
}
void void
ovni_thread_init(pid_t tid) ovni_thread_init(pid_t tid)
{ {
@ -403,6 +439,7 @@ ovni_thread_init(pid_t tid)
die("ovni_thread_init: malloc failed: %s", strerror(errno)); die("ovni_thread_init: malloc failed: %s", strerror(errno));
create_trace_stream(); create_trace_stream();
write_stream_header();
rthread.ready = 1; rthread.ready = 1;
} }
@ -456,29 +493,6 @@ ovni_clock_now(void)
#endif #endif
} }
static void
write_evbuf(uint8_t *buf, size_t size)
{
do
{
ssize_t written = write(rthread.streamfd, buf, size);
if(written < 0)
die("failed to write buffer to disk: %s\n", strerror(errno));
size -= written;
buf += written;
} while(size > 0);
}
static void
flush_evbuf(void)
{
write_evbuf(rthread.evbuf, rthread.evlen);
rthread.evlen = 0;
}
void void
ovni_ev_set_clock(struct ovni_ev *ev, uint64_t clock) ovni_ev_set_clock(struct ovni_ev *ev, uint64_t clock)
{ {

8
ovni.h
View File

@ -48,6 +48,9 @@ typedef struct json_value_t JSON_Value;
/* Reserved buffer for event allocation per thread */ /* Reserved buffer for event allocation per thread */
#define OVNI_MAX_EV_BUF (2 * 1024LL * 1024LL) /* 2 MiB */ #define OVNI_MAX_EV_BUF (2 * 1024LL * 1024LL) /* 2 MiB */
#define OVNI_STREAM_MAGIC "ovni"
#define OVNI_STREAM_VERSION 1
/* ----------------------- common ------------------------ */ /* ----------------------- common ------------------------ */
enum ovni_ev_flags { enum ovni_ev_flags {
@ -92,6 +95,11 @@ struct __attribute__((__packed__)) ovni_ev {
union ovni_ev_payload payload; union ovni_ev_payload payload;
}; };
struct __attribute__((__packed__)) ovni_stream_header {
char magic[4];
uint32_t version;
};
/* ----------------------- runtime ------------------------ */ /* ----------------------- runtime ------------------------ */
/* State of each thread on runtime */ /* State of each thread on runtime */

126
trace.c
View File

@ -443,9 +443,74 @@ ovni_load_trace(struct ovni_trace *trace, char *tracedir)
} }
static int static int
load_stream_buf(struct ovni_stream *stream, struct ovni_ethread *thread) check_stream_header(struct ovni_stream *stream)
{
int ret = 0;
if(stream->size < sizeof(struct ovni_stream_header))
{
err("stream %d: incomplete stream header\n",
stream->tid);
return -1;
}
struct ovni_stream_header *h =
(struct ovni_stream_header *) stream->buf;
if(memcmp(h->magic, OVNI_STREAM_MAGIC, 4) != 0)
{
char magic[5];
memcpy(magic, h->magic, 4);
magic[4] = '\0';
err("stream %d: wrong stream magic '%s' (expected '%s')\n",
stream->tid, magic, OVNI_STREAM_MAGIC);
ret = -1;
}
if(h->version != OVNI_STREAM_VERSION)
{
err("stream %d: stream version mismatch %u (expected %u)\n",
stream->tid, h->version, OVNI_STREAM_VERSION);
ret = -1;
}
return ret;
}
static int
load_stream_fd(struct ovni_stream *stream, int fd)
{ {
struct stat st; struct stat st;
if(fstat(fd, &st) < 0)
{
perror("fstat failed");
return -1;
}
/* Error because it doesn't have the header */
if(st.st_size == 0)
{
err("stream %d is empty\n", stream->tid);
return -1;
}
int prot = PROT_READ | PROT_WRITE;
stream->buf = mmap(NULL, st.st_size, prot, MAP_PRIVATE, fd, 0);
if(stream->buf == MAP_FAILED)
{
perror("mmap failed");
return -1;
}
stream->size = st.st_size;
return 0;
}
static int
load_stream_buf(struct ovni_stream *stream, struct ovni_ethread *thread)
{
int fd; int fd;
if((fd = open(thread->tracefile, O_RDWR)) == -1) if((fd = open(thread->tracefile, O_RDWR)) == -1)
@ -454,33 +519,20 @@ load_stream_buf(struct ovni_stream *stream, struct ovni_ethread *thread)
return -1; return -1;
} }
if(fstat(fd, &st) < 0) if(load_stream_fd(stream, fd) != 0)
return -1;
if(check_stream_header(stream) != 0)
{ {
perror("fstat failed"); err("stream %d: bad header\n", stream->tid);
return -1; return -1;
} }
if(st.st_size == 0) stream->offset = sizeof(struct ovni_stream_header);
{
err("warning: stream %d is empty\n", stream->tid); if(stream->offset == stream->size)
stream->size = 0;
stream->buf = NULL;
stream->active = 0; stream->active = 0;
else
/* No need to do anything else */
return 0;
}
stream->size = st.st_size;
stream->buf = mmap(NULL, stream->size, PROT_READ | PROT_WRITE,
MAP_PRIVATE, fd, 0);
if(stream->buf == MAP_FAILED)
{
perror("mmap failed");
return -1;
}
stream->active = 1; stream->active = 1;
/* No need to keep the fd open */ /* No need to keep the fd open */
@ -594,25 +646,15 @@ ovni_free_trace(struct ovni_trace *trace)
int int
ovni_load_next_event(struct ovni_stream *stream) ovni_load_next_event(struct ovni_stream *stream)
{ {
size_t size;
if(stream->active == 0) if(stream->active == 0)
{ {
dbg("stream is inactive, cannot load more events\n"); dbg("stream is inactive, cannot load more events\n");
return -1; return -1;
} }
if(stream->cur_ev == NULL) /* Only step the offset if we have load an event */
{ if(stream->cur_ev != NULL)
stream->cur_ev = (struct ovni_ev *) stream->buf; stream->offset += ovni_ev_size(stream->cur_ev);
stream->offset = 0;
size = 0;
goto out;
}
//printf("advancing offset %ld bytes\n", ovni_ev_size(stream->cur_ev));
size = ovni_ev_size(stream->cur_ev);
stream->offset += size;
/* It cannot overflow, otherwise we are reading garbage */ /* It cannot overflow, otherwise we are reading garbage */
if(stream->offset > stream->size) if(stream->offset > stream->size)
@ -622,20 +664,12 @@ ovni_load_next_event(struct ovni_stream *stream)
if(stream->offset == stream->size) if(stream->offset == stream->size)
{ {
stream->active = 0; stream->active = 0;
stream->cur_ev = NULL;
dbg("stream %d runs out of events\n", stream->tid); dbg("stream %d runs out of events\n", stream->tid);
return -1; return -1;
} }
stream->cur_ev = (struct ovni_ev *) &stream->buf[stream->offset]; stream->cur_ev = (struct ovni_ev *) &stream->buf[stream->offset];
out: return 0;
//dbg("---------\n");
//dbg("ev size = %d\n", ovni_ev_size(stream->cur_ev));
//dbg("ev flags = %02x\n", stream->cur_ev->header.flags);
//dbg("loaded next event:\n");
//hexdump((uint8_t *) stream->cur_ev, ovni_ev_size(stream->cur_ev));
//dbg("---------\n");
return (int) size;
} }