Allow streams to step over unsorted events
This commit is contained in:
parent
2c5cfbb467
commit
61b0efee34
@ -47,13 +47,13 @@ load_stream_fd(struct stream *stream, int fd)
|
|||||||
{
|
{
|
||||||
struct stat st;
|
struct stat st;
|
||||||
if (fstat(fd, &st) < 0) {
|
if (fstat(fd, &st) < 0) {
|
||||||
perror("fstat failed");
|
err("fstat failed:");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Error because it doesn't have the header */
|
/* Error because it doesn't have the header */
|
||||||
if (st.st_size == 0) {
|
if (st.st_size == 0) {
|
||||||
err("load_stream_fd: stream %s is empty\n", stream->path);
|
err("stream %s is empty", stream->path);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -61,7 +61,7 @@ load_stream_fd(struct stream *stream, int fd)
|
|||||||
stream->buf = mmap(NULL, st.st_size, prot, MAP_PRIVATE, fd, 0);
|
stream->buf = mmap(NULL, st.st_size, prot, MAP_PRIVATE, fd, 0);
|
||||||
|
|
||||||
if (stream->buf == MAP_FAILED) {
|
if (stream->buf == MAP_FAILED) {
|
||||||
perror("mmap failed");
|
err("mmap failed:");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,10 +73,10 @@ load_stream_fd(struct stream *stream, int fd)
|
|||||||
int
|
int
|
||||||
stream_load(struct stream *stream, const char *tracedir, const char *relpath)
|
stream_load(struct stream *stream, const char *tracedir, const char *relpath)
|
||||||
{
|
{
|
||||||
int fd;
|
memset(stream, 0, sizeof(struct stream));
|
||||||
|
|
||||||
if (snprintf(stream->path, PATH_MAX, "%s/%s", tracedir, relpath) >= PATH_MAX) {
|
if (snprintf(stream->path, PATH_MAX, "%s/%s", tracedir, relpath) >= PATH_MAX) {
|
||||||
err("stream_load: path too long: %s/%s\n", tracedir, relpath);
|
err("path too long: %s/%s", tracedir, relpath);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,25 +84,26 @@ stream_load(struct stream *stream, const char *tracedir, const char *relpath)
|
|||||||
path_remove_trailing(stream->path);
|
path_remove_trailing(stream->path);
|
||||||
|
|
||||||
if (snprintf(stream->relpath, PATH_MAX, "%s", relpath) >= PATH_MAX) {
|
if (snprintf(stream->relpath, PATH_MAX, "%s", relpath) >= PATH_MAX) {
|
||||||
err("stream_load: path too long: %s\n", relpath);
|
err("path too long: %s", relpath);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dbg("stream_load: loading %s\n", stream->relpath);
|
dbg("loading %s", stream->relpath);
|
||||||
|
|
||||||
|
int fd;
|
||||||
if ((fd = open(stream->path, O_RDWR)) == -1) {
|
if ((fd = open(stream->path, O_RDWR)) == -1) {
|
||||||
err("stream_load: open failed: %s\n", stream->path);
|
err("open %s failed:", stream->path);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (load_stream_fd(stream, fd) != 0) {
|
if (load_stream_fd(stream, fd) != 0) {
|
||||||
err("stream_load: load_stream_fd failed for stream '%s'\n",
|
err("load_stream_fd failed for stream '%s'",
|
||||||
stream->path);
|
stream->path);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (check_stream_header(stream) != 0) {
|
if (check_stream_header(stream) != 0) {
|
||||||
err("stream_load: stream '%s' has bad header\n",
|
err("stream '%s' has bad header",
|
||||||
stream->path);
|
stream->path);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -113,17 +114,17 @@ stream_load(struct stream *stream, const char *tracedir, const char *relpath)
|
|||||||
if (stream->offset < stream->size) {
|
if (stream->offset < stream->size) {
|
||||||
stream->active = 1;
|
stream->active = 1;
|
||||||
} else if (stream->offset == stream->size) {
|
} else if (stream->offset == stream->size) {
|
||||||
err("warning: stream '%s' has zero events\n", stream->relpath);
|
err("warning: stream '%s' has zero events", stream->relpath);
|
||||||
stream->active = 0;
|
stream->active = 0;
|
||||||
} else {
|
} else {
|
||||||
err("stream_load: impossible, offset %ld bigger than size %ld\n",
|
err("impossible, offset %ld bigger than size %ld",
|
||||||
stream->offset, stream->size);
|
stream->offset, stream->size);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* No need to keep the fd open */
|
/* No need to keep the fd open */
|
||||||
if (close(fd)) {
|
if (close(fd)) {
|
||||||
perror("close failed");
|
err("close failed:");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,13 +147,13 @@ int
|
|||||||
stream_clkoff_set(struct stream *stream, int64_t clkoff)
|
stream_clkoff_set(struct stream *stream, int64_t clkoff)
|
||||||
{
|
{
|
||||||
if (stream->cur_ev) {
|
if (stream->cur_ev) {
|
||||||
die("stream_clkoff_set: cannot set clokoff in started stream '%s'\n",
|
die("cannot set clokoff in started stream '%s'",
|
||||||
stream->relpath);
|
stream->relpath);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (stream->clock_offset != 0) {
|
if (stream->clock_offset != 0) {
|
||||||
err("stream_clkoff_set: stream '%s' already has a clock offset\n",
|
err("stream '%s' already has a clock offset",
|
||||||
stream->relpath);
|
stream->relpath);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -184,7 +185,7 @@ int
|
|||||||
stream_step(struct stream *stream)
|
stream_step(struct stream *stream)
|
||||||
{
|
{
|
||||||
if (!stream->active) {
|
if (!stream->active) {
|
||||||
err("stream_step: stream is inactive, cannot step\n");
|
err("stream is inactive, cannot step");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,7 +195,7 @@ stream_step(struct stream *stream)
|
|||||||
|
|
||||||
/* It cannot pass the size, otherwise we are reading garbage */
|
/* It cannot pass the size, otherwise we are reading garbage */
|
||||||
if (stream->offset > stream->size) {
|
if (stream->offset > stream->size) {
|
||||||
err("stream_step: stream offset %ld exceeds size %ld\n",
|
err("stream offset %ld exceeds size %ld",
|
||||||
stream->offset, stream->size);
|
stream->offset, stream->size);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -211,21 +212,25 @@ stream_step(struct stream *stream)
|
|||||||
|
|
||||||
/* Ensure the event fits */
|
/* Ensure the event fits */
|
||||||
if (stream->offset + ovni_ev_size(stream->cur_ev) > stream->size) {
|
if (stream->offset + ovni_ev_size(stream->cur_ev) > stream->size) {
|
||||||
err("stream_step: stream '%s' ends with incomplete event\n",
|
err("stream '%s' ends with incomplete event",
|
||||||
stream->relpath);
|
stream->relpath);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Ensure the clock grows monotonically */
|
|
||||||
int64_t clock = stream_evclock(stream, stream->cur_ev);
|
int64_t clock = stream_evclock(stream, stream->cur_ev);
|
||||||
|
|
||||||
|
/* Ensure the clock grows monotonically if unsorted flag not set */
|
||||||
|
if (stream->unsorted == 0) {
|
||||||
if (clock < stream->lastclock) {
|
if (clock < stream->lastclock) {
|
||||||
err("clock goes backwards %ld -> %ld in stream '%s' at offset %ld\n",
|
err("clock goes backwards %ld -> %ld in stream '%s' at offset %ld",
|
||||||
stream->lastclock,
|
stream->lastclock,
|
||||||
clock,
|
clock,
|
||||||
stream->relpath,
|
stream->relpath,
|
||||||
stream->offset);
|
stream->offset);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
stream->lastclock = clock;
|
stream->lastclock = clock;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
@ -241,3 +246,9 @@ stream_progress(struct stream *stream)
|
|||||||
double prog = (double) uoffset / (double) stream->usize;
|
double prog = (double) uoffset / (double) stream->usize;
|
||||||
return prog;
|
return prog;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
stream_allow_unsorted(struct stream *stream)
|
||||||
|
{
|
||||||
|
stream->unsorted = 1;
|
||||||
|
}
|
||||||
|
@ -21,6 +21,7 @@ struct stream {
|
|||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
|
||||||
int active;
|
int active;
|
||||||
|
int unsorted;
|
||||||
|
|
||||||
double progress;
|
double progress;
|
||||||
|
|
||||||
@ -46,6 +47,7 @@ int stream_step(struct stream *stream);
|
|||||||
struct ovni_ev *stream_ev(struct stream *stream);
|
struct ovni_ev *stream_ev(struct stream *stream);
|
||||||
int64_t stream_evclock(struct stream *stream, struct ovni_ev *ev);
|
int64_t stream_evclock(struct stream *stream, struct ovni_ev *ev);
|
||||||
int64_t stream_lastclock(struct stream *stream);
|
int64_t stream_lastclock(struct stream *stream);
|
||||||
|
void stream_allow_unsorted(struct stream *stream);
|
||||||
|
|
||||||
void stream_data_set(struct stream *stream, void *data);
|
void stream_data_set(struct stream *stream, void *data);
|
||||||
void *stream_data_get(struct stream *stream);
|
void *stream_data_get(struct stream *stream);
|
||||||
|
Loading…
Reference in New Issue
Block a user