Created
December 28, 2022 03:11
-
-
Save Wybxc/68e7a8784a59471c413b6c7ca723b360 to your computer and use it in GitHub Desktop.
(WIP) an async/await library in C
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include "aio.h" | |
| #include "queue.h" | |
| #include "uthash/include/uthash.h" | |
| #include <errno.h> | |
| #include <stdio.h> | |
| #include <string.h> | |
| #include <sys/epoll.h> | |
| struct fd_to_promise { /* hash-table entry pairing a watched fd with the promise waiting on it */ | |
| int fd; /* hash key (used with HASH_FIND_INT / HASH_ADD_INT) */ | |
| promise_t *promise; /* resolved by the event loop with the reported epoll event mask */ | |
| UT_hash_handle hh; /* handle required by the uthash macros */ | |
| }; | |
| static struct fd_to_promise *fd_to_promise_map = NULL; /* head of the fd -> promise hash table */ | |
| struct queue *ready_queue = NULL; /* tasks ready to run; created by aio_init */ | |
| int epoll_fd = -1; /* epoll instance created by aio_init; -1 until then */ | |
| #define EPOLL_MAX_EVENTS 16 /* length of the event array handed to epoll_wait; also passed to epoll_create */ | |
| void aio_init() { | |
| ready_queue = queue_create(); | |
| epoll_fd = epoll_create(EPOLL_MAX_EVENTS); | |
| } | |
| void aio_event_loop() { | |
| if (!ready_queue) { | |
| perror("Event loop not initialized"); | |
| exit(1); | |
| } | |
| while (1) { | |
| // Deal with ready tasks | |
| while (!queue_empty(ready_queue)) { | |
| task_t *task = queue_pop_front(ready_queue); | |
| task->fn(task->ctx); | |
| aio_free(task); | |
| } | |
| // Wait for I/O | |
| struct epoll_event events[EPOLL_MAX_EVENTS]; | |
| int n = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, -1); | |
| if (n < 0) { | |
| perror("epoll_wait"); | |
| exit(1); | |
| } | |
| // Deal with I/O events | |
| for (int i = 0; i < n; i++) { | |
| struct fd_to_promise *fd_to_promise; | |
| HASH_FIND_INT(fd_to_promise_map, &events[i].data.fd, fd_to_promise); | |
| if (fd_to_promise) { | |
| aio_resolve(fd_to_promise->promise, events[i].events); | |
| HASH_DEL(fd_to_promise_map, fd_to_promise); | |
| aio_free(fd_to_promise); | |
| } | |
| } | |
| } | |
| } | |
| task_t *aio_continuation(task_fn fn, context_t *ctx, void *closure, | |
| size_t closure_size) { | |
| // Create a new task | |
| task_t *task = aio_malloc(sizeof(task_t)); | |
| task->fn = fn; | |
| task->ctx = ctx; | |
| // Set the continuation | |
| ++ctx->cont; | |
| // Copy the closure | |
| if (closure_size > 0) { | |
| if (ctx->closure) | |
| aio_free(ctx->closure); | |
| ctx->closure = aio_malloc(closure_size); | |
| memcpy(ctx->closure, closure, closure_size); | |
| } else { | |
| ctx->closure = NULL; | |
| } | |
| return task; | |
| } | |
| context_t *aio_create_context(void *closure, size_t closure_size) { | |
| // Create a new promise | |
| promise_t *promise = aio_malloc(sizeof(promise_t)); | |
| promise->state = P_PENDING; | |
| promise->value = 0; | |
| promise->then = NULL; | |
| // Create a new context associated with the promise | |
| context_t *context = aio_malloc(sizeof(context_t)); | |
| context->promise = promise; | |
| context->cont = 0; | |
| // Copy the closure | |
| if (closure_size > 0) { | |
| context->closure = aio_malloc(closure_size); | |
| memcpy(context->closure, closure, closure_size); | |
| } else { | |
| context->closure = NULL; | |
| } | |
| return context; | |
| } | |
| void aio_free_context(context_t *ctx) { aio_free(ctx); } | |
| void aio_resolve(promise_t *promise, int value) { | |
| // Check if the promise is resolvable | |
| if (promise->state == P_RESOLVED) { | |
| perror("Promise already resolved"); | |
| exit(1); | |
| } | |
| // Resolve the promise | |
| promise->state = P_RESOLVED; | |
| promise->value = value; | |
| // If the promise has a continuation, add it to the ready queue | |
| if (promise->then) { | |
| promise->then->ctx->ret = value; | |
| queue_push_back(ready_queue, promise->then); | |
| aio_free(promise); | |
| } | |
| } | |
| void aio_await(promise_t *promise, task_t *cont) { | |
| if (cont) { | |
| if (promise->then) { | |
| perror("Promise already has a continuation"); | |
| exit(1); | |
| } else | |
| promise->then = cont; | |
| } | |
| // If the promise is resolved, add its continuation to the ready queue | |
| if (promise->state == P_RESOLVED && promise->then) { | |
| promise->then->ctx->ret = promise->value; | |
| queue_push_back(ready_queue, promise->then); | |
| aio_free(promise); | |
| } | |
| } | |
| promise_t *aio_waitfd(int fd, int events) { | |
| // Create a new promise | |
| promise_t *promise = aio_malloc(sizeof(promise_t)); | |
| promise->then = NULL; | |
| // Create a new epoll event | |
| struct epoll_event event; | |
| event.events = events; | |
| event.data.fd = fd; | |
| // Add the event to epoll | |
| if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) < 0) { | |
| perror("epoll_ctl"); | |
| return NULL; | |
| } | |
| // Add the fd to the map | |
| struct fd_to_promise *fd_to_promise = | |
| aio_malloc(sizeof(struct fd_to_promise)); | |
| fd_to_promise->fd = fd; | |
| fd_to_promise->promise = promise; | |
| HASH_ADD_INT(fd_to_promise_map, fd, fd_to_promise); | |
| return promise; | |
| } | |
| void aio_create_task(task_fn fn, context_t *ctx) { | |
| task_t *task = aio_malloc(sizeof(task_t)); | |
| task->fn = fn; | |
| task->ctx = ctx; | |
| queue_push_back(ready_queue, task); | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * AIO - Asynchronous IO | |
| */ | |
| #ifndef AIO_H | |
| #define AIO_H | |
| #include <stdlib.h> | |
| #define async promise_t * /* return-type shorthand used when declaring task functions */ | |
| #ifndef aio_malloc /* allocator hook: define aio_malloc before including this header to override it */ | |
| #define aio_malloc malloc | |
| #endif | |
| #ifndef aio_free /* deallocator hook, counterpart of aio_malloc */ | |
| #define aio_free free | |
| #endif | |
| enum p_state { /* state of a promise */ | |
| P_PENDING, /* initial state set by aio_create_context */ | |
| P_RESOLVED, /* set by aio_resolve together with the value */ | |
| }; | |
| /// A promise is a value that may not be available yet. | |
| /// | |
| /// Lifespan: | |
| /// Created by `aio_create_context` or `aio_waitfd`. Freed by `aio_resolve`/`aio_await` once it is resolved and its continuation has been queued. | |
| typedef struct promise { | |
| enum p_state state; /* P_RESOLVED once aio_resolve has run */ | |
| int value; /* value passed to aio_resolve */ | |
| struct task *then; // continuation queued once the promise is resolved; may be NULL | |
| } promise_t; | |
| /// Task context. | |
| /// | |
| /// Lifespan: | |
| /// Created by `aio_create_context` when a new task starts. Freed when the task finishes (see `aio_return`). | |
| typedef struct context { | |
| promise_t *promise; /* promise resolved by aio_return when the task finishes */ | |
| int cont; /* continuation index: 0 on creation, incremented by aio_continuation, switched on by AIO_CPS_n */ | |
| int ret; // value of the awaited promise, stored before the continuation is queued | |
| void *closure; // task-specific data, allocated by | |
| // `aio_continuation`/`aio_create_context`, freed by | |
| // callee/continuation | |
| } context_t; | |
| /// Task function. | |
| /// Parameters are passed in `ctx->closure`. | |
| /// Returns a promise that the caller can pass to `aio_await`. | |
| /// The event loop ignores the return value when it resumes a task through a continuation. | |
| typedef async (*task_fn)(context_t *ctx); | |
| /// A task is a function that can be run asynchronously. | |
| /// | |
| /// Lifespan: | |
| /// Created by `aio_continuation` or `aio_create_task`. Freed by the event loop right after it has run. | |
| typedef struct task { | |
| task_fn fn; /* function the event loop calls */ | |
| context_t *ctx; /* argument passed to fn */ | |
| } task_t; | |
| #define __ACPSX(n) /* one dispatch arm: when ctx->cont == n, jump to the label cont<n>. NOTE(review): identifiers beginning with `__` are reserved in C */ \ | |
| case n: \ | |
| goto cont##n; | |
| #define AIO_CPS_1 /* dispatch for a task function with one continuation label (cont1); any other ctx->cont value, such as 0 on first entry, falls through to the code that follows */ \ | |
| switch (ctx->cont) { __ACPSX(1) } | |
| #define AIO_CPS_2 /* same, for labels cont1..cont2 */ \ | |
| switch (ctx->cont) { __ACPSX(1) __ACPSX(2) } | |
| #define AIO_CPS_3 /* same, for labels cont1..cont3 */ \ | |
| switch (ctx->cont) { __ACPSX(1) __ACPSX(2) __ACPSX(3) } | |
| #define AIO_CPS_4 /* same, for labels cont1..cont4 */ \ | |
| switch (ctx->cont) { __ACPSX(1) __ACPSX(2) __ACPSX(3) __ACPSX(4) } | |
| /// Initialize AIO. | |
| void aio_init(); | |
| /// Start event loop. | |
| void aio_event_loop(); | |
| /// Get continuation of current task. | |
| task_t *aio_continuation(task_fn fn, context_t *ctx, void *closure, | |
| size_t closure_size); | |
| /// Prepare a context for async call. | |
| context_t *aio_create_context(void *closure, size_t closure_size); | |
| /// Free a context. | |
| void aio_free_context(context_t *ctx); | |
| /// Resolve a promise. | |
| void aio_resolve(promise_t *promise, int value); | |
| void aio_await(promise_t *promise, task_t *cont); | |
| /// Wait for a file descriptor. | |
| promise_t *aio_waitfd(int fd, int events); | |
| void aio_create_task(task_fn fn, context_t *ctx); | |
/// Move the closure stored in `ctx` into the lvalue `args`.
/// The closure is dereferenced as a `typeof(args)`, then freed, and
/// `ctx->closure` is reset to NULL. `ctx->closure` must not be NULL.
/// `__typeof__` is used because plain `typeof` is unavailable in strict
/// pre-C23 modes such as `-std=c11`.
#define aio_extract_closure(ctx, args)                                         \
  do {                                                                         \
    __typeof__(args) *aio_closure_ = (ctx)->closure;                           \
    (args) = *aio_closure_;                                                    \
    aio_free(aio_closure_);                                                    \
    (ctx)->closure = NULL;                                                     \
  } while (0)

/// Call `callee` directly with a fresh context whose closure is an
/// `arg_type` initialized from the remaining arguments. Evaluates to the
/// promise returned by `callee`.
#define aio_run(callee, arg_type, ...)                                         \
  (callee(aio_create_context(&(arg_type){__VA_ARGS__}, sizeof(arg_type))))

/// Like `aio_run`, for a callee that takes no arguments.
#define aio_run0(callee) (callee(aio_create_context(NULL, 0)))

/// Build the continuation task of `caller`, carrying a `closure_type`
/// initialized from the remaining arguments.
#define aio_cont(caller, ctx, closure_type, ...)                               \
  (aio_continuation(caller, ctx, &(closure_type){__VA_ARGS__},                 \
                    sizeof(closure_type)))

/// Like `aio_cont`, without a closure.
#define aio_cont0(caller, ctx) (aio_continuation(caller, ctx, NULL, 0))

/// Suspend the current task function: return its promise to the caller.
#define aio_yield(ctx)                                                         \
  do {                                                                         \
    return (ctx)->promise;                                                     \
  } while (0)

/// Finish the current task function with result `value`.
/// `value` is evaluated BEFORE the context is released, so expressions such
/// as `ctx->ret` are safe. The context is released through
/// `aio_free_context`, which honours the `aio_free` hook.
/// Returns the task's promise, or NULL when a continuation was attached:
/// in that case `aio_resolve` has queued the continuation and freed the
/// promise, so no pointer to it may be handed out.
#define aio_return(ctx, value)                                                 \
  do {                                                                         \
    context_t *aio_ctx_ = (ctx);                                               \
    int aio_value_ = (value);                                                  \
    promise_t *aio_promise_ = aio_ctx_->promise;                               \
    int aio_awaited_ = aio_promise_->then != NULL;                             \
    aio_free_context(aio_ctx_);                                                \
    aio_resolve(aio_promise_, aio_value_);                                     \
    return aio_awaited_ ? NULL : aio_promise_;                                 \
  } while (0)

/// Queue `callee` on the event loop with a fresh context whose closure is an
/// `arg_type` initialized from the remaining arguments.
#define aio_run_task(callee, arg_type, ...)                                    \
  (aio_create_task(                                                            \
      callee, aio_create_context(&(arg_type){__VA_ARGS__}, sizeof(arg_type))))

/// Like `aio_run_task`, for a callee that takes no arguments.
#define aio_run_task0(callee)                                                  \
  (aio_create_task(callee, aio_create_context(NULL, 0)))
| #endif |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include "aio.h" | |
| #include <stdio.h> | |
| struct test_args { /* closure layout passed to aio_gcd */ | |
| int a, b; /* the two operands of the GCD computation */ | |
| }; | |
| async aio_gcd(context_t *ctx) { | |
| AIO_CPS_1; | |
| struct test_args args; | |
| aio_extract_closure(ctx, args); | |
| if (args.b == 0) { | |
| aio_return(ctx, args.a); | |
| } | |
| promise_t *p = aio_run(aio_gcd, struct test_args, args.b, args.a % args.b); | |
| aio_await(p, aio_cont0(aio_gcd, ctx)); | |
| aio_yield(ctx); | |
| cont1: | |
| aio_return(ctx, ctx->ret); | |
| } | |
| async aio_test(context_t *ctx) { | |
| AIO_CPS_1; | |
| promise_t *p = aio_run(aio_gcd, struct test_args, 10, 3); | |
| aio_await(p, aio_cont0(aio_test, ctx)); | |
| aio_yield(ctx); | |
| cont1: | |
| printf("gcd(10, 3) = %d", ctx->ret); | |
| exit(0); | |
| aio_return(ctx, 0); | |
| } | |
| int main() { | |
| aio_init(); | |
| aio_run_task0(aio_test); | |
| aio_event_loop(); | |
| return 0; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef QUEUE_H
#define QUEUE_H

#include <limits.h>
#include <stdlib.h>

/// Growable FIFO of `void *` items stored in a circular buffer.
///
/// `head` is the index of the oldest item and `tail` the index of the next
/// free slot. One slot is always left unused so that `head == tail` can only
/// mean "empty"; the buffer therefore holds at most `capacity - 1` items
/// before it has to grow.
struct queue {
  void **slots;
  int head;
  int tail;
  int capacity;
};

#define QUEUE_DEFAULT_CAPACITY 8

/// Create an empty queue with QUEUE_DEFAULT_CAPACITY slots.
/// Returns NULL if memory cannot be allocated.
static inline struct queue *queue_create(void) {
  struct queue *q = malloc(sizeof *q);
  if (!q) {
    return NULL;
  }
  q->slots = malloc(sizeof *q->slots * QUEUE_DEFAULT_CAPACITY);
  if (!q->slots) {
    free(q);
    return NULL;
  }
  q->head = 0;
  q->tail = 0;
  q->capacity = QUEUE_DEFAULT_CAPACITY;
  return q;
}

/// Destroy a queue. The stored items themselves are not freed.
/// Passing NULL is a no-op.
static inline void queue_free(struct queue *q) {
  if (!q) {
    return;
  }
  free(q->slots);
  free(q);
}

/// Return non-zero when the queue holds no items.
static inline int queue_empty(struct queue *q) { return q->head == q->tail; }

/// Return the number of items currently stored.
static inline int queue_size(struct queue *q) {
  if (q->tail >= q->head) {
    return q->tail - q->head;
  } else {
    // The live region wraps around the end of the buffer.
    return q->capacity - (q->head - q->tail);
  }
}

/// Move the items into a buffer of `new_capacity` slots, oldest item first.
/// The request is ignored (queue left untouched) when `new_capacity` cannot
/// hold the current items plus the one unused slot, or when the allocation
/// fails.
static inline void queue_resize(struct queue *q, int new_capacity) {
  int count = queue_size(q);
  if (new_capacity <= count) {
    return;
  }

  void **new_slots = malloc(sizeof *new_slots * (size_t)new_capacity);
  if (!new_slots) {
    return;
  }

  // Copy only the live items, unwrapping them to the start of the new buffer.
  for (int i = 0; i < count; i++) {
    new_slots[i] = q->slots[(q->head + i) % q->capacity];
  }

  free(q->slots);
  q->slots = new_slots;
  q->head = 0;
  q->tail = count;
  q->capacity = new_capacity;
}

/// Append `item` at the back of the queue, doubling the buffer when it is
/// full. Aborts the process if the buffer cannot grow, because silently
/// dropping or overwriting a queued item would corrupt the queue.
static inline void queue_push_back(struct queue *q, void *item) {
  // Full means capacity - 1 items: writing one more would make tail catch
  // up with head, which is indistinguishable from an empty queue.
  if (queue_size(q) == q->capacity - 1) {
    if (q->capacity <= INT_MAX / 2) {
      queue_resize(q, q->capacity * 2);
    }
    if (queue_size(q) == q->capacity - 1) {
      abort();
    }
  }
  q->slots[q->tail] = item;
  q->tail = (q->tail + 1) % q->capacity;
}

/// Remove and return the oldest item, or NULL when the queue is empty.
static inline void *queue_pop_front(struct queue *q) {
  if (queue_empty(q)) {
    return NULL;
  }
  void *item = q->slots[q->head];
  q->head = (q->head + 1) % q->capacity;
  return item;
}

#endif
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment