// Gist by @MarkReedZ (gist id e45d93db10d973623dca8d3ad03cf7e8), created April 23, 2024 16:27.
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>

#include "liburing.h"
#include "liburing/io_uring.h"
// user_data tag that main() attaches to the multishot accept request; any
// other user_data value on a completion is treated as a client socket fd.
#define NEW_CLIENT 0xffffffffffffffff
// TCP port handed to setup_listening_socket() by main().
#define PORT 4444
// Size in bytes of each receive buffer placed in the provided-buffer ring.
#define BUF_SIZE 4096L
// Number of receive buffers (and entries in the buffer ring).
#define NR_BUFS 4096
// Mask passed to io_uring_buf_ring_add(); NR_BUFS - 1, so NR_BUFS is expected
// to be a power of two.
#define BR_MASK (NR_BUFS - 1)
// Usage notes from the author:
// install the tools used to drive the server:
// sudo apt install pv nc
// build:
// gcc -O3 mshot.c -o mshot -luring
// push a stream of zero bytes at the server while pv reports throughput:
// cat /dev/zero | pv | nc localhost 4444
// Creates, binds and listens on a TCP socket for `port`; returns its fd.
int setup_listening_socket(int port);
int main(int argc, char** argv) {
int fd = setup_listening_socket(PORT);
struct io_uring ring;
struct io_uring_params params;
memset(&params, 0, sizeof(params));
params.flags |= IORING_SETUP_SINGLE_ISSUER;
params.flags |= IORING_SETUP_SQPOLL;
params.sq_thread_idle = 1000;
if (io_uring_queue_init_params(32768, &ring, &params) < 0) { fprintf(stderr, "%s\n", strerror(errno)); exit(1); }
char* buf;
if (posix_memalign((void**)&buf, 4096, BUF_SIZE * NR_BUFS)) { perror("posix memalign"); exit(1); }
int ret;
struct io_uring_buf_ring* br;
br = io_uring_setup_buf_ring(&ring, NR_BUFS, 1, 0, &ret);
if (!br) {
if (ret == -EINVAL) { return 1; }
fprintf(stderr, "Buffer ring register failed %d %s\n", ret, strerror(errno));
return 1;
}
void* ptr = buf;
for (int i = 0; i < NR_BUFS; i++) {
io_uring_buf_ring_add(br, ptr, BUF_SIZE, i, BR_MASK, i);
ptr += BUF_SIZE;
}
io_uring_buf_ring_advance(br, NR_BUFS);
struct io_uring_sqe* sqe;
sqe = io_uring_get_sqe(&ring);
io_uring_prep_multishot_accept(sqe, fd, NULL, NULL, 0);
io_uring_sqe_set_data64(sqe, NEW_CLIENT);
io_uring_submit(&ring);
printf("Server ready\n");
time_t last_time = time(NULL);
while (1) {
struct io_uring_cqe* cqe;
unsigned int head;
unsigned int i = 0;
io_uring_for_each_cqe(&ring, head, cqe) {
if (cqe->res < 0) { fprintf(stderr, "%s\n", strerror(-cqe->res)); perror("cqe-res"); exit(1); }
uint64_t user_data = io_uring_cqe_get_data64(cqe);
if (user_data == NEW_CLIENT) {
int client_fd = cqe->res;
printf("New client: %d\n", client_fd);
struct io_uring_sqe* sqe;
sqe = io_uring_get_sqe(&ring);
io_uring_prep_recv_multishot(sqe, client_fd, NULL, 0, 0);
io_uring_sqe_set_data64(sqe, client_fd);
sqe->buf_group = 1;
sqe->flags |= IOSQE_BUFFER_SELECT;
io_uring_submit(&ring);
} else {
if ((cqe->flags & IORING_CQE_F_MORE) == 0) {
fprintf(stderr, "multishot cancelled for: %llu\n", io_uring_cqe_get_data64(cqe));
}
int bid = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
ptr = buf + bid * BUF_SIZE;
io_uring_buf_ring_add(br, ptr, BUF_SIZE, bid, BR_MASK, 0);
io_uring_buf_ring_advance(br, 1);
}
++i;
}
io_uring_cq_advance(&ring, i);
}
io_uring_free_buf_ring(&ring, br, NR_BUFS, 1);
io_uring_queue_exit(&ring);
return 0;
}
/*
 * Create an IPv4 TCP socket with SO_REUSEADDR enabled, bind it to `port` on
 * INADDR_ANY and put it into listening state with a backlog of 10.
 *
 * Every failure is reported with perror() and ends the process with exit
 * status 1, so a value returned to the caller is always a usable descriptor.
 */
int setup_listening_socket(int port) {
    const int listen_fd = socket(PF_INET, SOCK_STREAM, 0);
    if (listen_fd == -1) {
        perror("socket()");
        exit(1);
    }

    /* Allow the address to be rebound without waiting on lingering sockets. */
    int reuse = 1;
    if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
        perror("setsockopt(SO_REUSEADDR)");
        exit(1);
    }

    /* Zero the whole structure first, then fill in the fields we care about. */
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(port);
    addr.sin_family = AF_INET;

    if (bind(listen_fd, (const struct sockaddr*)&addr, sizeof(addr)) < 0) {
        perror("bind()");
        exit(1);
    }

    if (listen(listen_fd, 10) < 0) {
        perror("listen()");
        exit(1);
    }

    return listen_fd;
}
// End of gist (GitHub page footer omitted).