Skip to content

Instantly share code, notes, and snippets.

@Wybxc
Created December 28, 2022 03:11
Show Gist options
  • Select an option

  • Save Wybxc/68e7a8784a59471c413b6c7ca723b360 to your computer and use it in GitHub Desktop.

Select an option

Save Wybxc/68e7a8784a59471c413b6c7ca723b360 to your computer and use it in GitHub Desktop.
(WIP) an async/await library in C
#include "aio.h"
#include "queue.h"
#include "uthash/include/uthash.h"
#include <stdio.h>
#include <sys/epoll.h>
struct fd_to_promise {
int fd;
promise_t *promise;
UT_hash_handle hh;
};
static struct fd_to_promise *fd_to_promise_map = NULL;
struct queue *ready_queue = NULL;
int epoll_fd = -1;
#define EPOLL_MAX_EVENTS 16
void aio_init() {
ready_queue = queue_create();
epoll_fd = epoll_create(EPOLL_MAX_EVENTS);
}
void aio_event_loop() {
if (!ready_queue) {
perror("Event loop not initialized");
exit(1);
}
while (1) {
// Deal with ready tasks
while (!queue_empty(ready_queue)) {
task_t *task = queue_pop_front(ready_queue);
task->fn(task->ctx);
aio_free(task);
}
// Wait for I/O
struct epoll_event events[EPOLL_MAX_EVENTS];
int n = epoll_wait(epoll_fd, events, EPOLL_MAX_EVENTS, -1);
if (n < 0) {
perror("epoll_wait");
exit(1);
}
// Deal with I/O events
for (int i = 0; i < n; i++) {
struct fd_to_promise *fd_to_promise;
HASH_FIND_INT(fd_to_promise_map, &events[i].data.fd, fd_to_promise);
if (fd_to_promise) {
aio_resolve(fd_to_promise->promise, events[i].events);
HASH_DEL(fd_to_promise_map, fd_to_promise);
aio_free(fd_to_promise);
}
}
}
}
task_t *aio_continuation(task_fn fn, context_t *ctx, void *closure,
size_t closure_size) {
// Create a new task
task_t *task = aio_malloc(sizeof(task_t));
task->fn = fn;
task->ctx = ctx;
// Set the continuation
++ctx->cont;
// Copy the closure
if (closure_size > 0) {
if (ctx->closure)
aio_free(ctx->closure);
ctx->closure = aio_malloc(closure_size);
memcpy(ctx->closure, closure, closure_size);
} else {
ctx->closure = NULL;
}
return task;
}
context_t *aio_create_context(void *closure, size_t closure_size) {
// Create a new promise
promise_t *promise = aio_malloc(sizeof(promise_t));
promise->state = P_PENDING;
promise->value = 0;
promise->then = NULL;
// Create a new context associated with the promise
context_t *context = aio_malloc(sizeof(context_t));
context->promise = promise;
context->cont = 0;
// Copy the closure
if (closure_size > 0) {
context->closure = aio_malloc(closure_size);
memcpy(context->closure, closure, closure_size);
} else {
context->closure = NULL;
}
return context;
}
void aio_free_context(context_t *ctx) { aio_free(ctx); }
void aio_resolve(promise_t *promise, int value) {
// Check if the promise is resolvable
if (promise->state == P_RESOLVED) {
perror("Promise already resolved");
exit(1);
}
// Resolve the promise
promise->state = P_RESOLVED;
promise->value = value;
// If the promise has a continuation, add it to the ready queue
if (promise->then) {
promise->then->ctx->ret = value;
queue_push_back(ready_queue, promise->then);
aio_free(promise);
}
}
void aio_await(promise_t *promise, task_t *cont) {
if (cont) {
if (promise->then) {
perror("Promise already has a continuation");
exit(1);
} else
promise->then = cont;
}
// If the promise is resolved, add its continuation to the ready queue
if (promise->state == P_RESOLVED && promise->then) {
promise->then->ctx->ret = promise->value;
queue_push_back(ready_queue, promise->then);
aio_free(promise);
}
}
promise_t *aio_waitfd(int fd, int events) {
// Create a new promise
promise_t *promise = aio_malloc(sizeof(promise_t));
promise->then = NULL;
// Create a new epoll event
struct epoll_event event;
event.events = events;
event.data.fd = fd;
// Add the event to epoll
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) < 0) {
perror("epoll_ctl");
return NULL;
}
// Add the fd to the map
struct fd_to_promise *fd_to_promise =
aio_malloc(sizeof(struct fd_to_promise));
fd_to_promise->fd = fd;
fd_to_promise->promise = promise;
HASH_ADD_INT(fd_to_promise_map, fd, fd_to_promise);
return promise;
}
void aio_create_task(task_fn fn, context_t *ctx) {
task_t *task = aio_malloc(sizeof(task_t));
task->fn = fn;
task->ctx = ctx;
queue_push_back(ready_queue, task);
}
/*
* AIO - Asynchronous IO
*/
#ifndef AIO_H
#define AIO_H
#include <stdlib.h>
#define async promise_t *
#ifndef aio_malloc
#define aio_malloc malloc
#endif
#ifndef aio_free
#define aio_free free
#endif
enum p_state {
P_PENDING,
P_RESOLVED,
};
/// A promise is a value that may not be available yet.
///
/// Lifespan:
/// Create when a new task starts. Free when resolved/rejected.
typedef struct promise {
enum p_state state;
int value;
struct task *then; // may be NULL
} promise_t;
/// Task context.
///
/// Lifespan:
/// Create when a new task starts. Free when task finishes.
typedef struct context {
promise_t *promise;
int cont;
int ret; // return value of the leading promise
void *closure; // task-specific data, allocated by
// `aio_continuation`/`aio_create_context`, freed by
// callee/continuation
} context_t;
/// Task function.
/// Parameters are passed in `ctx->closure`.
/// Returns a promise that will be resolved when task finishes,
/// or NULL if task finishes synchronously.
typedef async (*task_fn)(context_t *ctx);
/// A task is a function that can be run asynchronously.
///
/// Lifespan:
/// Create in `aio_continuation`. Free when finished by event loop.
typedef struct task {
task_fn fn;
context_t *ctx;
} task_t;
#define __ACPSX(n) \
case n: \
goto cont##n;
#define AIO_CPS_1 \
switch (ctx->cont) { __ACPSX(1) }
#define AIO_CPS_2 \
switch (ctx->cont) { __ACPSX(1) __ACPSX(2) }
#define AIO_CPS_3 \
switch (ctx->cont) { __ACPSX(1) __ACPSX(2) __ACPSX(3) }
#define AIO_CPS_4 \
switch (ctx->cont) { __ACPSX(1) __ACPSX(2) __ACPSX(3) __ACPSX(4) }
/// Initialize AIO.
void aio_init();
/// Start event loop.
void aio_event_loop();
/// Get continuation of current task.
task_t *aio_continuation(task_fn fn, context_t *ctx, void *closure,
size_t closure_size);
/// Prepare a context for async call.
context_t *aio_create_context(void *closure, size_t closure_size);
/// Free a context.
void aio_free_context(context_t *ctx);
/// Resolve a promise.
void aio_resolve(promise_t *promise, int value);
void aio_await(promise_t *promise, task_t *cont);
/// Wait for a file descriptor.
promise_t *aio_waitfd(int fd, int events);
void aio_create_task(task_fn fn, context_t *ctx);
#define aio_extract_closure(ctx, args) \
do { \
typeof(args) *closure = ctx->closure; \
args = *closure; \
aio_free(closure); \
ctx->closure = NULL; \
} while (0)
#define aio_run(callee, arg_type, ...) \
(callee(aio_create_context(&(arg_type){__VA_ARGS__}, sizeof(arg_type))))
#define aio_run0(callee) (callee(aio_create_context(NULL, 0)))
#define aio_cont(caller, ctx, closure_type, ...) \
(aio_continuation(caller, ctx, &(closure_type){__VA_ARGS__}, \
sizeof(closure_type)))
#define aio_cont0(caller, ctx) (aio_continuation(caller, ctx, NULL, 0))
#define aio_yield(ctx) \
do { \
return ctx->promise; \
} while (0)
#define aio_return(ctx, value) \
do { \
promise_t *promise = ctx->promise; \
free(ctx); \
aio_resolve(promise, value); \
return promise; \
} while (0)
#define aio_run_task(callee, arg_type, ...) \
(aio_create_task( \
callee, aio_create_context(&(arg_type){__VA_ARGS__}, sizeof(arg_type))))
#define aio_run_task0(callee) \
(aio_create_task(callee, aio_create_context(NULL, 0)))
#endif
#include "aio.h"
#include <stdio.h>
struct test_args {
int a, b;
};
async aio_gcd(context_t *ctx) {
AIO_CPS_1;
struct test_args args;
aio_extract_closure(ctx, args);
if (args.b == 0) {
aio_return(ctx, args.a);
}
promise_t *p = aio_run(aio_gcd, struct test_args, args.b, args.a % args.b);
aio_await(p, aio_cont0(aio_gcd, ctx));
aio_yield(ctx);
cont1:
aio_return(ctx, ctx->ret);
}
async aio_test(context_t *ctx) {
AIO_CPS_1;
promise_t *p = aio_run(aio_gcd, struct test_args, 10, 3);
aio_await(p, aio_cont0(aio_test, ctx));
aio_yield(ctx);
cont1:
printf("gcd(10, 3) = %d", ctx->ret);
exit(0);
aio_return(ctx, 0);
}
int main() {
aio_init();
aio_run_task0(aio_test);
aio_event_loop();
return 0;
}
#ifndef QUEUE_H
#define QUEUE_H
#include <stdlib.h>
struct queue {
void **slots;
int head;
int tail;
int capacity;
};
#define QUEUE_DEFAULT_CAPACITY 8
static inline struct queue *queue_create() {
struct queue *q = malloc(sizeof(struct queue));
q->slots = malloc(sizeof(void *) * QUEUE_DEFAULT_CAPACITY);
q->head = 0;
q->tail = 0;
q->capacity = QUEUE_DEFAULT_CAPACITY;
return q;
}
static inline void queue_free(struct queue *q) {
free(q->slots);
free(q);
}
static inline int queue_empty(struct queue *q) { return q->head == q->tail; }
static inline int queue_size(struct queue *q) {
if (q->tail >= q->head) {
return q->tail - q->head;
} else {
return q->capacity - (q->head - q->tail);
}
}
static inline void queue_resize(struct queue *q, int new_capacity) {
void **new_slots = malloc(sizeof(void *) * new_capacity);
int i;
for (i = 0; i < q->capacity; i++) {
new_slots[i] = q->slots[(q->head + i) % q->capacity];
}
free(q->slots);
q->slots = new_slots;
q->head = 0;
q->tail = q->capacity;
q->capacity = new_capacity;
}
static inline void queue_push_back(struct queue *q, void *item) {
if (queue_size(q) == q->capacity) {
queue_resize(q, q->capacity * 2);
}
q->slots[q->tail] = item;
q->tail = (q->tail + 1) % q->capacity;
}
static inline void *queue_pop_front(struct queue *q) {
if (queue_empty(q)) {
return NULL;
}
void *item = q->slots[q->head];
q->head = (q->head + 1) % q->capacity;
return item;
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment