Created
December 3, 2025 06:33
-
-
Save croisillon/ee16f1d01fdc98be5302ad9ae20ed5ef to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #include "cmpmc.h" | |
| int cmpmc_init(mpmc_bounded_queue_t *q, size_t buf_size) { | |
| if (q == NULL || buf_size < 2 || (buf_size & (buf_size - 1)) != 0) return -1; | |
| // Выделяем память для ячеек буфера | |
| cell_t *alloc_buf = calloc(buf_size, sizeof(cell_t)); | |
| if (alloc_buf == NULL) | |
| return -1; | |
| // Устанавливаем указатель на буфер и маску для быстрого вычисления индекса | |
| q->buf = alloc_buf; | |
| q->buf_mask = (buf_size - 1); // Маска для операции pos & mask | |
| // Инициализируем sequence numbers для каждой ячейки | |
| // seq[i] = i означает, что ячейка готова для записи | |
| for (size_t i = 0; i != buf_size; i += 1) | |
| atomic_store_explicit(&q->buf[i].seq, i, memory_order_relaxed); | |
| // Инициализируем позиции enqueue и dequeue | |
| atomic_store_explicit(&q->enqueue_pos, 0, memory_order_relaxed); | |
| atomic_store_explicit(&q->dequeue_pos, 0, memory_order_relaxed); | |
| return 0; | |
| } | |
| void cmpmc_destroy(mpmc_bounded_queue_t *q) { | |
| if (q != NULL && q->buf != NULL) { | |
| free(q->buf); | |
| q->buf = NULL; | |
| } | |
| } | |
| int cmpmc_enq(mpmc_bounded_queue_t *q, void *const data) { | |
| if (q == NULL) { | |
| return -1; | |
| } | |
| cell_t *cell; | |
| // Получаем текущую позицию для записи | |
| size_t pos = atomic_load_explicit(&q->enqueue_pos, memory_order_relaxed); | |
| // printf("[DEBUG] ENQ START: pos=%zu, data=%p\n", pos, data); | |
| for (;;) { | |
| // Вычисляем индекс ячейки в кольцевом буфере | |
| cell = &q->buf[pos & q->buf_mask]; | |
| // Читаем sequence number ячейки | |
| size_t seq = atomic_load_explicit(&cell->seq, memory_order_acquire); | |
| // Вычисляем разность: seq - pos | |
| // dif == 0: ячейка готова для записи | |
| // dif < 0: ячейка еще не готова (очередь полная) | |
| // dif > 0: ячейка уже была записана (конкуренция) | |
| intptr_t dif = (intptr_t)seq - (intptr_t)pos; | |
| // printf("[DEBUG] ENQ LOOP: pos=%zu, idx=%zu, seq=%zu, dif=%ld\n", pos, pos & q->buf_mask, seq, dif); | |
| if (dif == 0) { | |
| // Ячейка готова для записи - пытаемся захватить позицию | |
| if (atomic_compare_exchange_weak_explicit(&q->enqueue_pos, &pos, pos + 1, | |
| memory_order_relaxed, memory_order_relaxed)) { | |
| // printf("[DEBUG] ENQ CAPTURED: new_pos=%zu\n", pos + 1); | |
| break; // Успешно захватили позицию | |
| } | |
| } else if (dif < 0) { | |
| // Ячейка не готова - очередь полная | |
| // printf("[DEBUG] ENQ FULL\n"); | |
| return -1; | |
| } else { | |
| // Конкуренция - обновляем позицию и пробуем снова | |
| pos = atomic_load_explicit(&q->enqueue_pos, memory_order_relaxed); | |
| // printf("[DEBUG] ENQ RETRY: new_pos=%zu\n", pos); | |
| } | |
| } | |
| // Записываем данные в ячейку | |
| // printf("[DEBUG] ENQ WRITING: idx=%zu, data=%p\n", pos & q->buf_mask, data); | |
| cell->data = data; | |
| // Устанавливаем sequence number = pos + 1 (готово для чтения) | |
| atomic_store_explicit(&cell->seq, pos + 1, memory_order_release); | |
| return 0; | |
| } | |
| size_t cmpmc_size(mpmc_bounded_queue_t *q) { | |
| if (q == NULL) return 0; | |
| size_t head = atomic_load_explicit(&q->enqueue_pos, memory_order_relaxed); | |
| size_t tail = atomic_load_explicit(&q->dequeue_pos, memory_order_relaxed); | |
| // Защита от переполнения и гонок: | |
| return (head >= tail) ? (head - tail) : 0; | |
| } | |
| void *cmpmc_deq(mpmc_bounded_queue_t *q) { | |
| if (q == NULL) return NULL; | |
| cell_t *cell; | |
| void *data; | |
| // Получаем текущую позицию для чтения | |
| size_t pos = atomic_load_explicit(&q->dequeue_pos, memory_order_relaxed); | |
| size_t buffer_size = q->buf_mask + 1; | |
| // printf("[DEBUG] DEQ START: pos=%zu, buffer_size=%zu\n", pos, buffer_size); | |
| for (;;) { | |
| // Вычисляем индекс ячейки в кольцевом буфере | |
| cell = &q->buf[pos & q->buf_mask]; | |
| // Читаем sequence number ячейки | |
| size_t seq = atomic_load_explicit(&cell->seq, memory_order_acquire); | |
| // Вычисляем разность: seq - (pos + 1) | |
| // dif == 0: ячейка готова для чтения | |
| // dif < 0: ячейка еще не готова (очередь пуста) | |
| // dif > 0: ячейка уже была прочитана (конкуренция) | |
| intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1); | |
| // printf("[DEBUG] DEQ LOOP: pos=%zu, idx=%zu, seq=%zu, dif=%ld\n", pos, pos & q->buf_mask, seq, dif); | |
| if (dif == 0) { | |
| // Ячейка готова для чтения - пытаемся захватить позицию | |
| if (atomic_compare_exchange_weak_explicit(&q->dequeue_pos, &pos, pos + 1, | |
| memory_order_relaxed, memory_order_relaxed)) { | |
| // printf("[DEBUG] DEQ CAPTURED: new_pos=%zu\n", pos + 1); | |
| break; // Успешно захватили позицию | |
| } | |
| } else if (dif < 0) { | |
| // Ячейка не готова - очередь пуста | |
| // printf("[DEBUG] DEQ EMPTY\n"); | |
| return NULL; | |
| } else { | |
| // Конкуренция - обновляем позицию и пробуем снова | |
| pos = atomic_load_explicit(&q->dequeue_pos, memory_order_relaxed); | |
| // printf("[DEBUG] DEQ RETRY: new_pos=%zu\n", pos); | |
| } | |
| } | |
| // Читаем данные из ячейки | |
| data = cell->data; | |
| // Устанавливаем sequence number = pos + buffer_size (готово для записи) | |
| size_t new_seq = pos + buffer_size; | |
| // printf("[DEBUG] DEQ FINISH: data=%p, setting seq=%zu\n", data, new_seq); | |
| atomic_store_explicit(&cell->seq, new_seq, memory_order_release); | |
| return data; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #ifndef CMPMC_H | |
| #define CMPMC_H | |
| #include <assert.h> | |
| #include <stdatomic.h> | |
| #include <stdbool.h> | |
| #include <stdint.h> | |
| #include <stdio.h> | |
| #include <stdlib.h> | |
| #include <string.h> | |
| #define CACHELINE_SIZE 64 | |
| typedef struct { | |
| _Atomic size_t seq; | |
| void *data; | |
| } cell_t; | |
| typedef char cacheline_pad_t[CACHELINE_SIZE]; | |
| typedef struct { | |
| cacheline_pad_t pad0_; | |
| cell_t *buf; | |
| size_t buf_mask; | |
| cacheline_pad_t pad1; | |
| _Atomic size_t enqueue_pos; | |
| cacheline_pad_t pad2; | |
| _Atomic size_t dequeue_pos; | |
| cacheline_pad_t pad3; | |
| } mpmc_bounded_queue_t; | |
| /** | |
| * @brief Инициализация lock-free кольцевого буфера | |
| * @param q Указатель на структуру буфера | |
| * @param buf_size Размер буфера (должен быть степенью двойки >= 2) | |
| * @return 0 при успехе, -1 при ошибке | |
| */ | |
| int cmpmc_init(mpmc_bounded_queue_t *q, size_t buf_size); | |
| /** | |
| * @brief Уничтожение кольцевого буфера | |
| * @param q Указатель на структуру буфера | |
| */ | |
| void cmpmc_destroy(mpmc_bounded_queue_t *q); | |
| /** | |
| * @brief Добавление элемента в кольцевой буфер (enqueue) | |
| * @param q Указатель на структуру буфера | |
| * @param data Указатель на данные для добавления | |
| * @return 0 при успехе, -1 при переполнении или невалидных параметрах | |
| */ | |
| int cmpmc_enq(mpmc_bounded_queue_t *q, void *const data); | |
| /** | |
| * @brief Извлечение элемента из кольцевого буфера (dequeue) | |
| * @param q Указатель на структуру буфера | |
| * @return Указатель на данные или NULL если очередь пуста | |
| */ | |
| void *cmpmc_deq(mpmc_bounded_queue_t *q); | |
| /** | |
| * @brief Получение текущего размера очереди | |
| * @param q Указатель на структуру буфера | |
| * @return Количество элементов в очереди | |
| */ | |
| size_t cmpmc_size(mpmc_bounded_queue_t *q); | |
| #endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment