From 27a23f25ca55640500b433a9569734c299b7540e Mon Sep 17 00:00:00 2001 From: Rodrigo Arias Mallo Date: Mon, 23 Oct 2023 15:24:10 +0200 Subject: [PATCH] Update task model to use bodies The task model is now integrated with the body model. A normal task can only have one body, while a parallel task can have more. It inherits the restriction that a task body cannot be nested over another one unless it is paused (or the relaxed nest model is enabled). --- CHANGELOG.md | 6 ++ src/emu/task.c | 216 ++++++++++++++++++--------------------- src/emu/task.h | 43 ++++---- test/unit/CMakeLists.txt | 1 + test/unit/task.c | 62 +++++++++++ 5 files changed, 191 insertions(+), 137 deletions(-) create mode 100644 test/unit/task.c diff --git a/CHANGELOG.md b/CHANGELOG.md index 71136f9..456528b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- The task model now requires the previous task body to be paused before nesting + another one. A compatibility flag is still available to still allow the old + behavior. + ## [1.7.0] - 2024-03-12 ### Added diff --git a/src/emu/task.c b/src/emu/task.c index 9cbc297..3291941 100644 --- a/src/emu/task.c +++ b/src/emu/task.c @@ -9,6 +9,12 @@ #include "pv/pcf.h" #include "utlist.h" +uint32_t +task_get_id(struct task *task) +{ + return task->id; +} + struct task * task_find(struct task *tasks, uint32_t task_id) { @@ -28,7 +34,7 @@ task_type_find(struct task_type *types, uint32_t type_id) } int -task_create(struct task_info *info, uint32_t type_id, uint32_t task_id) +task_create(struct task_info *info, uint32_t type_id, uint32_t task_id, uint32_t flags) { /* Ensure the task id is new */ if (task_find(info->tasks, task_id) != NULL) { @@ -52,8 +58,7 @@ task_create(struct task_info *info, uint32_t type_id, uint32_t task_id) task->id = task_id; task->type = type; - task->state = TASK_ST_CREATED; - task->thread = NULL; + task->flags = flags; /* Add the new task to the hash table */ HASH_ADD_INT(info->tasks, id, task); @@ -62,175 +67,152 @@ task_create(struct task_info *info, uint32_t type_id, uint32_t task_id) return 0; } +static struct body * +create_body(struct task *task, uint32_t body_id) +{ + if (!task_is_parallel(task) && task->nbodies > 0) { + err("cannot create more than one body for non-parallel task %u", + task->id); + return NULL; + } + + int flags = 0; + + if (task->flags & TASK_FLAG_RELAX_NESTING) + flags |= BODY_FLAG_RELAX_NESTING; + + if (task->flags & TASK_FLAG_RESURRECT) + flags |= BODY_FLAG_RESURRECT; + + if (task->flags & TASK_FLAG_PAUSE) + flags |= BODY_FLAG_PAUSE; + + struct body *body = body_create(&task->body_info, task, body_id, flags); + + if (body == NULL) { + err("body_create failed for task %u", task->id); + return NULL; + } + + task->nbodies++; + + return body; +} + int -task_execute(struct task_stack *stack, struct task *task) +task_execute(struct task_stack *stack, struct task *task, uint32_t body_id) { if (task == NULL) { err("task is NULL"); return -1; } - /* FIXME: To support the taskiter we need to transition from Dead to - * Running again. For now we allow the transition until we have a proper - * task state event. */ - if (task->state == TASK_ST_DEAD) { - task->thread = NULL; - } else if (task->state != TASK_ST_CREATED) { - err("cannot execute task %u: state is not created or dead", task->id); + struct body *body = body_find(&task->body_info, body_id); + + /* Create new body if it doesn't exist. */ + if (body == NULL) { + body = create_body(task, body_id); + + if (body == NULL) { + err("create_body failed"); + return -1; + } + } + + if (body_execute(&stack->body_stack, body) != 0) { + err("body_execute failed for task %u", task->id); return -1; } - if (task->thread != NULL) { - err("task already has a thread assigned"); - return -1; - } + dbg("body %u of task %u executes", body_id, task->id); - if (stack->thread->state != TH_ST_RUNNING) { - err("thread state is not running"); - return -1; - } - - if (stack->top == task) { - err("thread already has assigned task %u", task->id); - return -1; - } - - if (stack->top && stack->top->state != TASK_ST_RUNNING) { - err("cannot execute a nested task from a non-running task"); - return -1; - } - - task->state = TASK_ST_RUNNING; - task->thread = stack->thread; - - DL_PREPEND(stack->tasks, task); - - dbg("task id=%u runs now", task->id); return 0; } int -task_pause(struct task_stack *stack, struct task *task) +task_pause(struct task_stack *stack, struct task *task, uint32_t body_id) { if (task == NULL) { - err("cannot pause: task is NULL"); + err("task is NULL"); return -1; } - if (task->state != TASK_ST_RUNNING) { - err("cannot pause: task state is not running"); + struct body *body = body_find(&task->body_info, body_id); + + if (body == NULL) { + err("cannot find body with id %u in task %u", + body_id, task->id); return -1; } - if (task->thread == NULL) { - err("cannot pause: task has no thread assigned"); + if (body_pause(&stack->body_stack, body) != 0) { + err("body_pause failed for task %u", task->id); return -1; } - if (stack->thread->state != TH_ST_RUNNING) { - err("cannot pause: thread state is not running"); - return -1; - } + dbg("body %u of task %u pauses", body_id, task->id); - if (stack->top != task) { - err("thread has assigned a different task"); - return -1; - } - - if (stack->thread != task->thread) { - err("task is assigned to a different thread"); - return -1; - } - - task->state = TASK_ST_PAUSED; - - dbg("task id=%d pauses", task->id); return 0; } int -task_resume(struct task_stack *stack, struct task *task) +task_resume(struct task_stack *stack, struct task *task, uint32_t body_id) { if (task == NULL) { - err("cannot resume: task is NULL"); + err("task is NULL"); return -1; } - if (task->state != TASK_ST_PAUSED) { - err("task state is not paused"); + struct body *body = body_find(&task->body_info, body_id); + + if (body == NULL) { + err("cannot find body with id %u in task %u", + body_id, task->id); return -1; } - if (task->thread == NULL) { - err("cannot resume: task has no thread assigned"); + if (body_resume(&stack->body_stack, body) != 0) { + err("body_resume failed for task %u", task->id); return -1; } - if (stack->thread->state != TH_ST_RUNNING) { - err("thread is not running"); - return -1; - } + dbg("body %u of task %u resumes", body_id, task->id); - if (stack->top != task) { - err("thread has assigned a different task"); - return -1; - } - - if (stack->thread != task->thread) { - err("task is assigned to a different thread"); - return -1; - } - - task->state = TASK_ST_RUNNING; - - dbg("task id=%d resumes", task->id); return 0; } int -task_end(struct task_stack *stack, struct task *task) +task_end(struct task_stack *stack, struct task *task, uint32_t body_id) { if (task == NULL) { - err("cannot end: task is NULL"); + err("task is NULL"); return -1; } - if (task->state != TASK_ST_RUNNING) { - err("task state is not running"); + struct body *body = body_find(&task->body_info, body_id); + + if (body == NULL) { + err("cannot find body with id %u in task %u", + body_id, task->id); return -1; } - if (task->thread == NULL) { - err("cannot end: task has no thread assigned"); + if (body_end(&stack->body_stack, body) != 0) { + err("body_end failed for task %u", task->id); return -1; } - if (stack->thread->state != TH_ST_RUNNING) { - err("cannot end task: thread is not running"); - return -1; - } + dbg("body %u of task %u ends", body_id, task->id); - if (stack->top != task) { - err("thread has assigned a different task"); - return -1; - } - - if (stack->thread != task->thread) { - err("task is assigned to a different thread"); - return -1; - } - - task->state = TASK_ST_DEAD; - - /* Don't unset the thread from the task, as it will be used - * later to ensure we switch to tasks of the same thread. */ - - DL_DELETE(stack->tasks, task); - - dbg("task id=%d ends", task->id); return 0; } +int +task_is_parallel(struct task *task) +{ + return (task->flags & TASK_FLAG_PARALLEL); +} + uint32_t task_get_type_gid(const char *label) { @@ -319,12 +301,14 @@ task_create_pcf_types(struct pcf_type *pcftype, struct task_type *types) return ret; } -struct task * +struct body * task_get_running(struct task_stack *stack) { - struct task *task = stack->top; - if (task && task->state == TASK_ST_RUNNING) - return task; - - return NULL; + return body_get_running(&stack->body_stack); +} + +struct body * +task_get_top(struct task_stack *stack) +{ + return body_get_top(&stack->body_stack); } diff --git a/src/emu/task.h b/src/emu/task.h index 54c90f6..e8aa994 100644 --- a/src/emu/task.h +++ b/src/emu/task.h @@ -8,12 +8,13 @@ #include "common.h" #include "pv/pcf.h" #include "uthash.h" +#include "body.h" -enum task_state { - TASK_ST_CREATED, - TASK_ST_RUNNING, - TASK_ST_PAUSED, - TASK_ST_DEAD, +enum task_flags { + TASK_FLAG_PARALLEL = (1 << 0), /* Can have multiple bodies */ + TASK_FLAG_RESURRECT = (1 << 1), /* Can run again after dead */ + TASK_FLAG_PAUSE = (1 << 2), /* Can pause the bodies */ + TASK_FLAG_RELAX_NESTING = (1 << 3), /* Can nest a over a running task */ }; struct task_type { @@ -26,12 +27,11 @@ struct task_type { struct task { uint32_t id; struct task_type *type; + long nbodies; + struct body_info body_info; + uint32_t flags; - /* TODO: Use a pointer to task_stack instead of thread */ - /* The thread that has began to execute the task. It cannot - * changed after being set, even if the task ends. */ - struct thread *thread; - enum task_state state; + /* Hash map in task_info */ UT_hash_handle hh; /* List handle for nested task support */ @@ -46,23 +46,24 @@ struct task_info { }; struct task_stack { - union { - struct task *top; /* Synctactic sugar */ - struct task *tasks; - }; - struct thread *thread; + struct body_stack body_stack; }; +USE_RET uint32_t task_get_id(struct task *task); USE_RET struct task *task_find(struct task *tasks, uint32_t task_id); -USE_RET int task_create(struct task_info *info, uint32_t type_id, uint32_t task_id); -USE_RET int task_execute(struct task_stack *stack, struct task *task); -USE_RET int task_pause(struct task_stack *stack, struct task *task); -USE_RET int task_resume(struct task_stack *stack, struct task *task); -USE_RET int task_end(struct task_stack *stack, struct task *task); +USE_RET int task_create(struct task_info *info, uint32_t type_id, uint32_t task_id, uint32_t flags); + +USE_RET int task_execute(struct task_stack *stack, struct task *task, uint32_t body_id); +USE_RET int task_pause(struct task_stack *stack, struct task *task, uint32_t body_id); +USE_RET int task_resume(struct task_stack *stack, struct task *task, uint32_t body_id); +USE_RET int task_end(struct task_stack *stack, struct task *task, uint32_t body_id); + USE_RET struct task_type *task_type_find(struct task_type *types, uint32_t type_id); USE_RET int task_type_create(struct task_info *info, uint32_t type_id, const char *label); USE_RET uint32_t task_get_type_gid(const char *label); USE_RET int task_create_pcf_types(struct pcf_type *pcftype, struct task_type *types); -USE_RET struct task *task_get_running(struct task_stack *stack); +USE_RET struct body *task_get_running(struct task_stack *stack); +USE_RET struct body *task_get_top(struct task_stack *stack); +USE_RET int task_is_parallel(struct task *task); #endif /* TASK_H */ diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 4d8120c..8bbe3f2 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -16,6 +16,7 @@ unit_test(loom.c) unit_test(mux.c) unit_test(prv.c) unit_test(stream.c) +unit_test(task.c) unit_test(thread.c) unit_test(value.c) unit_test(version.c) diff --git a/test/unit/task.c b/test/unit/task.c new file mode 100644 index 0000000..b1ab71f --- /dev/null +++ b/test/unit/task.c @@ -0,0 +1,62 @@ +/* Copyright (c) 2023 Barcelona Supercomputing Center (BSC) + * SPDX-License-Identifier: GPL-3.0-or-later */ + +#include "common.h" +#include "emu/task.h" +#include "unittest.h" + +static void +test_parallel(void) +{ + struct task_info info; + struct task_stack stack; + struct task_stack stack2; + + memset(&info, 0, sizeof(info)); + memset(&stack, 0, sizeof(stack)); + memset(&stack2, 0, sizeof(stack2)); + + uint32_t type_id = 123; + uint32_t task_id = 456; + + OK(task_type_create(&info, type_id, "parallel_task")); + OK(task_create(&info, type_id, task_id, TASK_FLAG_PARALLEL)); + + struct task *task = task_find(info.tasks, task_id); + + if (task == NULL) + err("task_find failed"); + + /* Attempt to run body with zero id */ + ERR(task_execute(&stack, task, 0)); + + OK(task_execute(&stack, task, 1)); + OK(task_execute(&stack2, task, 2)); + + /* Attempt to run a body which is already running */ + ERR(task_execute(&stack2, task, 1)); + + /* Finish a non existant body */ + ERR(task_end(&stack, task, 42)); + + /* Attempt to pause top without pause flag */ + ERR(task_pause(&stack, task, 1)); + + /* Attempt to resume while running */ + ERR(task_resume(&stack, task, 1)); + + OK(task_end(&stack, task, 1)); + OK(task_end(&stack2, task, 2)); + + /* Try to run one body again without resurrect flag */ + ERR(task_execute(&stack, task, 1)); + + err("ok"); +} + +int main(void) +{ + test_parallel(); + + return 0; +}