From 26ff31028728586aeba4b2b5bdf524dde35652d5 Mon Sep 17 00:00:00 2001 From: Rodrigo Arias Date: Thu, 16 Dec 2021 14:22:58 +0100 Subject: [PATCH] Use pwrite() to modify the streams Using MAP_SHARED and mmap() causes coherence problems on shared filesystems such as GPFS. See https://pm.bsc.es/gitlab/rarias/ovni/-/issues/28 for more details. --- sort.c | 53 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 10 deletions(-) diff --git a/sort.c b/sort.c index c27bae7..a886956 100644 --- a/sort.c +++ b/sort.c @@ -27,18 +27,19 @@ #define _GNU_SOURCE -#include +#include +#include +#include +#include +#include +#include #include +#include #include #include -#include -#include -#include #include -#include -#include +#include #include -#include #include "ovni.h" #include "trace.h" @@ -58,7 +59,13 @@ struct sortplan { /* The next event which must be not affected */ struct ovni_ev *next; + /* Pointer to the stream buffer */ + uint8_t *base; + struct ring *r; + + /* File descriptor of the stream file */ + int fd; }; enum operation_mode { SORT, CHECK }; @@ -216,6 +223,22 @@ sort_buf(uint8_t *src, uint8_t *buf, int64_t bufsize, dbg("injected %ld events in the past\n", injected); } +static void +write_stream(int fd, void *base, void *dst, const void *src, size_t size) +{ + while(size > 0) { + off_t offset = (off_t) ((int64_t) dst - (int64_t) base); + ssize_t written = pwrite(fd, src, size, offset); + + if(written < 0) + die("pwrite failed: %s\n", strerror(errno)); + + size -= written; + src = (void *) (((uint8_t *) src) + written); + dst = (void *) (((uint8_t *) dst) + written); + } +} + static int execute_sort_plan(struct sortplan *sp, size_t region) { @@ -258,6 +281,8 @@ execute_sort_plan(struct sortplan *sp, size_t region) /* Copy the sorted events back into the stream buffer */ memcpy(first, buf, bufsize); + write_stream(sp->fd, sp->base, first, buf, bufsize); + free(buf); if(region == 0) @@ -279,8 +304,16 @@ stream_winsort(struct ovni_stream *stream, struct ring *r) //uint64_t lastclock = 0; char st = 'S'; + char *fn = stream->thread->tracefile; + int fd = open(fn, O_WRONLY); + + if(fd < 0) + die("open %s failed: %s\n", fn, strerror(errno)); + ring_reset(r); sp.r = r; + sp.fd = fd; + sp.base = stream->buf; size_t region = 0; @@ -334,6 +367,9 @@ stream_winsort(struct ovni_stream *stream, struct ring *r) ring_add(r, ev); } + if(close(fd) < 0) + die("close %s failed: %s\n", fn, strerror(errno)); + return 0; } @@ -392,9 +428,6 @@ process_trace(struct ovni_trace *trace) * attempt */ return -1; } - - if(msync(stream->buf, stream->size, MS_ASYNC)) - die("msync failed: %s\n", strerror(errno)); } else {