Parse metadata from proc and loom directly

All the metadata keys are known to proc and loom only, making it
self-contained. The metadata.c module is no longer needed.
This commit is contained in:
Rodrigo Arias 2024-09-12 15:54:29 +02:00
parent 24805f607b
commit 1f30e8ef8b
12 changed files with 272 additions and 245 deletions

View File

@ -30,7 +30,6 @@ add_library(emu STATIC
stream.c
trace.c
loom.c
metadata.c
mux.c
sort.c
path.c

View File

@ -70,6 +70,84 @@ loom_init_begin(struct loom *loom, const char *name)
return 0;
}
/* Merges the metadata CPUs with the ones in the loom */
static int
load_cpus(struct loom *loom, JSON_Object *meta)
{
JSON_Array *cpuarray = json_object_dotget_array(meta, "ovni.loom_cpus");
/* It may not have the CPUs defined */
if (cpuarray == NULL)
return 0;
size_t ncpus = json_array_get_count(cpuarray);
if (ncpus == 0) {
err("empty 'cpus' array in metadata");
return -1;
}
for (size_t i = 0; i < ncpus; i++) {
JSON_Object *jcpu = json_array_get_object(cpuarray, i);
if (jcpu == NULL) {
err("json_array_get_object() failed for cpu");
return -1;
}
/* Cast from double */
int index = (int) json_object_get_number(jcpu, "index");
int phyid = (int) json_object_get_number(jcpu, "phyid");
struct cpu *cpu = loom_find_cpu(loom, phyid);
if (cpu) {
/* Ensure they have the same index */
if (cpu->index != index) {
err("mismatch index in existing cpu: %d", index);
return -1;
}
/* Duplicated, ignore */
continue;
}
cpu = calloc(1, sizeof(struct cpu));
if (cpu == NULL) {
err("calloc failed:");
return -1;
}
cpu_init_begin(cpu, index, phyid, 0);
if (loom_add_cpu(loom, cpu) != 0) {
err("loom_add_cpu() failed");
return -1;
}
}
return 0;
}
/** Merges the given metadata with the one stored.
*
* It is an error to provide metadata that doesn't match with the already stored
* in the process.
*
* Precondition: The stream ovni.part must be "thread".
* Precondition: The stream version must be ok.
*/
int
loom_load_metadata(struct loom *loom, struct stream *s)
{
JSON_Object *meta = stream_metadata(s);
if (load_cpus(loom, meta) != 0) {
err("cannot load loom cpus");
return -1;
}
return 0;
}
void
loom_set_gindex(struct loom *loom, int64_t gindex)
{
@ -183,6 +261,44 @@ by_phyid(struct cpu *c1, struct cpu *c2)
return 0;
}
int
loom_set_rank_min(struct loom *loom)
{
if (loom->rank_min != INT_MAX) {
err("rank_min already set");
return -1;
}
/* Ensure that all processes have a rank */
for (struct proc *p = loom->procs; p; p = p->hh.next) {
if (p->rank >= 0) {
loom->rank_enabled = 1;
break;
}
}
if (!loom->rank_enabled) {
dbg("loom %s has no rank information", loom->name);
return 0;
}
/* Ensure that all processes have a rank */
for (struct proc *p = loom->procs; p; p = p->hh.next) {
if (p->rank < 0) {
err("process %s has no rank information", p->id);
return -1;
}
/* Compute rank_min for CPU sorting */
if (p->rank < loom->rank_min)
loom->rank_min = p->rank;
}
dbg("loom %s has rank_min %d", loom->name, loom->rank_min);
return 0;
}
void
loom_sort(struct loom *loom)
{
@ -200,22 +316,22 @@ loom_sort(struct loom *loom)
int
loom_init_end(struct loom *loom)
{
/* Set rank enabled */
for (struct proc *p = loom->procs; p; p = p->hh.next) {
if (p->rank >= 0) {
loom->rank_enabled = 1;
break;
}
/* rank_min must be set */
if (loom->rank_enabled && loom->rank_min == INT_MAX) {
err("rank_min not set");
return -1;
}
/* Ensure that all processes have a rank */
if (loom->rank_enabled) {
for (struct proc *p = loom->procs; p; p = p->hh.next) {
if (p->rank < 0) {
err("process %s has no rank information", p->id);
return -1;
}
}
/* It is not valid to define a loom without CPUs */
if (loom->ncpus == 0) {
err("loom %s has no physical CPUs", loom->name);
return -1;
}
/* Or without processes */
if (loom->nprocs == 0) {
err("loom %s has no processes", loom->name);
return -1;
}
/* Populate cpus_array */
@ -224,6 +340,7 @@ loom_init_end(struct loom *loom)
err("calloc failed:");
return -1;
}
for (struct cpu *c = loom->cpus; c; c = c->hh.next) {
int index = cpu_get_index(c);
if (index < 0 || (size_t) index >= loom->ncpus) {
@ -279,34 +396,6 @@ loom_add_proc(struct loom *loom, struct proc *proc)
return -1;
}
if (!proc->metadata_loaded) {
err("process %d hasn't loaded metadata", pid);
return -1;
}
if (loom->rank_enabled && proc->rank < 0) {
err("missing rank in process %d", pid);
return -1;
}
/* Check previous ranks if any */
if (!loom->rank_enabled && proc->rank >= 0) {
loom->rank_enabled = 1;
for (struct proc *p = loom->procs; p; p = p->hh.next) {
if (p->rank < 0) {
err("missing rank in process %d", p->pid);
return -1;
}
if (p->rank < loom->rank_min)
loom->rank_min = p->rank;
}
}
if (loom->rank_enabled && proc->rank < loom->rank_min)
loom->rank_min = proc->rank;
HASH_ADD_INT(loom->procs, pid, proc);
loom->nprocs++;

View File

@ -53,6 +53,8 @@ struct loom {
USE_RET const char *loom_name(struct stream *s);
USE_RET int loom_init_begin(struct loom *loom, const char *name);
USE_RET int loom_load_metadata(struct loom *loom, struct stream *s);
USE_RET int loom_set_rank_min(struct loom *loom);
USE_RET int loom_init_end(struct loom *loom);
USE_RET int loom_add_cpu(struct loom *loom, struct cpu *cpu);
USE_RET int64_t loom_get_gindex(struct loom *loom);

View File

@ -1,116 +0,0 @@
/* Copyright (c) 2021-2024 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "metadata.h"
#include <stdlib.h>
#include <string.h>
#include "cpu.h"
#include "loom.h"
#include "ovni.h"
#include "parson.h"
#include "proc.h"
#include "stream.h"
#include "thread.h"
static int
check_version(JSON_Object *meta)
{
JSON_Value *version_val = json_object_get_value(meta, "version");
if (version_val == NULL) {
err("missing attribute \"version\"");
return -1;
}
int version = (int) json_number(version_val);
if (version != OVNI_METADATA_VERSION) {
err("metadata version mismatch %d (expected %d)",
version, OVNI_METADATA_VERSION);
return -1;
}
return 0;
}
static int
has_cpus(JSON_Object *meta)
{
/* Only check for the "ovni.loom_cpus" key, if it has zero
* elements is an error that will be reported later */
if (json_object_dotget_array(meta, "ovni.loom_cpus") != NULL)
return 1;
return 0;
}
static int
load_cpus(struct loom *loom, JSON_Object *meta)
{
JSON_Array *cpuarray = json_object_dotget_array(meta, "ovni.loom_cpus");
if (cpuarray == NULL) {
err("cannot find 'cpus' array");
return -1;
}
size_t ncpus = json_array_get_count(cpuarray);
if (ncpus == 0) {
err("empty 'cpus' array in metadata");
return -1;
}
if (loom->ncpus > 0) {
err("loom %s already has cpus", loom->id);
return -1;
}
for (size_t i = 0; i < ncpus; i++) {
JSON_Object *jcpu = json_array_get_object(cpuarray, i);
if (jcpu == NULL) {
err("json_array_get_object() failed for cpu");
return -1;
}
/* Cast from double */
int index = (int) json_object_get_number(jcpu, "index");
int phyid = (int) json_object_get_number(jcpu, "phyid");
struct cpu *cpu = calloc(1, sizeof(struct cpu));
if (cpu == NULL) {
err("calloc failed:");
return -1;
}
cpu_init_begin(cpu, index, phyid, 0);
if (loom_add_cpu(loom, cpu) != 0) {
err("loom_add_cpu() failed");
return -1;
}
}
return 0;
}
int
metadata_load_proc(struct stream *s, struct loom *loom, struct proc *proc)
{
JSON_Object *meta = stream_metadata(s);
if (check_version(meta) != 0) {
err("version check failed");
return -1;
}
/* The appid is populated from the metadata */
if (proc_load_metadata(proc, meta) != 0) {
err("cannot load process attributes");
return -1;
}
if (has_cpus(meta) && load_cpus(loom, meta) != 0) {
err("cannot load loom cpus");
return -1;
}
return 0;
}

View File

@ -1,14 +0,0 @@
/* Copyright (c) 2021-2024 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef METADATA_H
#define METADATA_H
#include "common.h"
struct stream;
struct loom;
struct proc;
USE_RET int metadata_load_proc(struct stream *s, struct loom *loom, struct proc *proc);
#endif /* METADATA_H */

View File

@ -32,6 +32,9 @@ proc_init_begin(struct proc *proc, int pid)
memset(proc, 0, sizeof(struct proc));
proc->gindex = -1;
proc->appid = 0;
proc->rank = -1;
proc->nranks = 0;
proc->pid = pid;
if (snprintf(proc->id, PATH_MAX, "proc.%d", pid) >= PATH_MAX) {
@ -56,38 +59,105 @@ proc_set_loom(struct proc *proc, struct loom *loom)
proc->loom = loom;
}
int
proc_load_metadata(struct proc *proc, JSON_Object *meta)
static int
load_appid(struct proc *proc, struct stream *s)
{
if (proc->metadata_loaded) {
err("process %s already loaded metadata", proc->id);
return -1;
}
JSON_Value *version_val = json_object_get_value(meta, "version");
if (version_val == NULL) {
err("missing attribute 'version' in metadata");
return -1;
}
proc->metadata_version = (int) json_number(version_val);
JSON_Object *meta = stream_metadata(s);
JSON_Value *appid_val = json_object_dotget_value(meta, "ovni.app_id");
if (appid_val == NULL) {
err("missing attribute 'ovni.app_id' in metadata");
/* May not be present in all thread streams */
if (appid_val == NULL)
return 0;
int appid = (int) json_number(appid_val);
if (proc->appid && proc->appid != appid) {
err("mismatch previous appid %d with stream: %s",
proc->appid, s->relpath);
return -1;
}
proc->appid = (int) json_number(appid_val);
if (appid <= 0) {
err("appid must be >0, stream: %s", s->relpath);
return -1;
}
proc->appid = appid;
return 0;
}
static int
load_rank(struct proc *proc, struct stream *s)
{
JSON_Object *meta = stream_metadata(s);
JSON_Value *rank_val = json_object_dotget_value(meta, "ovni.rank");
if (rank_val != NULL)
proc->rank = (int) json_number(rank_val);
else
proc->rank = -1;
/* Optional */
if (rank_val == NULL) {
dbg("process %s has no rank", proc->id);
return 0;
}
proc->metadata_loaded = 1;
int rank = (int) json_number(rank_val);
if (rank < 0) {
err("rank %d must be >=0, stream: %s", rank, s->relpath);
return -1;
}
if (proc->rank >= 0 && proc->rank != rank) {
err("mismatch previous rank %d with stream: %s",
proc->rank, s->relpath);
return -1;
}
/* Same with nranks, but it is not optional now */
JSON_Value *nranks_val = json_object_dotget_value(meta, "ovni.nranks");
if (nranks_val == NULL) {
err("missing ovni.nranks attribute: %s", s->relpath);
return -1;
}
int nranks = (int) json_number(nranks_val);
if (nranks <= 0) {
err("nranks %d must be >0, stream: %s", nranks, s->relpath);
return -1;
}
if (proc->nranks > 0 && proc->nranks != nranks) {
err("mismatch previous nranks %d with stream: %s",
proc->nranks, s->relpath);
return -1;
}
/* Ensure rank fits in nranks */
if (rank >= nranks) {
err("rank %d must be lower than nranks %d: %s",
rank, nranks, s->relpath);
return -1;
}
dbg("process %s rank=%d nranks=%d",
proc->id, rank, nranks);
proc->rank = rank;
proc->nranks = nranks;
return 0;
}
/** Merges the metadata from the stream in the process. */
int
proc_load_metadata(struct proc *proc, struct stream *s)
{
if (load_appid(proc, s) != 0) {
err("load_appid failed for stream: %s", s->relpath);
return -1;
}
if (load_rank(proc, s) != 0) {
err("load_rank failed for stream: %s", s->relpath);
return -1;
}
return 0;
}
@ -151,8 +221,8 @@ proc_init_end(struct proc *proc)
return -1;
}
if (!proc->metadata_loaded) {
err("metadata not loaded");
if (proc->appid <= 0) {
err("appid not set");
return -1;
}

View File

@ -8,7 +8,6 @@
#include <stdint.h>
#include "common.h"
#include "extend.h"
#include "parson.h"
#include "uthash.h"
struct loom;
struct stream;
@ -19,12 +18,11 @@ struct proc {
char id[PATH_MAX];
int is_init;
int metadata_loaded;
int metadata_version;
int pid;
int index;
int appid;
int rank;
int nranks;
int nthreads;
struct thread *threads;
@ -53,7 +51,7 @@ USE_RET int proc_get_pid(struct proc *proc);
void proc_set_gindex(struct proc *proc, int64_t gindex);
void proc_set_loom(struct proc *proc, struct loom *loom);
void proc_sort(struct proc *proc);
USE_RET int proc_load_metadata(struct proc *proc, JSON_Object *meta);
USE_RET int proc_load_metadata(struct proc *proc, struct stream *s);
USE_RET struct thread *proc_find_thread(struct proc *proc, int tid);
USE_RET int proc_add_thread(struct proc *proc, struct thread *thread);
void proc_sort(struct proc *proc);

View File

@ -148,7 +148,6 @@ load_json(const char *path)
return NULL;
}
/* TODO: Check version */
if (check_version(meta) != 0) {
err("check_version failed");
return NULL;

View File

@ -11,7 +11,6 @@
#include "cpu.h"
#include "emu_args.h"
#include "loom.h"
#include "metadata.h"
#include "proc.h"
#include "pv/prf.h"
#include "pv/pvt.h"
@ -73,30 +72,30 @@ create_proc(struct loom *loom, struct stream *s)
}
struct proc *proc = loom_find_proc(loom, pid);
if (proc != NULL)
return proc;
proc = malloc(sizeof(struct proc));
if (proc == NULL) {
err("malloc failed:");
return NULL;
/* Create a new process */
proc = malloc(sizeof(struct proc));
if (proc == NULL) {
err("malloc failed:");
return NULL;
}
if (proc_init_begin(proc, pid) != 0) {
err("proc_init_begin failed: %s", s->relpath);
return NULL;
}
if (loom_add_proc(loom, proc) != 0) {
err("loom_add_proc failed");
return NULL;
}
}
if (proc_init_begin(proc, pid) != 0) {
err("proc_init_begin failed: %s", s->relpath);
return NULL;
}
/* Load metadata too */
if (metadata_load_proc(s, loom, proc) != 0) {
err("cannot load metadata from %s", s->relpath);
return NULL;
}
if (loom_add_proc(loom, proc) != 0) {
err("loom_add_proc failed");
/* The appid is populated from the metadata */
if (proc_load_metadata(proc, s) != 0) {
err("proc_load_metadata failed");
return NULL;
}
@ -141,6 +140,11 @@ create_loom(struct system *sys, struct stream *s)
sys->nlooms++;
}
if (loom_load_metadata(loom, s) != 0) {
err("loom_load_metadata failed for stream: %s", s->relpath);
return NULL;
}
return loom;
}
@ -216,9 +220,8 @@ is_thread_stream(struct stream *s)
return -1;
}
if (strcmp(part_type, "thread") == 0) {
if (strcmp(part_type, "thread") == 0)
return 1;
}
return 0;
}
@ -235,11 +238,11 @@ create_system(struct system *sys, struct trace *trace)
size_t i = 0;
for (struct stream *s = trace->streams; s ; s = s->next) {
int m = is_thread_stream(s);
if (m < 0) {
int ok = is_thread_stream(s);
if (ok < 0) {
err("is_thread_stream failed");
return -1;
} else if (m == 0) {
} else if (ok == 0) {
warn("ignoring unknown stream %s", s->relpath);
continue;
}
@ -250,7 +253,6 @@ create_system(struct system *sys, struct trace *trace)
return -1;
}
/* Loads metadata too */
struct proc *proc = create_proc(loom, s);
if (proc == NULL) {
err("create_proc failed");
@ -272,14 +274,6 @@ create_system(struct system *sys, struct trace *trace)
stream_data_set(s, lpt);
}
/* Ensure all looms have at least one CPU */
for (struct loom *l = sys->looms; l; l = l->next) {
if (l->ncpus == 0) {
err("loom %s has no physical CPUs", l->id);
return -1;
}
}
return 0;
}
@ -557,6 +551,12 @@ set_sort_criteria(struct system *sys)
int some_have = 0;
int all_have = 1;
for (struct loom *l = sys->looms; l; l = l->next) {
/* Set the rank_min for later sorting */
if (loom_set_rank_min(l) != 0) {
err("loom_set_rank_min failed");
return -1;
}
if (l->rank_enabled)
some_have = 1;
else

View File

@ -102,7 +102,10 @@ instr_start(int rank, int nranks)
ovni_version_check();
ovni_proc_init(1, rankname, getpid());
ovni_proc_set_rank(rank, nranks);
if (nranks > 0)
ovni_proc_set_rank(rank, nranks);
ovni_thread_init(get_tid());
/* All ranks inform CPUs */

View File

@ -30,11 +30,9 @@ test_oversubscription(void)
if (proc_init_begin(&proc, 1) != 0)
die("proc_init_begin failed");
proc_set_gindex(&proc, 0);
proc.appid = 1;
/* FIXME: We shouldn't need to recreate a full process to test the CPU
* affinity rules */
proc.metadata_loaded = 1;
proc_set_gindex(&proc, 0);
struct thread th0, th1;

View File

@ -65,7 +65,6 @@ test_duplicate_procs(struct loom *loom)
struct proc proc;
OK(loom_init_begin(loom, testloom));
OK(proc_init_begin(&proc, testproc));
proc.metadata_loaded = 1;
OK(loom_add_proc(loom, &proc));
ERR(loom_add_proc(loom, &proc));