Skip to content

Instantly share code, notes, and snippets.

@croisillon
Created December 3, 2025 06:33
Show Gist options
  • Select an option

  • Save croisillon/ee16f1d01fdc98be5302ad9ae20ed5ef to your computer and use it in GitHub Desktop.

Select an option

Save croisillon/ee16f1d01fdc98be5302ad9ae20ed5ef to your computer and use it in GitHub Desktop.
#include "cmpmc.h"
int cmpmc_init(mpmc_bounded_queue_t *q, size_t buf_size) {
if (q == NULL || buf_size < 2 || (buf_size & (buf_size - 1)) != 0) return -1;
// Выделяем память для ячеек буфера
cell_t *alloc_buf = calloc(buf_size, sizeof(cell_t));
if (alloc_buf == NULL)
return -1;
// Устанавливаем указатель на буфер и маску для быстрого вычисления индекса
q->buf = alloc_buf;
q->buf_mask = (buf_size - 1); // Маска для операции pos & mask
// Инициализируем sequence numbers для каждой ячейки
// seq[i] = i означает, что ячейка готова для записи
for (size_t i = 0; i != buf_size; i += 1)
atomic_store_explicit(&q->buf[i].seq, i, memory_order_relaxed);
// Инициализируем позиции enqueue и dequeue
atomic_store_explicit(&q->enqueue_pos, 0, memory_order_relaxed);
atomic_store_explicit(&q->dequeue_pos, 0, memory_order_relaxed);
return 0;
}
void cmpmc_destroy(mpmc_bounded_queue_t *q) {
if (q != NULL && q->buf != NULL) {
free(q->buf);
q->buf = NULL;
}
}
int cmpmc_enq(mpmc_bounded_queue_t *q, void *const data) {
if (q == NULL) {
return -1;
}
cell_t *cell;
// Получаем текущую позицию для записи
size_t pos = atomic_load_explicit(&q->enqueue_pos, memory_order_relaxed);
// printf("[DEBUG] ENQ START: pos=%zu, data=%p\n", pos, data);
for (;;) {
// Вычисляем индекс ячейки в кольцевом буфере
cell = &q->buf[pos & q->buf_mask];
// Читаем sequence number ячейки
size_t seq = atomic_load_explicit(&cell->seq, memory_order_acquire);
// Вычисляем разность: seq - pos
// dif == 0: ячейка готова для записи
// dif < 0: ячейка еще не готова (очередь полная)
// dif > 0: ячейка уже была записана (конкуренция)
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
// printf("[DEBUG] ENQ LOOP: pos=%zu, idx=%zu, seq=%zu, dif=%ld\n", pos, pos & q->buf_mask, seq, dif);
if (dif == 0) {
// Ячейка готова для записи - пытаемся захватить позицию
if (atomic_compare_exchange_weak_explicit(&q->enqueue_pos, &pos, pos + 1,
memory_order_relaxed, memory_order_relaxed)) {
// printf("[DEBUG] ENQ CAPTURED: new_pos=%zu\n", pos + 1);
break; // Успешно захватили позицию
}
} else if (dif < 0) {
// Ячейка не готова - очередь полная
// printf("[DEBUG] ENQ FULL\n");
return -1;
} else {
// Конкуренция - обновляем позицию и пробуем снова
pos = atomic_load_explicit(&q->enqueue_pos, memory_order_relaxed);
// printf("[DEBUG] ENQ RETRY: new_pos=%zu\n", pos);
}
}
// Записываем данные в ячейку
// printf("[DEBUG] ENQ WRITING: idx=%zu, data=%p\n", pos & q->buf_mask, data);
cell->data = data;
// Устанавливаем sequence number = pos + 1 (готово для чтения)
atomic_store_explicit(&cell->seq, pos + 1, memory_order_release);
return 0;
}
size_t cmpmc_size(mpmc_bounded_queue_t *q) {
if (q == NULL) return 0;
size_t head = atomic_load_explicit(&q->enqueue_pos, memory_order_relaxed);
size_t tail = atomic_load_explicit(&q->dequeue_pos, memory_order_relaxed);
// Защита от переполнения и гонок:
return (head >= tail) ? (head - tail) : 0;
}
void *cmpmc_deq(mpmc_bounded_queue_t *q) {
if (q == NULL) return NULL;
cell_t *cell;
void *data;
// Получаем текущую позицию для чтения
size_t pos = atomic_load_explicit(&q->dequeue_pos, memory_order_relaxed);
size_t buffer_size = q->buf_mask + 1;
// printf("[DEBUG] DEQ START: pos=%zu, buffer_size=%zu\n", pos, buffer_size);
for (;;) {
// Вычисляем индекс ячейки в кольцевом буфере
cell = &q->buf[pos & q->buf_mask];
// Читаем sequence number ячейки
size_t seq = atomic_load_explicit(&cell->seq, memory_order_acquire);
// Вычисляем разность: seq - (pos + 1)
// dif == 0: ячейка готова для чтения
// dif < 0: ячейка еще не готова (очередь пуста)
// dif > 0: ячейка уже была прочитана (конкуренция)
intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
// printf("[DEBUG] DEQ LOOP: pos=%zu, idx=%zu, seq=%zu, dif=%ld\n", pos, pos & q->buf_mask, seq, dif);
if (dif == 0) {
// Ячейка готова для чтения - пытаемся захватить позицию
if (atomic_compare_exchange_weak_explicit(&q->dequeue_pos, &pos, pos + 1,
memory_order_relaxed, memory_order_relaxed)) {
// printf("[DEBUG] DEQ CAPTURED: new_pos=%zu\n", pos + 1);
break; // Успешно захватили позицию
}
} else if (dif < 0) {
// Ячейка не готова - очередь пуста
// printf("[DEBUG] DEQ EMPTY\n");
return NULL;
} else {
// Конкуренция - обновляем позицию и пробуем снова
pos = atomic_load_explicit(&q->dequeue_pos, memory_order_relaxed);
// printf("[DEBUG] DEQ RETRY: new_pos=%zu\n", pos);
}
}
// Читаем данные из ячейки
data = cell->data;
// Устанавливаем sequence number = pos + buffer_size (готово для записи)
size_t new_seq = pos + buffer_size;
// printf("[DEBUG] DEQ FINISH: data=%p, setting seq=%zu\n", data, new_seq);
atomic_store_explicit(&cell->seq, new_seq, memory_order_release);
return data;
}
#ifndef CMPMC_H
#define CMPMC_H
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define CACHELINE_SIZE 64
typedef struct {
_Atomic size_t seq;
void *data;
} cell_t;
typedef char cacheline_pad_t[CACHELINE_SIZE];
typedef struct {
cacheline_pad_t pad0_;
cell_t *buf;
size_t buf_mask;
cacheline_pad_t pad1;
_Atomic size_t enqueue_pos;
cacheline_pad_t pad2;
_Atomic size_t dequeue_pos;
cacheline_pad_t pad3;
} mpmc_bounded_queue_t;
/**
* @brief Инициализация lock-free кольцевого буфера
* @param q Указатель на структуру буфера
* @param buf_size Размер буфера (должен быть степенью двойки >= 2)
* @return 0 при успехе, -1 при ошибке
*/
int cmpmc_init(mpmc_bounded_queue_t *q, size_t buf_size);
/**
* @brief Уничтожение кольцевого буфера
* @param q Указатель на структуру буфера
*/
void cmpmc_destroy(mpmc_bounded_queue_t *q);
/**
* @brief Добавление элемента в кольцевой буфер (enqueue)
* @param q Указатель на структуру буфера
* @param data Указатель на данные для добавления
* @return 0 при успехе, -1 при переполнении или невалидных параметрах
*/
int cmpmc_enq(mpmc_bounded_queue_t *q, void *const data);
/**
* @brief Извлечение элемента из кольцевого буфера (dequeue)
* @param q Указатель на структуру буфера
* @return Указатель на данные или NULL если очередь пуста
*/
void *cmpmc_deq(mpmc_bounded_queue_t *q);
/**
* @brief Получение текущего размера очереди
* @param q Указатель на структуру буфера
* @return Количество элементов в очереди
*/
size_t cmpmc_size(mpmc_bounded_queue_t *q);
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment