Created
December 3, 2025 07:16
-
-
Save byroot/2f80381eaac17efe79c5e0274753545d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require 'bundler/inline' | |
| gemfile do | |
| gem "benchmark-ips" | |
| gem "connection_pool" | |
| end | |
| class ActiveRecordQueue | |
| def initialize(lock = Monitor.new) | |
| @lock = lock | |
| @cond = @lock.new_cond | |
| @num_waiting = 0 | |
| @queue = [] | |
| end | |
| # Test if any threads are currently waiting on the queue. | |
| def any_waiting? | |
| synchronize do | |
| @num_waiting > 0 | |
| end | |
| end | |
| # Returns the number of threads currently waiting on this | |
| # queue. | |
| def num_waiting | |
| synchronize do | |
| @num_waiting | |
| end | |
| end | |
| # Add +element+ to the queue. Never blocks. | |
| def add(element) | |
| synchronize do | |
| @queue.push element | |
| @cond.signal | |
| end | |
| end | |
| # Add +element+ to the back of the queue. Never blocks. | |
| def add_back(element) | |
| synchronize do | |
| @queue.unshift element | |
| @cond.signal | |
| end | |
| end | |
| # If +element+ is in the queue, remove and return it, or +nil+. | |
| def delete(element) | |
| synchronize do | |
| @queue.delete(element) | |
| end | |
| end | |
| # Remove all elements from the queue. | |
| def clear | |
| synchronize do | |
| @queue.clear | |
| end | |
| end | |
| # Number of elements in the queue. | |
| def size | |
| synchronize do | |
| @queue.size | |
| end | |
| end | |
| # Remove the head of the queue. | |
| # | |
| # If +timeout+ is not given, remove and return the head of the | |
| # queue if the number of available elements is strictly | |
| # greater than the number of threads currently waiting (that | |
| # is, don't jump ahead in line). Otherwise, return +nil+. | |
| # | |
| # If +timeout+ is given, block if there is no element | |
| # available, waiting up to +timeout+ seconds for an element to | |
| # become available. | |
| # | |
| # Raises: | |
| # - ActiveRecord::ConnectionTimeoutError if +timeout+ is given and no element | |
| # becomes available within +timeout+ seconds, | |
| def poll(timeout = nil) | |
| synchronize { internal_poll(timeout) } | |
| end | |
| private | |
| def internal_poll(timeout) | |
| no_wait_poll || (timeout && wait_poll(timeout)) | |
| end | |
| def synchronize(&block) | |
| @lock.synchronize(&block) | |
| end | |
| # Test if the queue currently contains any elements. | |
| def any? | |
| [email protected]? | |
| end | |
| # A thread can remove an element from the queue without | |
| # waiting if and only if the number of currently available | |
| # connections is strictly greater than the number of waiting | |
| # threads. | |
| def can_remove_no_wait? | |
| @queue.size > @num_waiting | |
| end | |
| # Removes and returns the head of the queue if possible, or +nil+. | |
| def remove | |
| @queue.pop | |
| end | |
| # Remove and return the head of the queue if the number of | |
| # available elements is strictly greater than the number of | |
| # threads currently waiting. Otherwise, return +nil+. | |
| def no_wait_poll | |
| remove if can_remove_no_wait? | |
| end | |
| # Waits on the queue up to +timeout+ seconds, then removes and | |
| # returns the head of the queue. | |
| def wait_poll(timeout) | |
| @num_waiting += 1 | |
| t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) | |
| elapsed = 0 | |
| loop do | |
| @cond.wait(timeout - elapsed) | |
| return remove if any? | |
| elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 | |
| if elapsed >= timeout | |
| msg = "could not obtain a connection from the pool within %0.3f seconds (waited %0.3f seconds); all pooled connections were in use" % | |
| [timeout, elapsed] | |
| raise ConnectionTimeoutError, msg | |
| end | |
| end | |
| ensure | |
| @num_waiting -= 1 | |
| end | |
| end | |
| ar_queue = ActiveRecordQueue.new | |
| ar_queue.add(1) | |
| ar_queue.add(2) | |
| queue = Queue.new([1, 2]) | |
| gem_pool = ConnectionPool::TimedStack.new(5) { 1 } | |
| Benchmark.ips do |x| | |
| x.report("core-queue") { queue << queue.pop } | |
| x.report("ar-queue") { ar_queue.add(ar_queue.poll) } | |
| x.report("cp gem") { gem_pool.push(gem_pool.pop) } | |
| x.compare!(order: :baseline) | |
| end | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ruby 3.4.6 (2025-09-16 revision dbd83256b1) +YJIT +PRISM [arm64-darwin24] | |
| Warming up -------------------------------------- | |
| core-queue 1.357M i/100ms | |
| ar-queue 559.037k i/100ms | |
| cp gem 278.748k i/100ms | |
| Calculating ------------------------------------- | |
| core-queue 16.361M (± 0.3%) i/s (61.12 ns/i) - 82.796M in 5.060491s | |
| ar-queue 6.267M (± 1.7%) i/s (159.57 ns/i) - 31.865M in 5.086122s | |
| cp gem 2.952M (± 1.5%) i/s (338.80 ns/i) - 14.774M in 5.006458s | |
| Comparison: | |
| core-queue: 16361424.8 i/s | |
| ar-queue: 6266892.1 i/s - 2.61x slower | |
| cp gem: 2951618.2 i/s - 5.54x slower |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment