Created
November 24, 2025 21:04
-
-
Save ruurdadema/9dad00b303cf8225f9f41077c49cfa55 to your computer and use it in GitHub Desktop.
Rcu (experimental)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Owllab License Agreement | |
| * | |
| * This software is provided by Owllab and may not be used, copied, modified, | |
| * merged, published, distributed, sublicensed, or sold without a valid and | |
| * explicit agreement with Owllab. | |
| * | |
| * Copyright (c) 2025 Owllab. All rights reserved. | |
| */ | |
| #pragma once | |
| namespace rav { | |
| /** | |
| * Holds the number of instances created and destroyed. Useful for tracking object creation and destruction in tests. | |
| */ | |
| class ObjectCounter { | |
| public: | |
| size_t instances_created = 0; | |
| size_t instances_alive = 0; | |
| }; | |
| /** | |
| * Little helper class which keeps track of how many instances of this class have been created and how many are still | |
| * alive. Useful to track object creation and destruction in tests. | |
| */ | |
| class CountedObject { | |
| public: | |
| CountedObject() = delete; | |
| explicit CountedObject(ObjectCounter& counter) : counter_(counter), index_(counter.instances_created++) { | |
| ++counter_.instances_alive; | |
| } | |
| ~CountedObject() { | |
| --counter_.instances_alive; | |
| } | |
| CountedObject(const CountedObject&) = delete; | |
| CountedObject& operator=(const CountedObject&) = delete; | |
| CountedObject(CountedObject&&) = delete; | |
| CountedObject& operator=(CountedObject&&) = delete; | |
| /** | |
| * | |
| * @return The index of the object, which is based on given counter. | |
| */ | |
| [[nodiscard]] size_t index() const { | |
| return index_; | |
| } | |
| private: | |
| ObjectCounter& counter_; | |
| size_t index_ {}; | |
| }; | |
| } // namespace rav |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Owllab License Agreement | |
| * | |
| * This software is provided by Owllab and may not be used, copied, modified, | |
| * merged, published, distributed, sublicensed, or sold without a valid and | |
| * explicit agreement with Owllab. | |
| * | |
| * Copyright (c) 2025 Owllab. All rights reserved. | |
| */ | |
| #pragma once | |
| #include "ravennakit/core/assert.hpp" | |
| #include <mutex> | |
| #include <atomic> | |
| #include <memory> | |
| #include <thread> | |
| namespace rav { | |
| /** | |
| * This class behaves like a Read-Copy-Update (RCU) synchronization mechanism and allows to share objects among multiple | |
| * readers which can read the most recent value in a wait-free manner (which also implies lock-free). | |
| * | |
| * The writer side is protected by a mutex and can update the value in a thread safe way. | |
| * It's important to reclaim memory by calling reclaim() periodically to delete outdated values. As long as there are | |
| * readers using an object, the object and newer objects won't be deleted. | |
| * | |
| * To give a realtime thread access to objects, create a reader object and use the read_lock() method to get a lock. The | |
| * lock provides access to the object through the * and -> operators, as well as the get() method. | |
| * | |
| * @tparam T The type of the object to share. | |
| */ | |
| template<class T> | |
| class Rcu { | |
| public: | |
| /** | |
| * A reader object gives a single thread access to the most recent value. Store a reader object per thread, and use | |
| * read_lock to acquire a lock which in turn provides access to the value. | |
| */ | |
| class Reader { | |
| public: | |
| /** | |
| * A lock object provides access to the value. Getting and using a lock is wait-free. | |
| * Each lock will get the same value as long as there is at least one lock alive. | |
| */ | |
| class RealtimeLock { | |
| public: | |
| /** | |
| * Constructs a lock from given reader. | |
| * All methods are real-time safe (wait-free), but not thread safe. | |
| * Real-time safe: yes, wait-free. | |
| * Thread safe: no. | |
| * @param parent_reader The reader to associate this lock with. | |
| */ | |
| explicit RealtimeLock(Reader& parent_reader) : reader_(&parent_reader) { | |
| if (reader_->num_locks_.fetch_add(1) >= 1) { | |
| // We load the existing value, which results in every lock getting the same value as long as there | |
| // is at least one lock alive. This is by design. | |
| value_ = reader_->value_; | |
| } else { | |
| // As long as we progress the epoch forward, there is no aba problem here. The only effect is that a | |
| // value might be reclaimed later. | |
| reader_->epoch_.store(reader_->owner_.current_epoch_.load()); | |
| // The value we load might belong to a newer epoch than the one we loaded, but this is no problem | |
| // because newer values than the oldest used value are never deleted. | |
| value_ = reader_->value_ = reader_->owner_.most_recent_value_.load(); | |
| } | |
| } | |
| ~RealtimeLock() { | |
| reset(); | |
| } | |
| RealtimeLock(const RealtimeLock&) = delete; | |
| RealtimeLock& operator=(const RealtimeLock&) = delete; | |
| RealtimeLock(RealtimeLock&&) = delete; | |
| RealtimeLock& operator=(RealtimeLock&&) = delete; | |
| /** | |
| * @returns True if the lock holds a value, false otherwise. | |
| */ | |
| explicit operator bool() const { | |
| return value_ != nullptr; | |
| } | |
| /** | |
| * Real-time safe: yes, wait-free. | |
| * Thread safe: no. | |
| * @return A reference to the contained object. Reference is only valid if the value is not nullptr. | |
| */ | |
| T& operator*() { | |
| RAV_ASSERT(value_ != nullptr, "Value is nullptr"); | |
| return *value_; | |
| } | |
| /** | |
| * Real-time safe: yes, wait-free. | |
| * Thread safe: no. | |
| * @return A reference to the contained object. Reference is only valid if the value is not nullptr. | |
| */ | |
| const T& operator*() const { | |
| RAV_ASSERT(value_ != nullptr, "Value is nullptr"); | |
| return *value_; | |
| } | |
| /** | |
| * Real-time safe: yes, wait-free. | |
| * Thread safe: no. | |
| * @return A pointer to the contained object, or nullptr if the value is nullptr. | |
| */ | |
| T* operator->() { | |
| RAV_ASSERT(value_ != nullptr, "Value is nullptr"); | |
| return value_; | |
| } | |
| /** | |
| * Real-time safe: yes, wait-free. | |
| * Thread safe: no. | |
| * @return A pointer to the contained object, or nullptr if the value is nullptr. | |
| */ | |
| const T* operator->() const { | |
| RAV_ASSERT(value_ != nullptr, "Value is nullptr"); | |
| return value_; | |
| } | |
| /** | |
| * Real-time safe: yes, wait-free. | |
| * Thread safe: no. | |
| * @return A pointer to the contained object, or nullptr if the value is nullptr. | |
| */ | |
| T* get() { | |
| return value_; | |
| } | |
| /** | |
| * Real-time safe: yes, wait-free. | |
| * Thread safe: no. | |
| * @return A pointer to the contained object, or nullptr if the value is nullptr. | |
| */ | |
| const T* get() const { | |
| return value_; | |
| } | |
| /** | |
| * Resets this lock, releasing the value. | |
| * Real-time safe: yes, wait-free. | |
| * Thread safe: no. | |
| */ | |
| void reset() { | |
| value_ = nullptr; | |
| if (reader_ == nullptr) { | |
| return; | |
| } | |
| reader_->num_locks_.fetch_sub(1); | |
| RAV_ASSERT_NO_THROW(reader_->num_locks_ >= 0, "Number of locks should be non-negative"); | |
| reader_ = nullptr; | |
| } | |
| private: | |
| Reader* reader_ {nullptr}; | |
| T* value_ {nullptr}; | |
| }; | |
| /** | |
| * Constructs a reader object and registers it with the owner. | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| * @param owner The owner of this reader. | |
| */ | |
| explicit Reader(Rcu& owner) : owner_(owner) { | |
| std::lock_guard lock(owner_.readers_mutex_); | |
| owner_.readers_.push_back(this); | |
| } | |
| /** | |
| * Destructor | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| */ | |
| ~Reader() { | |
| std::lock_guard lock(owner_.readers_mutex_); | |
| owner_.readers_.erase(std::remove(owner_.readers_.begin(), owner_.readers_.end(), this), owner_.readers_.end()); | |
| } | |
| /** | |
| * Creates a lock object which provides access to the value. | |
| * If there is another lock alive, the new lock will get the same value. Once all locks are destroyed, the value | |
| * will be updated. | |
| * Real-time safe: yes, wait-free. | |
| * Thread safe: no. | |
| * @return The lock object. | |
| */ | |
| RealtimeLock lock_realtime() { | |
| return RealtimeLock(*this); | |
| } | |
| private: | |
| friend class Rcu; | |
| Rcu& owner_; | |
| T* value_ {}; | |
| std::atomic<uint64_t> epoch_ {0}; | |
| std::atomic<int64_t> num_locks_ {0}; | |
| }; | |
| Rcu() = default; | |
| /** | |
| * Constructs an rcu object with a new value. | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| * @param new_value | |
| */ | |
| explicit Rcu(std::unique_ptr<T> new_value) { | |
| update(std::move(new_value)); | |
| } | |
| /** | |
| * Constructs an rcu object with a new value. | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| * @param value | |
| */ | |
| explicit Rcu(T value) { | |
| update(std::make_unique<T>(std::move(value))); | |
| } | |
| /** | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| * @return A reader object which uses this rcu object. | |
| */ | |
| Reader create_reader() { | |
| return Reader(*this); | |
| } | |
| /** | |
| * Updates the current value with a new value constructed from the given arguments. | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| * @tparam Args The types of the arguments. | |
| * @param args The arguments to construct the new value. | |
| */ | |
| template<class... Args> | |
| void update(Args&&... args) { | |
| update(std::make_unique<T>(std::forward<Args>(args)...)); | |
| } | |
| /** | |
| * Updates the current value with a new value. | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| * @param new_value New value to set. | |
| */ | |
| void update(std::unique_ptr<T> new_value) { | |
| std::lock_guard lock(values_mutex_); | |
| update_internal(std::move(new_value)); | |
| } | |
| /** | |
| * Updates the current value with a new value. Reclaims all previous values. After this function returns all | |
| * previous values have been destroyed. | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| * @tparam Args The types of the arguments. | |
| * @param args The arguments to construct the new value. | |
| */ | |
| template<class... Args> | |
| void update_reclaim_all(Args&&... args) { | |
| update_reclaim_all(std::make_unique<T>(std::forward<Args>(args)...)); | |
| } | |
| /** | |
| * Updates the current value with a new value and reclaims all previous values. After this function returns, all | |
| * previous values will have been destroyed. | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| * @param new_value The new value to set. | |
| */ | |
| void update_reclaim_all(std::unique_ptr<T> new_value) { | |
| std::lock_guard lock(values_mutex_); | |
| update_internal(std::move(new_value)); | |
| std::ignore = reclaim_internal(); | |
| const auto current_epoch = current_epoch_.load(); | |
| if (values_.size() == 1) { | |
| RAV_ASSERT(values_.front().epoch == current_epoch, "Oldest epoch should be equal to current epoch"); | |
| return; | |
| } | |
| // There are older values than the current one, so we need to reclaim them. | |
| for (uint64_t epoch = values_.front().epoch; epoch < current_epoch; epoch = values_.front().epoch) { | |
| if (values_.size() == 1) { | |
| break; // No more values to reclaim | |
| } | |
| std::ignore = reclaim_internal(); | |
| std::this_thread::yield(); | |
| } | |
| RAV_ASSERT(!values_.empty(), "The last value should have never been reclaimed"); | |
| RAV_ASSERT(values_.front().epoch >= current_epoch, "Oldest epoch should be equal or newer to current epoch"); | |
| } | |
| /** | |
| * Clears the current value. | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| */ | |
| void clear() { | |
| update({}); | |
| } | |
| /** | |
| * Reclaims all values which are not used by any reader anymore. Only older objects than the first object used by | |
| * any reader are deleted. | |
| * Real-time safe: no. | |
| * Thread safe: yes. | |
| * @return The number of values which were reclaimed (deleted). | |
| */ | |
| [[nodiscard]] size_t reclaim() { | |
| std::lock_guard lock(values_mutex_); | |
| return reclaim_internal(); | |
| } | |
| /** | |
| * @return The number of values currently stored. | |
| */ | |
| size_t get_num_values() { | |
| std::lock_guard lock(values_mutex_); | |
| return values_.size(); | |
| } | |
| private: | |
| struct EpochAndValue { | |
| uint64_t epoch; | |
| std::unique_ptr<T> value; | |
| }; | |
| // Protects the values_ vector. | |
| std::mutex values_mutex_; | |
| // Holds the current and previous values. | |
| std::vector<EpochAndValue> values_; | |
| // Protects the readers_ vector. | |
| std::mutex readers_mutex_; | |
| // Holds the readers. | |
| std::vector<Reader*> readers_; | |
| // Stores the most recent value. | |
| std::atomic<T*> most_recent_value_ {nullptr}; | |
| // Holds the current epoch. | |
| std::atomic<uint64_t> current_epoch_ {}; | |
| void update_internal(std::unique_ptr<T> new_value) { | |
| most_recent_value_.store(new_value.get()); | |
| // At this point a reader takes most_recent_value_ with current epoch, which is not a problem because newer | |
| // values than the oldest used value are never deleted. | |
| auto epoch = current_epoch_.fetch_add(1) + 1; | |
| values_.emplace_back(EpochAndValue {epoch, std::move(new_value)}); | |
| } | |
| [[nodiscard]] size_t reclaim_internal() { | |
| if (current_epoch_ == 0) { | |
| return 0; // Nothing to reclaim since we're in default state | |
| } | |
| RAV_ASSERT(!values_.empty(), "The last value should have never been reclaimed"); | |
| size_t num_reclaimed = 0; | |
| for (auto it = values_.begin(); it != values_.end() - 1;) { | |
| std::lock_guard readers_lock(readers_mutex_); | |
| for (const auto* r : readers_) { | |
| // r->num_locks_ might be changed by another thread at some point, but this has no consequence because | |
| // in that case it will load a newer value. | |
| if (r->num_locks_.load() > 0 && r->epoch_.load() <= it->epoch) { | |
| // There is a reader with the epoch of this value, so we can't delete this just yet (or any newer | |
| // values). | |
| return num_reclaimed; | |
| } | |
| } | |
| it = values_.erase(it); | |
| ++num_reclaimed; | |
| } | |
| return num_reclaimed; | |
| } | |
| }; | |
| } // namespace rav |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Owllab License Agreement | |
| * | |
| * This software is provided by Owllab and may not be used, copied, modified, | |
| * merged, published, distributed, sublicensed, or sold without a valid and | |
| * explicit agreement with Owllab. | |
| * | |
| * Copyright (c) 2025 Owllab. All rights reserved. | |
| */ | |
| #include "ravennakit/core/sync/rcu.hpp" | |
| #include "ravennakit/core/util/object_counter.hpp" | |
| #include <future> | |
| #include <thread> | |
| #include <catch2/catch_all.hpp> | |
| static_assert(!std::is_copy_constructible_v<rav::Rcu<int>>); | |
| static_assert(!std::is_move_constructible_v<rav::Rcu<int>>); | |
| static_assert(!std::is_copy_assignable_v<rav::Rcu<int>>); | |
| static_assert(!std::is_move_assignable_v<rav::Rcu<int>>); | |
| static_assert(!std::is_copy_constructible_v<rav::Rcu<int>::Reader>); | |
| static_assert(!std::is_move_constructible_v<rav::Rcu<int>::Reader>); | |
| static_assert(!std::is_copy_assignable_v<rav::Rcu<int>::Reader>); | |
| static_assert(!std::is_move_assignable_v<rav::Rcu<int>::Reader>); | |
| static_assert(!std::is_copy_constructible_v<rav::Rcu<int>::Reader::RealtimeLock>); | |
| static_assert(!std::is_move_constructible_v<rav::Rcu<int>::Reader::RealtimeLock>); | |
| static_assert(!std::is_copy_assignable_v<rav::Rcu<int>::Reader::RealtimeLock>); | |
| static_assert(!std::is_move_assignable_v<rav::Rcu<int>::Reader::RealtimeLock>); | |
| namespace { | |
| constexpr auto k_timeout_seconds = 60; | |
| } | |
| TEST_CASE("rav::Rcu") { | |
| SECTION("Default state") { | |
| rav::Rcu<int> rcu; | |
| rav::Rcu<int>::Reader reader(rcu); | |
| const auto lock = reader.lock_realtime(); | |
| REQUIRE(lock.get() == nullptr); | |
| } | |
| SECTION("Reclaim on default object") { | |
| rav::Rcu<int> rcu; | |
| REQUIRE(rcu.reclaim() == 0); | |
| } | |
| SECTION("Basic operation") { | |
| rav::Rcu<std::string> rcu; | |
| rav::Rcu<std::string>::Reader reader(rcu); | |
| { | |
| const auto lock = reader.lock_realtime(); | |
| REQUIRE(lock.get() == nullptr); | |
| rcu.update("Hello, World!"); | |
| // As long as the first lock is alive, the value won't be updated for subsequent locks of the same reader. | |
| const auto lock2 = reader.lock_realtime(); | |
| REQUIRE(lock.get() == nullptr); | |
| } | |
| // Once the previous locks are destroyed, new locks will get the updated value. | |
| const auto lock3 = reader.lock_realtime(); | |
| REQUIRE(*lock3 == "Hello, World!"); | |
| // Additional locks will get the same value. | |
| const auto lock4 = reader.lock_realtime(); | |
| REQUIRE(lock3.get() == lock4.get()); | |
| } | |
| SECTION("Track object lifetime") { | |
| rav::ObjectCounter counter; | |
| rav::Rcu<rav::CountedObject> rcu; | |
| rcu.update(counter); | |
| REQUIRE(counter.instances_created == 1); | |
| REQUIRE(counter.instances_alive == 1); | |
| rcu.update(counter); | |
| REQUIRE(counter.instances_created == 2); | |
| REQUIRE(counter.instances_alive == 2); | |
| REQUIRE(rcu.reclaim() == 1); | |
| REQUIRE(counter.instances_created == 2); | |
| REQUIRE(counter.instances_alive == 1); | |
| auto reader = rcu.create_reader(); | |
| { | |
| const auto lock1 = reader.lock_realtime(); | |
| REQUIRE(lock1.get() != nullptr); | |
| REQUIRE(lock1->index() == 1); | |
| } | |
| rcu.update(counter); | |
| REQUIRE(counter.instances_created == 3); | |
| REQUIRE(counter.instances_alive == 2); | |
| { | |
| const auto lock2 = reader.lock_realtime(); | |
| REQUIRE(lock2.get() != nullptr); | |
| REQUIRE(lock2->index() == 2); | |
| rcu.update(counter); | |
| REQUIRE(lock2.get() != nullptr); | |
| REQUIRE(lock2->index() == 2); | |
| } | |
| const auto lock3 = reader.lock_realtime(); | |
| REQUIRE(lock3.get() != nullptr); | |
| REQUIRE(lock3->index() == 3); | |
| REQUIRE(rcu.reclaim() == 2); | |
| REQUIRE(counter.instances_created == 4); | |
| REQUIRE(counter.instances_alive == 1); | |
| } | |
| SECTION("The value can be cleared") { | |
| rav::ObjectCounter counter; | |
| rav::Rcu<rav::CountedObject> rcu; | |
| auto reader = rcu.create_reader(); | |
| rcu.update(counter); | |
| REQUIRE(counter.instances_created == 1); | |
| REQUIRE(counter.instances_alive == 1); | |
| { | |
| auto lock = reader.lock_realtime(); | |
| REQUIRE(lock.get() != nullptr); | |
| REQUIRE(lock->index() == 0); | |
| } | |
| rcu.clear(); | |
| REQUIRE(rcu.reclaim() == 1); | |
| REQUIRE(counter.instances_created == 1); | |
| REQUIRE(counter.instances_alive == 0); | |
| { | |
| auto lock = reader.lock_realtime(); | |
| REQUIRE(lock.get() == nullptr); | |
| } | |
| } | |
| SECTION("Reclaim") { | |
| rav::ObjectCounter counter; | |
| rav::Rcu<rav::CountedObject> rcu; | |
| REQUIRE(counter.instances_created == 0); | |
| REQUIRE(counter.instances_alive == 0); | |
| rcu.update(counter); | |
| REQUIRE(counter.instances_created == 1); | |
| REQUIRE(counter.instances_alive == 1); | |
| // The last value should never be reclaimed | |
| REQUIRE(rcu.reclaim() == 0); | |
| REQUIRE(counter.instances_created == 1); | |
| REQUIRE(counter.instances_alive == 1); | |
| rcu.update(counter); | |
| REQUIRE(counter.instances_created == 2); | |
| REQUIRE(counter.instances_alive == 2); | |
| REQUIRE(rcu.reclaim() == 1); | |
| REQUIRE(counter.instances_created == 2); | |
| REQUIRE(counter.instances_alive == 1); | |
| } | |
| SECTION("Update and reclaim all") { | |
| rav::ObjectCounter counter; | |
| rav::Rcu<rav::CountedObject> rcu; | |
| REQUIRE(counter.instances_created == 0); | |
| REQUIRE(counter.instances_alive == 0); | |
| rcu.update_reclaim_all(counter); | |
| REQUIRE(counter.instances_created == 1); | |
| REQUIRE(counter.instances_alive == 1); | |
| rcu.update_reclaim_all(counter); | |
| REQUIRE(counter.instances_created == 2); | |
| REQUIRE(counter.instances_alive == 1); | |
| rcu.update_reclaim_all(counter); | |
| REQUIRE(counter.instances_created == 3); | |
| REQUIRE(counter.instances_alive == 1); | |
| } | |
| SECTION("Update and reclaim all with reader") { | |
| static constexpr size_t k_num_values = 10'000; | |
| rav::Rcu<uint64_t> rcu; | |
| auto reader = rcu.create_reader(); | |
| std::atomic_bool reader_error = false; | |
| std::thread reader_thread([&reader, &reader_error] { | |
| uint64_t value = 0; | |
| while (value < k_num_values) { | |
| auto lock = reader.lock_realtime(); | |
| if (!lock) { | |
| continue; | |
| } | |
| if (*lock < value) { | |
| reader_error = true; | |
| break; | |
| } | |
| value = *lock; | |
| std::this_thread::sleep_for(std::chrono::microseconds(1)); | |
| } | |
| }); | |
| for (uint64_t i = 0; i <= k_num_values; ++i) { | |
| rcu.update_reclaim_all(i); | |
| REQUIRE(rcu.get_num_values() <= 1); | |
| } | |
| reader_thread.join(); | |
| REQUIRE(!reader_error); | |
| } | |
| SECTION("Only objects older than the first object used by any reader are deleted") { | |
| rav::ObjectCounter counter; | |
| rav::Rcu<rav::CountedObject> rcu; | |
| rcu.update(counter); | |
| auto reader1 = rcu.create_reader(); | |
| auto reader2 = rcu.create_reader(); | |
| auto reader1_lock = reader1.lock_realtime(); | |
| REQUIRE(reader1_lock.get() != nullptr); | |
| REQUIRE(reader1_lock->index() == 0); | |
| rcu.update(counter); | |
| rcu.update(counter); | |
| auto reader2_lock = reader2.lock_realtime(); | |
| REQUIRE(reader2_lock.get() != nullptr); | |
| REQUIRE(reader2_lock->index() == 2); | |
| REQUIRE(counter.instances_created == 3); | |
| REQUIRE(counter.instances_alive == 3); | |
| REQUIRE(rcu.reclaim() == 0); | |
| // Because reader1_lock is still active, no values should be deleted. Not even the 2nd one (which is not in use | |
| // currently). | |
| REQUIRE(counter.instances_created == 3); | |
| REQUIRE(counter.instances_alive == 3); | |
| reader1_lock.reset(); | |
| REQUIRE(rcu.reclaim() == 2); | |
| // Now that reader1_lock has been reset, the first 2 objects can be deleted. | |
| REQUIRE(counter.instances_created == 3); | |
| REQUIRE(counter.instances_alive == 1); | |
| } | |
| SECTION("Reader does not block writer") { | |
| rav::Rcu<std::string> rcu; | |
| rcu.update("Hello, World!"); | |
| std::promise<void> value_updated; | |
| std::promise<void> has_read_lock; | |
| auto has_read_lock_future = has_read_lock.get_future(); | |
| auto reader_thread = std::thread([&rcu, &has_read_lock, value_updated_future = value_updated.get_future()] { | |
| auto reader = rcu.create_reader(); | |
| { | |
| auto lock = reader.lock_realtime(); | |
| REQUIRE(lock.get() != nullptr); | |
| REQUIRE(*lock == "Hello, World!"); | |
| has_read_lock.set_value(); | |
| value_updated_future.wait(); | |
| // We should still read the initial value since the never reset the lock | |
| REQUIRE(*lock == "Hello, World!"); | |
| } | |
| auto lock = reader.lock_realtime(); | |
| REQUIRE(lock.get() != nullptr); | |
| REQUIRE(*lock == "Updated value"); | |
| }); | |
| has_read_lock_future.wait(); | |
| auto writer_thread = std::thread([&rcu, &value_updated]() mutable { | |
| rcu.update("Updated value"); | |
| value_updated.set_value(); | |
| }); | |
| reader_thread.join(); | |
| writer_thread.join(); | |
| } | |
| SECTION("Readers can be created and destroyed concurrently") { | |
| rav::Rcu<std::string> rcu("Hello, World!"); | |
| static constexpr size_t num_threads = 100; | |
| std::vector<std::string> results(num_threads); | |
| std::atomic keep_going = true; | |
| std::atomic<size_t> num_active_threads = 0; | |
| std::vector<std::thread> threads; | |
| threads.reserve(num_threads); | |
| for (size_t i = 0; i < num_threads; ++i) { | |
| threads.emplace_back([&] { | |
| const auto thread_id = num_active_threads.fetch_add(1); | |
| while (keep_going) { | |
| auto reader = rcu.create_reader(); | |
| auto lock = reader.lock_realtime(); | |
| results[thread_id] = *lock; | |
| } | |
| }); | |
| } | |
| auto start = std::chrono::steady_clock::now(); | |
| while (num_active_threads < num_threads) { | |
| if (start + std::chrono::seconds(k_timeout_seconds) < std::chrono::steady_clock::now()) { | |
| FAIL("Timeout"); | |
| } | |
| std::this_thread::yield(); | |
| } | |
| // Once all threads are active, keep going for another small amount of time | |
| std::this_thread::sleep_for(std::chrono::milliseconds(100)); | |
| keep_going = false; | |
| for (auto& thread : threads) { | |
| thread.join(); | |
| } | |
| for (auto& result : results) { | |
| REQUIRE(result == "Hello, World!"); | |
| } | |
| } | |
| SECTION("Concurrent reads and writes and reclaims should be thread safe") { | |
| static constexpr size_t num_values = 10'000; | |
| static constexpr size_t num_writer_threads = 3; | |
| static constexpr size_t num_reader_threads = 3; | |
| static constexpr size_t num_reclaim_thread = 3; | |
| // Assign the values we're going to give the rcu object | |
| rav::Rcu<std::pair<size_t, std::string>> rcu; | |
| std::atomic<size_t> num_readers_finished = 0; | |
| std::vector<std::thread> writer_threads; | |
| writer_threads.reserve(num_writer_threads); | |
| // Writers are going to hammer the rcu object with new values until all readers have read all values. | |
| for (size_t i = 0; i < num_writer_threads; ++i) { | |
| writer_threads.emplace_back([&num_readers_finished, &rcu] { | |
| while (num_readers_finished < num_reader_threads) { | |
| for (size_t j = 0; j < num_values; ++j) { | |
| rcu.update(std::make_pair(j, std::to_string(j + 1))); | |
| std::ignore = rcu.reclaim(); | |
| std::this_thread::yield(); | |
| } | |
| } | |
| }); | |
| } | |
| std::vector<std::vector<std::string>> reader_values(num_reader_threads); | |
| std::vector<std::thread> reader_threads; | |
| reader_threads.reserve(num_reader_threads); | |
| // Readers are going to read from the rcu until they have received all values. | |
| for (size_t i = 0; i < num_reader_threads; ++i) { | |
| reader_threads.emplace_back([&reader_values, &rcu, i, &num_readers_finished] { | |
| size_t num_values_read = 0; | |
| std::vector<std::string> output_values(num_values); | |
| auto reader = rcu.create_reader(); | |
| while (num_values_read < num_values) { | |
| const auto lock = reader.lock_realtime(); | |
| if (lock.get() == nullptr) { | |
| continue; | |
| } | |
| auto& it = output_values.at(lock->first); | |
| if (it.empty()) { | |
| it = lock->second; | |
| ++num_values_read; | |
| } | |
| } | |
| reader_values[i] = output_values; | |
| num_readers_finished.fetch_add(1); | |
| }); | |
| } | |
| // These threads are going to reclaim. | |
| std::vector<std::thread> reclaim_threads; | |
| reclaim_threads.reserve(num_reclaim_thread); | |
| for (size_t i = 0; i < num_reclaim_thread; ++i) { | |
| reclaim_threads.emplace_back([&] { | |
| while (num_readers_finished < num_reader_threads) { | |
| std::ignore = rcu.reclaim(); | |
| std::this_thread::yield(); | |
| } | |
| }); | |
| } | |
| for (auto& thread : writer_threads) { | |
| thread.join(); | |
| } | |
| for (auto& thread : reader_threads) { | |
| thread.join(); | |
| } | |
| for (auto& thread : reclaim_threads) { | |
| thread.join(); | |
| } | |
| for (auto& reader_value : reader_values) { | |
| for (size_t i = 0; i < num_values; ++i) { | |
| REQUIRE(reader_value.at(i) == std::to_string(i + 1)); | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment