diff --git a/ovni.c b/ovni.c index 2f6d01b..3b766ee 100644 --- a/ovni.c +++ b/ovni.c @@ -378,6 +378,42 @@ ovni_proc_fini(void) } } +static void +write_evbuf(uint8_t *buf, size_t size) +{ + do + { + ssize_t written = write(rthread.streamfd, buf, size); + + if(written < 0) + die("failed to write buffer to disk: %s\n", strerror(errno)); + + size -= written; + buf += written; + } while(size > 0); +} + +static void +flush_evbuf(void) +{ + write_evbuf(rthread.evbuf, rthread.evlen); + + rthread.evlen = 0; +} + +static void +write_stream_header(void) +{ + struct ovni_stream_header *h = + (struct ovni_stream_header *) rthread.evbuf; + + memcpy(h->magic, OVNI_STREAM_MAGIC, 4); + h->version = OVNI_STREAM_VERSION; + + rthread.evlen = sizeof(struct ovni_stream_header); + flush_evbuf(); +} + void ovni_thread_init(pid_t tid) { @@ -403,6 +439,7 @@ ovni_thread_init(pid_t tid) die("ovni_thread_init: malloc failed: %s", strerror(errno)); create_trace_stream(); + write_stream_header(); rthread.ready = 1; } @@ -456,29 +493,6 @@ ovni_clock_now(void) #endif } -static void -write_evbuf(uint8_t *buf, size_t size) -{ - do - { - ssize_t written = write(rthread.streamfd, buf, size); - - if(written < 0) - die("failed to write buffer to disk: %s\n", strerror(errno)); - - size -= written; - buf += written; - } while(size > 0); -} - -static void -flush_evbuf(void) -{ - write_evbuf(rthread.evbuf, rthread.evlen); - - rthread.evlen = 0; -} - void ovni_ev_set_clock(struct ovni_ev *ev, uint64_t clock) { diff --git a/ovni.h b/ovni.h index e27f298..bcd8e85 100644 --- a/ovni.h +++ b/ovni.h @@ -48,6 +48,9 @@ typedef struct json_value_t JSON_Value; /* Reserved buffer for event allocation per thread */ #define OVNI_MAX_EV_BUF (2 * 1024LL * 1024LL) /* 2 MiB */ +#define OVNI_STREAM_MAGIC "ovni" +#define OVNI_STREAM_VERSION 1 + /* ----------------------- common ------------------------ */ enum ovni_ev_flags { @@ -92,6 +95,11 @@ struct __attribute__((__packed__)) ovni_ev { union ovni_ev_payload payload; }; +struct __attribute__((__packed__)) ovni_stream_header { + char magic[4]; + uint32_t version; +}; + /* ----------------------- runtime ------------------------ */ /* State of each thread on runtime */ diff --git a/trace.c b/trace.c index f79b47d..39e8101 100644 --- a/trace.c +++ b/trace.c @@ -443,9 +443,74 @@ ovni_load_trace(struct ovni_trace *trace, char *tracedir) } static int -load_stream_buf(struct ovni_stream *stream, struct ovni_ethread *thread) +check_stream_header(struct ovni_stream *stream) +{ + int ret = 0; + + if(stream->size < sizeof(struct ovni_stream_header)) + { + err("stream %d: incomplete stream header\n", + stream->tid); + return -1; + } + + struct ovni_stream_header *h = + (struct ovni_stream_header *) stream->buf; + + if(memcmp(h->magic, OVNI_STREAM_MAGIC, 4) != 0) + { + char magic[5]; + memcpy(magic, h->magic, 4); + magic[4] = '\0'; + err("stream %d: wrong stream magic '%s' (expected '%s')\n", + stream->tid, magic, OVNI_STREAM_MAGIC); + ret = -1; + } + + if(h->version != OVNI_STREAM_VERSION) + { + err("stream %d: stream version mismatch %u (expected %u)\n", + stream->tid, h->version, OVNI_STREAM_VERSION); + ret = -1; + } + + return ret; +} + +static int +load_stream_fd(struct ovni_stream *stream, int fd) { struct stat st; + if(fstat(fd, &st) < 0) + { + perror("fstat failed"); + return -1; + } + + /* Error because it doesn't have the header */ + if(st.st_size == 0) + { + err("stream %d is empty\n", stream->tid); + return -1; + } + + int prot = PROT_READ | PROT_WRITE; + stream->buf = mmap(NULL, st.st_size, prot, MAP_PRIVATE, fd, 0); + + if(stream->buf == MAP_FAILED) + { + perror("mmap failed"); + return -1; + } + + stream->size = st.st_size; + + return 0; +} + +static int +load_stream_buf(struct ovni_stream *stream, struct ovni_ethread *thread) +{ int fd; if((fd = open(thread->tracefile, O_RDWR)) == -1) @@ -454,34 +519,21 @@ load_stream_buf(struct ovni_stream *stream, struct ovni_ethread *thread) return -1; } - if(fstat(fd, &st) < 0) + if(load_stream_fd(stream, fd) != 0) + return -1; + + if(check_stream_header(stream) != 0) { - perror("fstat failed"); + err("stream %d: bad header\n", stream->tid); return -1; } - if(st.st_size == 0) - { - err("warning: stream %d is empty\n", stream->tid); - stream->size = 0; - stream->buf = NULL; + stream->offset = sizeof(struct ovni_stream_header); + + if(stream->offset == stream->size) stream->active = 0; - - /* No need to do anything else */ - return 0; - } - - stream->size = st.st_size; - stream->buf = mmap(NULL, stream->size, PROT_READ | PROT_WRITE, - MAP_PRIVATE, fd, 0); - - if(stream->buf == MAP_FAILED) - { - perror("mmap failed"); - return -1; - } - - stream->active = 1; + else + stream->active = 1; /* No need to keep the fd open */ if(close(fd)) @@ -594,25 +646,15 @@ ovni_free_trace(struct ovni_trace *trace) int ovni_load_next_event(struct ovni_stream *stream) { - size_t size; - if(stream->active == 0) { dbg("stream is inactive, cannot load more events\n"); return -1; } - if(stream->cur_ev == NULL) - { - stream->cur_ev = (struct ovni_ev *) stream->buf; - stream->offset = 0; - size = 0; - goto out; - } - - //printf("advancing offset %ld bytes\n", ovni_ev_size(stream->cur_ev)); - size = ovni_ev_size(stream->cur_ev); - stream->offset += size; + /* Only step the offset if we have load an event */ + if(stream->cur_ev != NULL) + stream->offset += ovni_ev_size(stream->cur_ev); /* It cannot overflow, otherwise we are reading garbage */ if(stream->offset > stream->size) @@ -622,20 +664,12 @@ ovni_load_next_event(struct ovni_stream *stream) if(stream->offset == stream->size) { stream->active = 0; + stream->cur_ev = NULL; dbg("stream %d runs out of events\n", stream->tid); return -1; } stream->cur_ev = (struct ovni_ev *) &stream->buf[stream->offset]; -out: - - //dbg("---------\n"); - //dbg("ev size = %d\n", ovni_ev_size(stream->cur_ev)); - //dbg("ev flags = %02x\n", stream->cur_ev->header.flags); - //dbg("loaded next event:\n"); - //hexdump((uint8_t *) stream->cur_ev, ovni_ev_size(stream->cur_ev)); - //dbg("---------\n"); - - return (int) size; + return 0; }