Skip to content

Instantly share code, notes, and snippets.

@perky
Last active November 4, 2025 02:55
Show Gist options
  • Select an option

  • Save perky/2b2eb4e68a6be407ce5e2892f17e33e7 to your computer and use it in GitHub Desktop.

Select an option

Save perky/2b2eb4e68a6be407ce5e2892f17e33e7 to your computer and use it in GitHub Desktop.
An example of multi-core by default.
//! An example of Multi-Core by Default
//! See https://www.rfleury.com/p/multi-core-by-default
//! This solves part 1 & 2 of Advent of Code 2024 day 1: https://adventofcode.com/2024/day/1
//! Code by Luke Perkin (2025)
const std = @import("std");
const Allocator = std.mem.Allocator;
const Thread = std.Thread;
/// Pointer to the calling thread's `ThreadContext`.
/// Starts out `undefined`; `parallelMain` assigns it as its first statement, and the
/// solve/sort functions read it instead of taking the context as a parameter.
threadlocal var thread_ctx: *ThreadContext = undefined;
/// Per-thread execution context.
/// One instance exists per worker thread; `parallelMain` stores a pointer to it in the
/// threadlocal `thread_ctx`. The barrier and `shared_storage` are shared by every thread.
const ThreadContext = struct {
    /// Index of this thread, in `0..thread_count`.
    thread_i: usize,
    /// Total number of worker threads.
    thread_count: usize,
    /// Allocator shared by all threads.
    allocator: Allocator,
    /// Barrier shared by all threads.
    barrier: *Barrier,
    /// Scratch bytes shared by all threads; used as the mailbox for `syncValue` / `syncPtr`.
    shared_storage: []u8,

    /// Broadcasts the value behind `inout_val` from thread `src_thread_i` to every other thread.
    ///
    /// `inout_val` must be a mutable single-item pointer (checked at compile time). On the
    /// source thread the pointee is read; on every other thread it is overwritten with the
    /// source thread's bytes.
    ///
    /// Must be called by all threads: it waits on the barrier twice, once so the value is
    /// published before anyone reads it, and once so nobody reuses `shared_storage` before
    /// everyone has finished reading.
    pub fn syncValue(self: *ThreadContext, inout_val: anytype, src_thread_i: usize) void {
        const T = @TypeOf(inout_val);
        comptime {
            const info = @typeInfo(T);
            if (info != .pointer or info.pointer.size != .one or info.pointer.is_const) {
                @compileError("syncValue expects a mutable single-item pointer, found " ++ @typeName(T));
            }
        }
        const value_bytes = std.mem.asBytes(inout_val);
        // The mailbox has a fixed size; the value must fit in it.
        std.debug.assert(value_bytes.len <= self.shared_storage.len);
        const slot = self.shared_storage[0..value_bytes.len];
        const is_source = self.thread_i == src_thread_i;

        if (is_source) @memcpy(slot, value_bytes);
        _ = self.barrier.wait();
        if (!is_source) @memcpy(value_bytes, slot);
        _ = self.barrier.wait();
    }

    /// Broadcasts a pointer from thread `src_thread_i` to every other thread and returns it.
    ///
    /// Every thread passes its own `in_ptr`; only the source thread's address is kept, so
    /// after the call all threads hold the source thread's pointer. The pointee is not copied,
    /// which means it must stay alive for as long as any thread uses the result.
    ///
    /// Must be called by all threads: it waits on the barrier twice, for the same reasons as
    /// `syncValue`.
    pub fn syncPtr(self: *ThreadContext, in_ptr: anytype, src_thread_i: usize) @TypeOf(in_ptr) {
        const T = @TypeOf(in_ptr);
        comptime {
            if (@sizeOf(T) != @sizeOf(usize)) {
                @compileError("syncPtr expects an address-sized pointer, found " ++ @typeName(T));
            }
        }
        std.debug.assert(@sizeOf(usize) <= self.shared_storage.len);
        const slot = self.shared_storage[0..@sizeOf(usize)];
        const is_source = self.thread_i == src_thread_i;

        if (is_source) {
            const address: usize = @intFromPtr(in_ptr);
            @memcpy(slot, std.mem.asBytes(&address));
        }
        _ = self.barrier.wait();
        // The source thread keeps its own pointer; everyone else rebuilds it from the mailbox.
        const result: T = if (is_source)
            in_ptr
        else
            @as(T, @ptrFromInt(std.mem.bytesToValue(usize, slot)));
        _ = self.barrier.wait();
        return result;
    }
};
/// Program entry point.
/// Creates the process-wide allocator, reads the command line arguments, starts one worker
/// thread per CPU core (capped at 1024) running `parallelMain`, and joins them all.
pub fn main() !void {
    // One allocator for the whole process; every worker thread shares it.
    var gpa: std.heap.GeneralPurposeAllocator(.{}) = .init;
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    const args = try std.process.argsAlloc(allocator);
    defer std.process.argsFree(allocator, args);

    // One worker per core, bounded so the per-thread arrays can live on the stack.
    const max_threads = 1024;
    const thread_count = @min(try std.Thread.getCpuCount(), max_threads);
    std.debug.assert(thread_count >= 1);

    // State shared by every worker: the barrier and the sync mailbox.
    var shared_storage: [512]u8 = undefined;
    var shared_barrier = Barrier.init(thread_count);
    var thread_ctxes: [max_threads]ThreadContext = undefined;
    var threads: [max_threads]std.Thread = undefined;

    const workers = threads[0..thread_count];
    for (workers, thread_ctxes[0..thread_count], 0..) |*worker, *worker_ctx, thread_i| {
        worker_ctx.* = .{
            .thread_i = thread_i,
            .thread_count = thread_count,
            .allocator = allocator,
            .barrier = &shared_barrier,
            .shared_storage = &shared_storage,
        };
        worker.* = try std.Thread.spawn(.{}, parallelMain, .{ worker_ctx, args });
    }

    // Block until every worker has finished.
    for (workers) |worker| worker.join();
}
/// Body of every worker thread.
/// Publishes `ctx` through the threadlocal `thread_ctx`, picks the input file path from
/// `args` on thread 0 only, and runs the puzzle solver. Solver errors are logged, not returned.
fn parallelMain(ctx: *ThreadContext, args: [][:0]u8) void {
    thread_ctx = ctx;

    // Only thread 0 looks at the command line; every other thread passes null.
    var maybe_input_file_path: ?[:0]const u8 = null;
    if (ctx.thread_i == 0) {
        if (args.len < 2) @panic("Must provide an input file to read");
        maybe_input_file_path = args[1];
    }

    solveAocPuzzles(maybe_input_file_path) catch |err| {
        std.log.err("error when trying to solve aoc puzzles: {any}", .{err});
    };
}
/// Puzzle state shared by all threads.
/// A single instance is created on thread 0 and reached by the other threads through a
/// synced pointer; `parseAocPuzzleInput` fills it in before the solving starts.
const SolveContext = struct {
// First number of every input line.
left_values: std.ArrayList(u32),
// Second number of every input line.
right_values: std.ArrayList(u32),
// Largest and smallest left value seen while parsing.
max_left_val: u32 = 0,
min_left_val: u32 = std.math.maxInt(u32),
// Largest and smallest right value seen while parsing.
max_right_val: u32 = 0,
min_right_val: u32 = std.math.maxInt(u32),
// Occurrence count per right value, indexed by `value - min_right_val`.
// Allocated (zeroed) at the end of parsing and filled in by part 2.
value_counts: ?[]usize = null,
// Held while a thread adds its local counts into `value_counts`.
value_counts_mutex: std.Thread.Mutex = .{},
};
/// Solves both puzzle parts. Called on every thread.
///
/// `maybe_input_file_path` is expected to be non-null on exactly one thread; that thread
/// parses the file while the others wait at a barrier.
///
/// Set-up and parse failures are recorded in shared state and acted on by all threads
/// together after the barrier. Returning straight away from the failing thread would leave
/// every other thread blocked in the barrier forever. Thread 0 returns the recorded error;
/// the other threads return normally.
///
/// NOTE: errors returned by the solving phase itself (e.g. out of memory inside
/// `solveAocPuzzlePart1`) still leave their barriers early and can leave other threads waiting.
fn solveAocPuzzles(maybe_input_file_path: ?[:0]const u8) !void {
    const ctx = thread_ctx;

    // Every thread declares this, but only thread 0's copy is used: its address is synced to
    // all other threads below.
    const SharedState = struct {
        solve_ctx: SolveContext,
        setup_error: ?anyerror = null,
    };
    var local_state: SharedState = .{
        .solve_ctx = .{ .left_values = .empty, .right_values = .empty },
    };
    // Thread 0 owns the shared memory. The last barrier in solveAocPuzzlePart2 (or the one
    // before the early return below) guarantees nobody is still using it when this runs.
    defer if (ctx.thread_i == 0) {
        local_state.solve_ctx.left_values.deinit(ctx.allocator);
        local_state.solve_ctx.right_values.deinit(ctx.allocator);
        if (local_state.solve_ctx.value_counts) |value_counts| {
            ctx.allocator.free(value_counts);
        }
    };
    if (ctx.thread_i == 0) {
        const owned = &local_state.solve_ctx;
        const lists = [_]*std.ArrayList(u32){ &owned.left_values, &owned.right_values };
        for (lists) |list| {
            list.ensureTotalCapacityPrecise(ctx.allocator, 1024) catch |err| {
                // Record instead of returning: the other threads are about to wait for us.
                local_state.setup_error = err;
                break;
            };
        }
    }
    const state = ctx.syncPtr(&local_state, 0);

    // Parse the input file on a single thread.
    if (maybe_input_file_path) |input_file_path| {
        if (state.setup_error == null) {
            state.setup_error = load: {
                parseAocPuzzleInput(ctx.allocator, input_file_path, &state.solve_ctx) catch |err| break :load err;
                const left_len = state.solve_ctx.left_values.items.len;
                const right_len = state.solve_ctx.right_values.items.len;
                // The solvers assert on these conditions; report them as errors instead.
                if (left_len == 0) break :load error.EmptyInput;
                if (left_len != right_len) break :load error.MismatchedColumns;
                break :load null;
            };
        }
    }
    // Wait for parsing to finish, then take a private copy of the outcome.
    _ = ctx.barrier.wait();
    const setup_error = state.setup_error;
    // Thread 0 may unwind (and reuse its stack) right after this, so everyone must have read
    // the outcome first.
    _ = ctx.barrier.wait();
    if (setup_error) |err| {
        if (ctx.thread_i == 0) return err;
        return;
    }

    const solve_ctx = &state.solve_ctx;
    try solveAocPuzzlePart1(solve_ctx);
    try solveAocPuzzlePart2(solve_ctx);
}
/// Parses the puzzle input file into `solve_ctx`. Intended to run on a single thread.
///
/// Each line holds up to two space-separated decimal numbers: the first is appended to
/// `left_values`, the second to `right_values`, and the running min/max of each column is
/// updated. Afterwards `value_counts` is allocated, zeroed, with one slot per value in
/// `min_right_val..=max_right_val` (zero slots when no right value was read).
///
/// Returns file-system and read errors, `std.fmt.parseInt` errors for malformed numbers,
/// and allocation failures. The caller owns everything stored in `solve_ctx`.
fn parseAocPuzzleInput(allocator: Allocator, input_file_path: [:0]const u8, solve_ctx: *SolveContext) !void {
    std.debug.print("parsing file...\n", .{});
    const file = try std.fs.cwd().openFile(input_file_path, .{ .mode = .read_only });
    defer file.close();
    // Buffered reads keep the number of kernel calls down.
    var read_buf: [2048]u8 = undefined;
    var file_reader = file.reader(&read_buf);
    // The Reader interface must be used through a pointer into `file_reader`.
    const reader: *std.Io.Reader = &file_reader.interface;

    // NOTE(review): this loop relies on takeDelimiterExclusive consuming the '\n'. That is
    // believed to have changed in Zig releases after 0.15.1 - confirm against the targeted
    // Zig version before upgrading.
    while (reader.takeDelimiterExclusive('\n')) |line| {
        const trimmed_line = std.mem.trimEnd(u8, line, "\n\r ");
        var tokens = std.mem.tokenizeScalar(u8, trimmed_line, ' ');
        if (tokens.next()) |bytes| {
            const left_val = try std.fmt.parseInt(u32, bytes, 10);
            solve_ctx.max_left_val = @max(solve_ctx.max_left_val, left_val);
            solve_ctx.min_left_val = @min(solve_ctx.min_left_val, left_val);
            try solve_ctx.left_values.append(allocator, left_val);
        }
        if (tokens.next()) |bytes| {
            const right_val = try std.fmt.parseInt(u32, bytes, 10);
            solve_ctx.max_right_val = @max(solve_ctx.max_right_val, right_val);
            solve_ctx.min_right_val = @min(solve_ctx.min_right_val, right_val);
            try solve_ctx.right_values.append(allocator, right_val);
        }
    } else |err| switch (err) {
        error.EndOfStream => {},
        else => return err,
    }

    // With no right values, min/max still hold their sentinels (maxInt and 0), so the
    // subtraction would underflow. The range is widened to usize so `+ 1` cannot overflow u32.
    const right_val_range: usize = if (solve_ctx.right_values.items.len == 0)
        0
    else
        @as(usize, solve_ctx.max_right_val - solve_ctx.min_right_val) + 1;
    const value_counts = try allocator.alloc(usize, right_val_range);
    @memset(value_counts, 0);
    solve_ctx.value_counts = value_counts;
    std.debug.print("has parsed file...\n", .{});
}
/// Part 1: sorts both columns and prints the sum of the pairwise absolute differences.
/// Called on every thread; each thread sums its own slice of the pairs and adds the partial
/// sum into an atomic that lives on thread 0's stack.
///
/// Sort errors are returned to the caller. NOTE: a thread that returns early skips the
/// remaining barriers, which leaves the other threads waiting.
fn solveAocPuzzlePart1(solve_ctx: *SolveContext) !void {
    const ctx = thread_ctx;
    const left = solve_ctx.left_values.items;
    const right = solve_ctx.right_values.items;

    try countingSort(left);
    try countingSort(right);
    // Both sorts must be complete on all threads before anyone reads the sorted data.
    _ = ctx.barrier.wait();

    // Every thread has one of these on its stack, but all of them use thread 0's copy via the
    // synced pointer. This shares data across threads without static variables.
    // The total is 64-bit, like part 2, so large inputs cannot overflow it.
    var shared_all_differences_sum: std.atomic.Value(u64) = .init(0);
    const all_differences_sum = ctx.syncPtr(&shared_all_differences_sum, 0);

    std.debug.assert(left.len == right.len);
    const work_range = partitionValuesForThread(left.len, ctx.thread_i, ctx.thread_count);
    var differences_sum: u64 = 0;
    for (left[work_range.start..work_range.end], right[work_range.start..work_range.end]) |l_val, r_val| {
        differences_sum += @max(l_val, r_val) - @min(l_val, r_val);
    }
    _ = all_differences_sum.fetchAdd(differences_sum, .acq_rel);

    // The last thread to arrive prints the result.
    const is_output_logger = ctx.barrier.wait();
    if (is_output_logger) {
        const result = all_differences_sum.load(.acquire);
        std.debug.print("thread: {d}, puzzle output: {d}\n", .{ ctx.thread_i, result });
    }
    // The total lives on thread 0's stack, so thread 0 must not leave this function until
    // the logger has read it.
    _ = ctx.barrier.wait();
}
/// Part 2: prints the similarity score, i.e. the sum over every left value of
/// `value * (number of times it occurs in the right column)`.
/// Called on every thread. Each thread counts the right values in its slice, merges the
/// counts into `solve_ctx.value_counts` under the mutex, then scores its slice of left values.
///
/// Returns allocation errors. NOTE: a thread that returns early skips the remaining barriers,
/// which leaves the other threads waiting.
fn solveAocPuzzlePart2(solve_ctx: *SolveContext) !void {
    const ctx = thread_ctx;
    const left_values = solve_ctx.left_values.items;
    const right_values = solve_ctx.right_values.items;

    // Split the workload across threads.
    const work_range = partitionValuesForThread(right_values.len, ctx.thread_i, ctx.thread_count);
    const right_work_slice = right_values[work_range.start..work_range.end];

    // With more threads than values some slices are empty; std.mem.min/max reject an empty
    // slice, so such threads have nothing to count.
    if (right_work_slice.len > 0) {
        // Count occurrences of each value in this thread's slice.
        const min_val: u32 = std.mem.min(u32, right_work_slice);
        const max_val: u32 = std.mem.max(u32, right_work_slice);
        const val_counts = try ctx.allocator.alloc(usize, (max_val - min_val) + 1);
        defer ctx.allocator.free(val_counts);
        @memset(val_counts, 0);
        for (right_work_slice) |val| {
            val_counts[val - min_val] += 1;
        }

        // Merge the local counts into the shared table, one thread at a time.
        solve_ctx.value_counts_mutex.lock();
        defer solve_ctx.value_counts_mutex.unlock();
        const shared_counts = solve_ctx.value_counts.?;
        for (val_counts, 0..) |count, i| {
            shared_counts[(i + min_val) - solve_ctx.min_right_val] += count;
        }
    }

    // Every thread uses thread 0's atomic via the synced pointer. The barriers inside syncPtr
    // also ensure that all threads have merged their counts before anyone reads the table.
    var shared_similarity_score: std.atomic.Value(u64) = .init(0);
    const similarity_score = ctx.syncPtr(&shared_similarity_score, 0);

    const shared_counts = solve_ctx.value_counts.?;
    const left_work_slice = left_values[work_range.start..work_range.end];
    var local_similarity_score: u64 = 0;
    for (left_work_slice) |val| {
        // Values outside the right column's range never occur there.
        if (val < solve_ctx.min_right_val or val > solve_ctx.max_right_val) continue;
        const count = shared_counts[val - solve_ctx.min_right_val];
        local_similarity_score += @as(u64, count) * val;
    }
    _ = similarity_score.fetchAdd(local_similarity_score, .acq_rel);

    // The last thread to arrive prints the result.
    const is_output_logger = ctx.barrier.wait();
    if (is_output_logger) {
        const result = similarity_score.load(.acquire);
        std.debug.print("thread: {d}, puzzle output: {d}\n", .{ ctx.thread_i, result });
    }
    // Keep thread 0's stack (and the caller's shared data) alive until the logger is done.
    _ = ctx.barrier.wait();
}
/// Scratch state for `countingSort`, shared by all threads taking part in one sort.
const SortContext = struct {
    /// One partition per thread, indexed by thread index.
    all_partitions: []Partition,
    /// Scratch partition used for the merged result; left `undefined` by `init`.
    whole_partition: Partition,
    /// Smallest value of each thread's partition, indexed by thread index.
    mins: []u32,
    /// Largest value of each thread's partition, indexed by thread index.
    maxs: []u32,

    /// A partition of the value array that is input into the sort function.
    pub const Partition = struct {
        start: usize,
        end: usize,
        min_value: u32 = 0,
        max_value: u32 = std.math.maxInt(u32),
        values: []u32,
        /// Occurrence count per value, indexed by `value - min_value`.
        value_counts: []usize = undefined,
    };

    /// Allocates the per-thread arrays for `thread_count` threads.
    /// On failure nothing is leaked: allocations made before the failing one are released.
    /// The caller owns the result and must call `deinit` with the same allocator.
    pub fn init(allocator: Allocator, thread_count: usize) !SortContext {
        const all_partitions = try allocator.alloc(Partition, thread_count);
        errdefer allocator.free(all_partitions);
        const mins = try allocator.alloc(u32, thread_count);
        errdefer allocator.free(mins);
        const maxs = try allocator.alloc(u32, thread_count);
        return .{
            .all_partitions = all_partitions,
            .whole_partition = undefined,
            .mins = mins,
            .maxs = maxs,
        };
    }

    /// Frees the arrays allocated by `init`. Per-partition `value_counts` are not freed here.
    pub fn deinit(self: *SortContext, allocator: Allocator) void {
        allocator.free(self.all_partitions);
        allocator.free(self.mins);
        allocator.free(self.maxs);
    }
};
/// Sorts `values` ascending, in place, with a counting sort. Called on every thread with the
/// same slice.
///
/// Slices with fewer than `2 * thread_count` values are sorted by thread 0 alone with
/// `std.mem.sort`, and no barrier is taken on that path (nor for slices of length 0 or 1).
/// The caller must therefore wait on the barrier before other threads read the result.
///
/// Otherwise each thread counts the values in its own partition, then the last thread to
/// reach the barrier merges the counts and writes the sorted values back.
///
/// Returns allocation errors. NOTE: a thread that returns early skips the remaining barriers,
/// which leaves the other threads waiting.
fn countingSort(values: []u32) !void {
    const ctx = thread_ctx;
    // Nothing to do for empty or single-element input.
    if (values.len <= 1) return;

    // Quick path if the values input is small.
    if (values.len < ctx.thread_count * 2) {
        if (ctx.thread_i == 0) {
            std.mem.sort(u32, values, {}, comptime std.sort.asc(u32));
        }
        return;
    }

    // Thread 0 allocates the sort context; its address is synced to all threads.
    // syncPtr ends with a barrier, so no extra wait is needed before using it.
    var shared_sort_ctx: SortContext = undefined;
    if (ctx.thread_i == 0) {
        shared_sort_ctx = try SortContext.init(ctx.allocator, ctx.thread_count);
    }
    defer if (ctx.thread_i == 0) shared_sort_ctx.deinit(ctx.allocator);
    const sort_ctx = ctx.syncPtr(&shared_sort_ctx, 0);

    // Take this thread's partition of the values. On this path values.len is at least
    // 2 * thread_count, so no partition is empty.
    const range = partitionValuesForThread(values.len, ctx.thread_i, ctx.thread_count);
    const partition: *SortContext.Partition = &sort_ctx.all_partitions[ctx.thread_i];
    partition.* = .{
        .start = range.start,
        .end = range.end,
        .values = values[range.start..range.end],
    };

    // Counting phase: count the occurrences of each value in this thread's partition.
    partition.min_value = std.mem.min(u32, partition.values);
    partition.max_value = std.mem.max(u32, partition.values);
    sort_ctx.mins[ctx.thread_i] = partition.min_value;
    sort_ctx.maxs[ctx.thread_i] = partition.max_value;
    const value_range = (partition.max_value - partition.min_value) + 1;
    partition.value_counts = try ctx.allocator.alloc(usize, value_range);
    defer ctx.allocator.free(partition.value_counts);
    @memset(partition.value_counts, 0);
    for (partition.values) |value| {
        partition.value_counts[value - partition.min_value] += 1;
    }

    // Collecting phase: sequential, so it runs on the single thread elected by the barrier.
    // Once the barrier releases, every partition's counts and min/max are published.
    const is_collector_thread = ctx.barrier.wait();
    if (is_collector_thread) {
        // The collector computes the global range itself, so it does not depend on any
        // particular thread index existing (there may be only one thread).
        const whole_partition = &sort_ctx.whole_partition;
        whole_partition.min_value = std.mem.min(u32, sort_ctx.mins);
        whole_partition.max_value = std.mem.max(u32, sort_ctx.maxs);

        // Allocate the table that sums up all per-partition counts.
        const value_range_all = (whole_partition.max_value - whole_partition.min_value) + 1;
        whole_partition.value_counts = try ctx.allocator.alloc(usize, value_range_all);
        defer ctx.allocator.free(whole_partition.value_counts);
        @memset(whole_partition.value_counts, 0);
        for (sort_ctx.all_partitions) |p| {
            for (p.value_counts, 0..) |count, offset| {
                const actual_value = p.min_value + offset;
                whole_partition.value_counts[actual_value - whole_partition.min_value] += count;
            }
        }

        // Overwrite the input with each value repeated as often as it was counted.
        var value_i: usize = 0;
        for (whole_partition.value_counts, 0..) |count, offset| {
            const actual_value: u32 = @intCast(whole_partition.min_value + offset);
            @memset(values[value_i..][0..count], actual_value);
            value_i += count;
        }
    }
    // Final barrier before the deferred frees run: the collector reads every partition's
    // counts, so nobody may free theirs until it is done.
    _ = ctx.barrier.wait();
}
/// Computes the half-open range `[start, end)` of a `len`-element array that thread
/// `thread_i` out of `thread_count` should work on.
/// The ranges are contiguous and as even as possible: when `len` is not a multiple of
/// `thread_count`, the first `len % thread_count` threads each take one extra element.
fn partitionValuesForThread(len: usize, thread_i: usize, thread_count: usize) struct { start: usize, end: usize } {
    const base_chunk = len / thread_count;
    const remainder = len % thread_count;
    // Threads before this one that took an extra element shift our start to the right.
    const extras_before = @min(thread_i, remainder);
    const chunk_len = base_chunk + @intFromBool(thread_i < remainder);
    const first = base_chunk * thread_i + extras_before;
    return .{ .start = first, .end = first + chunk_len };
}
/// A synchronization data structure that makes all threads wait until every thread has
/// reached a certain point in the code path. All logic before a barrier has therefore run
/// on every thread before any logic after the barrier runs.
/// The barrier resets itself automatically, so the same object can be reused.
/// `wait()` returns true for the last thread to reach the barrier, which is useful when some
/// sequential logic has to run on a single thread.
const Barrier = struct {
    /// Number of threads that still have to arrive in the current round.
    counter: std.atomic.Value(usize),
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},
    /// Number of threads that take part in every round.
    num_threads: usize,
    /// Round number, bumped each time the barrier releases. Waiters watch it so that a
    /// spurious wake-up of the condition variable cannot let them through early.
    /// Only accessed while `mutex` is held.
    generation: usize = 0,

    /// Creates a barrier for `num_threads` threads.
    pub fn init(num_threads: usize) Barrier {
        return .{
            .counter = .init(num_threads),
            .num_threads = num_threads,
        };
    }

    /// Blocks until `num_threads` threads have called `wait`, then releases them all.
    /// Returns true on exactly one thread per round (the last one to arrive) and false on
    /// all the others.
    pub fn wait(self: *Barrier) bool {
        self.mutex.lock();
        defer self.mutex.unlock();

        if (self.counter.fetchSub(1, .acq_rel) == 1) {
            // Last arrival: start the next round and wake everyone from this one.
            self.generation +%= 1;
            self.reset();
            self.condition.broadcast();
            return true;
        }

        // Condition.wait may return spuriously, so keep waiting until the round has
        // actually been released.
        const arrival_generation = self.generation;
        while (self.generation == arrival_generation) {
            self.condition.wait(&self.mutex);
        }
        return false;
    }

    /// Resets the counter back to the total number of threads.
    /// `wait` calls this automatically once all threads have arrived.
    pub fn reset(self: *Barrier) void {
        self.counter.store(self.num_threads, .release);
    }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment