//! An example of Multi-Core by Default
//! See https://www.rfleury.com/p/multi-core-by-default
//! This solves part 1 & 2 of Advent of Code 2024 day 1: https://adventofcode.com/2024/day/1
//! Code by Luke Perkin (2025)
const std = @import("std");
const Allocator = std.mem.Allocator;
const Thread = std.Thread;

threadlocal var thread_ctx: *ThreadContext = undefined;

/// This will be stored in a threadlocal variable
/// and is different for each thread.
const ThreadContext = struct {
    thread_i: usize,
    thread_count: usize,
    allocator: Allocator,
    barrier: *Barrier,
    shared_storage: []u8,

    /// Syncs a value from src_thread_i to all other threads.
    pub fn syncValue(self: *ThreadContext, inout_val: anytype, src_thread_i: usize) void {
        const T = @TypeOf(inout_val);
        const pointer = @typeInfo(T).pointer;
        std.debug.assert(pointer.size == .one);
        const size_t = @sizeOf(pointer.child);
        if (self.thread_i == src_thread_i) {
            const as_bytes = std.mem.asBytes(inout_val);
            @memcpy(self.shared_storage[0..size_t], as_bytes);
        }
        _ = self.barrier.wait();
        if (self.thread_i != src_thread_i) {
            const as_bytes = std.mem.asBytes(inout_val);
            @memcpy(as_bytes, self.shared_storage[0..size_t]);
        }
        _ = self.barrier.wait();
    }

    /// Syncs a pointer from src_thread_i to all other threads.
    /// Returns the synced pointer.
    pub fn syncPtr(self: *ThreadContext, in_ptr: anytype, src_thread_i: usize) @TypeOf(in_ptr) {
        const T = @TypeOf(in_ptr);
        const size_t = @sizeOf(T);
        var result_ptr: T = undefined;
        if (self.thread_i == src_thread_i) {
            const ptr_val = @intFromPtr(in_ptr);
            const as_bytes = std.mem.asBytes(&ptr_val);
            @memcpy(self.shared_storage[0..size_t], as_bytes);
            result_ptr = in_ptr;
        }
        _ = self.barrier.wait();
        if (self.thread_i != src_thread_i) {
            var ptr_val: usize = undefined;
            const as_bytes = std.mem.asBytes(&ptr_val);
            @memcpy(as_bytes, self.shared_storage[0..size_t]);
            const as_val = std.mem.bytesToValue(usize, as_bytes);
            result_ptr = @ptrFromInt(as_val);
        }
        _ = self.barrier.wait();
        return result_ptr;
    }
};
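
// Usage sketch (illustrative, not part of the original solving code): broadcast
// a value that only thread 0 computed, so every thread can use it afterwards.
//
//     var total: usize = if (thread_ctx.thread_i == 0) expensiveCount() else undefined;
//     thread_ctx.syncValue(&total, 0);
//     // From here on, `total` holds the same value on every thread.
//
// `expensiveCount` is a hypothetical helper. syncPtr works the same way, except
// it shares a pointer into thread 0's memory instead of copying the bytes.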

/// The program's entry point.
pub fn main() !void {
    // Set up a global memory allocator.
    // This will be used by all threads.
    var gpa: std.heap.GeneralPurposeAllocator(.{}) = .init;
    const allocator = gpa.allocator();
    defer _ = gpa.deinit();
    // Get command line arguments.
    const args = try std.process.argsAlloc(allocator);
    defer std.process.argsFree(allocator, args);
    // Spin up threads equal to the host's cpu core count.
    const MAX_THREADS = 1024;
    const thread_count = @min(try std.Thread.getCpuCount(), MAX_THREADS);
    std.debug.assert(thread_count >= 1);
    var threads: [MAX_THREADS]std.Thread = undefined;
    var thread_ctxes: [MAX_THREADS]ThreadContext = undefined;
    var shared_barrier = Barrier.init(thread_count);
    var shared_storage: [512]u8 = undefined;
    for (0..thread_count) |thread_i| {
        thread_ctxes[thread_i] = ThreadContext{
            .thread_i = thread_i,
            .thread_count = thread_count,
            .barrier = &shared_barrier,
            .shared_storage = &shared_storage,
            .allocator = allocator,
        };
        threads[thread_i] = try std.Thread.spawn(.{}, parallelMain, .{ &thread_ctxes[thread_i], args });
    }
    // Wait for all threads to finish.
    for (threads[0..thread_count]) |thread| {
        thread.join();
    }
}
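
// How to run it (illustrative; the file and input names below are placeholders,
// not from the original gist). Written against Zig 0.15-era std APIs:
//
//     zig run multi_core.zig -- input.txt
//
// where input.txt is your Advent of Code 2024 day 1 puzzle input.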

/// Our threaded main function.
fn parallelMain(ctx: *ThreadContext, args: [][:0]u8) void {
    // Store context in a threadlocal var.
    thread_ctx = ctx;
    // Read the process args for an input file to read.
    // This will be null on all threads but the first one.
    const maybe_input_file_path: ?[:0]const u8 = blk: {
        if (ctx.thread_i == 0) {
            if (args.len >= 2) {
                break :blk args[1];
            } else {
                @panic("Must provide an input file to read");
            }
        } else break :blk null;
    };
    solveAocPuzzles(maybe_input_file_path) catch |err| {
        std.log.err("error when trying to solve aoc puzzles: {any}", .{err});
    };
}

const SolveContext = struct {
    left_values: std.ArrayList(u32),
    right_values: std.ArrayList(u32),
    max_left_val: u32 = 0,
    min_left_val: u32 = std.math.maxInt(u32),
    max_right_val: u32 = 0,
    min_right_val: u32 = std.math.maxInt(u32),
    value_counts: ?[]usize = null,
    value_counts_mutex: std.Thread.Mutex = .{},
};

fn solveAocPuzzles(maybe_input_file_path: ?[:0]const u8) !void {
    const ctx = thread_ctx;
    // Create a shared bit of memory called the solve context.
    // We allocate and initialise this on thread 0, then sync a pointer to that memory
    // to all other threads.
    var shared_solve_ctx: SolveContext = undefined;
    if (ctx.thread_i == 0) {
        shared_solve_ctx = SolveContext{
            .left_values = try .initCapacity(ctx.allocator, 1024),
            .right_values = try .initCapacity(ctx.allocator, 1024),
        };
    }
    defer if (ctx.thread_i == 0) {
        shared_solve_ctx.left_values.deinit(ctx.allocator);
        shared_solve_ctx.right_values.deinit(ctx.allocator);
        if (shared_solve_ctx.value_counts) |value_counts| {
            ctx.allocator.free(value_counts);
        }
    };
    const solve_ctx = ctx.syncPtr(&shared_solve_ctx, 0);
    // Parse the input file on a single thread.
    if (maybe_input_file_path) |input_file_path| {
        try parseAocPuzzleInput(ctx.allocator, input_file_path, solve_ctx);
    }
    _ = ctx.barrier.wait();
    try solveAocPuzzlePart1(solve_ctx);
    try solveAocPuzzlePart2(solve_ctx);
}

fn parseAocPuzzleInput(allocator: Allocator, input_file_path: [:0]const u8, solve_ctx: *SolveContext) !void {
    std.debug.print("parsing file...\n", .{});
    // Zig's new reader/writer interface (as of 0.15) is a little clunky, but it is a work in progress, to be fair.
    const file = try std.fs.cwd().openFile(input_file_path, .{ .mode = .read_only });
    defer file.close();
    var read_buf: [2048]u8 = undefined; // The buffer is needed to do buffered reads, reducing the number of kernel calls.
    var file_reader = file.reader(&read_buf);
    const file_reader_interface: *std.Io.Reader = &file_reader.interface; // Get the Reader interface (must be a pointer due to vtable stuff).
    // Go over each line in the file and parse the left and right number.
    while (file_reader_interface.takeDelimiterExclusive('\n')) |line| {
        const trimmed_line = std.mem.trimEnd(u8, line, "\n\r ");
        var tokens = std.mem.tokenizeScalar(u8, trimmed_line, ' ');
        if (tokens.next()) |bytes| {
            const left_val = try std.fmt.parseInt(u32, bytes, 10);
            if (left_val > solve_ctx.max_left_val) solve_ctx.max_left_val = left_val;
            if (left_val < solve_ctx.min_left_val) solve_ctx.min_left_val = left_val;
            try solve_ctx.left_values.append(allocator, left_val);
        }
        if (tokens.next()) |bytes| {
            const right_val = try std.fmt.parseInt(u32, bytes, 10);
            if (right_val > solve_ctx.max_right_val) solve_ctx.max_right_val = right_val;
            if (right_val < solve_ctx.min_right_val) solve_ctx.min_right_val = right_val;
            try solve_ctx.right_values.append(allocator, right_val);
        }
    } else |err| switch (err) {
        error.EndOfStream => {},
        else => return err,
    }
    const right_val_range = (solve_ctx.max_right_val - solve_ctx.min_right_val) + 1;
    solve_ctx.value_counts = try allocator.alloc(usize, right_val_range);
    @memset(solve_ctx.value_counts.?, 0);
    std.debug.print("finished parsing file...\n", .{});
}
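
// For reference, the input this parser expects is the AoC 2024 day 1 format:
// two columns of integers separated by spaces, one pair per line. The puzzle's
// sample input starts like this:
//
//     3   4
//     4   3
//     2   5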

/// Remember this function is called on every thread, one for each cpu core.
fn solveAocPuzzlePart1(solve_ctx: *SolveContext) !void {
    const ctx = thread_ctx;
    const left_values: *std.ArrayList(u32) = &solve_ctx.left_values;
    const right_values: *std.ArrayList(u32) = &solve_ctx.right_values;
    try countingSort(left_values.items);
    try countingSort(right_values.items);
    // Must do a barrier wait here, to make sure we've sorted everything.
    _ = ctx.barrier.wait();
    // Now that both left and right are sorted, we can get the difference between each pair very easily.
    // This runs in parallel; we use an atomic variable to add together the results from each thread.
    // The atomic value is stored on this function's stack, which exists once per thread because every
    // thread calls this function. So we sync a pointer to the shared_all_differences_sum that lives on
    // thread 0 to all other threads. This is how we can share data across threads without having to
    // prepare static variables.
    var shared_all_differences_sum: std.atomic.Value(u32) = .init(0);
    const all_differences_sum = ctx.syncPtr(&shared_all_differences_sum, 0);
    std.debug.assert(left_values.items.len == right_values.items.len);
    const work_range = partitionValuesForThread(left_values.items.len, ctx.thread_i, ctx.thread_count);
    var differences_sum: u32 = 0;
    for (work_range.start..work_range.end) |i| {
        const l_val = left_values.items[i];
        const r_val = right_values.items[i];
        const max_val = @max(l_val, r_val);
        const min_val = @min(l_val, r_val);
        const difference = max_val - min_val;
        differences_sum += difference;
    }
    _ = all_differences_sum.fetchAdd(differences_sum, .acq_rel);
    // Print the result.
    const is_output_logger = ctx.barrier.wait();
    if (is_output_logger) {
        const result = all_differences_sum.load(.acquire);
        std.debug.print("thread: {d}, puzzle output: {d}\n", .{ ctx.thread_i, result });
    }
}

/// Remember this function is called on every thread, one for each cpu core.
fn solveAocPuzzlePart2(solve_ctx: *SolveContext) !void {
    const ctx = thread_ctx;
    const left_values: *std.ArrayList(u32) = &solve_ctx.left_values;
    const right_values: *std.ArrayList(u32) = &solve_ctx.right_values;
    // Split the workload across threads.
    const work_range = partitionValuesForThread(right_values.items.len, ctx.thread_i, ctx.thread_count);
    const right_work_slice = right_values.items[work_range.start..work_range.end];
    // Count up occurrences of each value in right_work_slice.
    const max_val: u32 = std.mem.max(u32, right_work_slice);
    const min_val: u32 = std.mem.min(u32, right_work_slice);
    const val_range: u32 = (max_val - min_val) + 1;
    const val_counts = try ctx.allocator.alloc(usize, val_range);
    defer ctx.allocator.free(val_counts);
    @memset(val_counts, 0);
    for (right_work_slice) |val| {
        val_counts[val - min_val] += 1;
    }
    // Collect the thread-local val_counts into the shared value_counts.
    // Uses a mutex so only one thread at a time will write into the shared resource.
    {
        solve_ctx.value_counts_mutex.lock();
        defer solve_ctx.value_counts_mutex.unlock();
        for (val_counts, 0..) |count, i| {
            solve_ctx.value_counts.?[(i + min_val) - solve_ctx.min_right_val] += count;
        }
    }
    // Calculate the similarity score. Note that syncPtr contains barrier waits, so by
    // the time any thread reads value_counts below, every thread has finished its
    // mutex-guarded accumulation above.
    var shared_similarity_score: std.atomic.Value(u64) = .init(0);
    const similarity_score = ctx.syncPtr(&shared_similarity_score, 0);
    const left_work_slice = left_values.items[work_range.start..work_range.end];
    var local_similarity_score: u64 = 0;
    for (left_work_slice) |val| {
        if (val < solve_ctx.min_right_val) continue;
        if (val > solve_ctx.max_right_val) continue;
        const count_i = val - solve_ctx.min_right_val;
        const count = solve_ctx.value_counts.?[count_i];
        const score: u64 = @intCast(count * val);
        local_similarity_score += score;
    }
    _ = similarity_score.fetchAdd(local_similarity_score, .acq_rel);
    // Print the result.
    const is_output_logger = ctx.barrier.wait();
    if (is_output_logger) {
        const result = similarity_score.load(.acquire);
        std.debug.print("thread: {d}, puzzle output: {d}\n", .{ ctx.thread_i, result });
    }
    // Final barrier before the deinit defers trigger.
    _ = ctx.barrier.wait();
}

const SortContext = struct {
    all_partitions: []Partition,
    whole_partition: Partition,
    mins: []u32,
    maxs: []u32,

    /// A partition of the value array that is input into the sort function.
    pub const Partition = struct {
        start: usize,
        end: usize,
        min_value: u32 = 0,
        max_value: u32 = std.math.maxInt(u32),
        values: []u32,
        value_counts: []usize = undefined,
    };

    pub fn init(allocator: Allocator, thread_count: usize) !SortContext {
        return .{
            .all_partitions = try allocator.alloc(Partition, thread_count),
            .whole_partition = undefined,
            .mins = try allocator.alloc(u32, thread_count),
            .maxs = try allocator.alloc(u32, thread_count),
        };
    }

    pub fn deinit(self: *SortContext, allocator: Allocator) void {
        allocator.free(self.all_partitions);
        allocator.free(self.mins);
        allocator.free(self.maxs);
    }
};

fn countingSort(values: []u32) !void {
    const ctx = thread_ctx;
    std.debug.assert(values.len >= 1);
    if (values.len == 1) return;
    // Quick path if the values input is small.
    if (values.len < ctx.thread_count * 2) {
        if (ctx.thread_i == 0) {
            std.mem.sort(u32, values, {}, comptime std.sort.asc(u32));
        }
        return;
    }
    // Allocate and initialise the sort context; the context is to be accessed by all threads.
    var shared_sort_ctx: SortContext = undefined;
    if (ctx.thread_i == 0) {
        shared_sort_ctx = try SortContext.init(ctx.allocator, ctx.thread_count);
    }
    defer if (ctx.thread_i == 0) shared_sort_ctx.deinit(ctx.allocator);
    const sort_ctx = ctx.syncPtr(&shared_sort_ctx, 0);
    _ = ctx.barrier.wait();
    // Split the values array into partitions for each thread to work on.
    const partition: *SortContext.Partition = blk: {
        const _partition: *SortContext.Partition = &sort_ctx.all_partitions[ctx.thread_i];
        const range = partitionValuesForThread(values.len, ctx.thread_i, ctx.thread_count);
        _partition.* = .{
            .start = range.start,
            .end = range.end,
            .values = values[range.start..range.end],
        };
        break :blk _partition;
    };
    // Do the counting phase, counting occurrences of each possible value.
    // This workload is split across threads.
    partition.max_value = std.mem.max(u32, partition.values);
    sort_ctx.maxs[ctx.thread_i] = partition.max_value;
    partition.min_value = std.mem.min(u32, partition.values);
    sort_ctx.mins[ctx.thread_i] = partition.min_value;
    const value_range = (partition.max_value - partition.min_value) + 1;
    partition.value_counts = try ctx.allocator.alloc(usize, value_range);
    defer ctx.allocator.free(partition.value_counts);
    @memset(partition.value_counts, 0);
    for (partition.values) |value| {
        partition.value_counts[value - partition.min_value] += 1;
    }
    _ = ctx.barrier.wait();
    // Calculate the min and max value across all partitions.
    // The min is computed on the last thread so this also works when thread_count == 1.
    const whole_partition = &sort_ctx.whole_partition;
    if (ctx.thread_i == 0) whole_partition.max_value = std.mem.max(u32, sort_ctx.maxs);
    if (ctx.thread_i == ctx.thread_count - 1) whole_partition.min_value = std.mem.min(u32, sort_ctx.mins);
    // Do the collecting phase, collecting the counting result from each thread and
    // finally placing each value into its sorted position.
    // This is sequential so must be run on a single thread.
    const is_collector_thread = ctx.barrier.wait();
    if (is_collector_thread) {
        // Allocate the array used to sum all the per-thread value_counts.
        const value_range_all = (whole_partition.max_value - whole_partition.min_value) + 1;
        whole_partition.value_counts = try ctx.allocator.alloc(usize, value_range_all);
        defer ctx.allocator.free(whole_partition.value_counts);
        @memset(whole_partition.value_counts, 0);
        // Sum the value_counts from every partition.
        for (sort_ctx.all_partitions) |p| {
            for (p.value_counts, 0..) |count, val| {
                const actual_value = p.min_value + val;
                whole_partition.value_counts[actual_value - whole_partition.min_value] += count;
            }
        }
        // Update the values array with the sorted values.
        var value_i: usize = 0;
        for (0..value_range_all) |i| {
            const actual_value = whole_partition.min_value + i;
            for (0..whole_partition.value_counts[i]) |_| {
                values[value_i] = @intCast(actual_value);
                value_i += 1;
            }
        }
    }
    // Final barrier before all deinit defers run.
    _ = ctx.barrier.wait();
}
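
// Counting sort in miniature, as a worked example for the function above:
// for the values [5, 3, 5] the counts over the range [3, 5] are
// {3: 1, 4: 0, 5: 2}, which unpack back into the array as [3, 5, 5].
// Here each thread counts its own partition, and the collector thread merges
// the per-partition counts before writing the sorted values back out.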

/// Splits len up into even sized chunks (as even as possible) for each thread.
/// Returns a {start, end} range.
fn partitionValuesForThread(len: usize, thread_i: usize, thread_count: usize) struct { start: usize, end: usize } {
    const values_per_thread = (len / thread_count);
    const leftover_values = (len % thread_count);
    const thread_has_leftover = (thread_i < leftover_values);
    const leftovers_before_this_thread = if (thread_has_leftover) thread_i else leftover_values;
    const start = (values_per_thread * thread_i + leftovers_before_this_thread);
    const end = (start + values_per_thread);
    if (thread_has_leftover) {
        return .{ .start = start, .end = end + 1 };
    } else {
        return .{ .start = start, .end = end };
    }
}
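
// A small sanity check for the partition maths (added for illustration, not
// part of the solving pipeline): every index in 0..len should be covered
// exactly once, in order, across all threads.
test "partitionValuesForThread covers the whole range exactly once" {
    const len: usize = 10;
    const thread_count: usize = 4;
    var expected_start: usize = 0;
    for (0..thread_count) |thread_i| {
        const range = partitionValuesForThread(len, thread_i, thread_count);
        // Each chunk must begin exactly where the previous one ended.
        try std.testing.expectEqual(expected_start, range.start);
        expected_start = range.end;
    }
    // The final chunk must end at len, so nothing is dropped.
    try std.testing.expectEqual(len, expected_start);
}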

/// A synchronization data structure that makes all threads wait until every thread reaches
/// a certain point in the codepath. That means you can be sure all logic before a barrier
/// has run before you run logic after a barrier.
/// The barrier automatically resets itself so you can reuse the same barrier object.
/// wait() also returns true for the last thread to reach the barrier, which is useful
/// when you need to do some sequential logic on a single thread.
const Barrier = struct {
    counter: usize,
    generation: usize = 0,
    mutex: std.Thread.Mutex = .{},
    condition: std.Thread.Condition = .{},
    num_threads: usize,

    pub fn init(num_threads: usize) Barrier {
        return .{
            .counter = num_threads,
            .num_threads = num_threads,
        };
    }

    /// Waits until all other threads also call wait, then releases all threads.
    /// Elects one of the threads as a "leader" and returns true on that thread.
    pub fn wait(self: *Barrier) bool {
        self.mutex.lock();
        defer self.mutex.unlock();
        // The counter is only ever touched while the mutex is held, so it
        // does not need to be atomic.
        const gen = self.generation;
        self.counter -= 1;
        if (self.counter == 0) {
            self.reset();
            self.condition.broadcast();
            return true;
        }
        // Condition.wait can wake spuriously, so loop until the barrier's
        // generation has actually advanced.
        while (gen == self.generation) {
            self.condition.wait(&self.mutex);
        }
        return false;
    }

    /// Resets the counter back to the total number of threads and advances the
    /// generation so waiting threads know this round is complete.
    /// wait() automatically calls this once all threads have called wait.
    pub fn reset(self: *Barrier) void {
        self.counter = self.num_threads;
        self.generation +%= 1;
    }
};
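
// A quick concurrency check for the barrier (added for illustration; a test
// like this can miss timing-sensitive bugs, so treat it as a sketch rather
// than proof of correctness): across several rounds, exactly one thread per
// round should be elected leader.
test "Barrier elects exactly one leader per round" {
    const thread_count = 4;
    const rounds: usize = 8;
    var barrier = Barrier.init(thread_count);
    var leader_count = std.atomic.Value(usize).init(0);
    const Worker = struct {
        fn run(b: *Barrier, leaders: *std.atomic.Value(usize), n_rounds: usize) void {
            for (0..n_rounds) |_| {
                // Only the elected leader increments the counter each round.
                if (b.wait()) _ = leaders.fetchAdd(1, .monotonic);
            }
        }
    };
    var threads: [thread_count]std.Thread = undefined;
    for (&threads) |*t| {
        t.* = try std.Thread.spawn(.{}, Worker.run, .{ &barrier, &leader_count, rounds });
    }
    for (threads) |t| t.join();
    // One leader per round, no more and no fewer.
    try std.testing.expectEqual(rounds, leader_count.load(.monotonic));
}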