request for comment template

Cache DLQ Buffer

Goal:

This feature aims to save messages that cannot be delivered while a component is offline and to resend them once the component is back online.

For instance, if the queue is offline for some reason, the replicator should keep the messages it needs to send to the queue and resend them once the queue is back online.

Likewise for the repository: if the repository is offline for some reason, the replicator should keep the messages it needs to send to the repository and resend them once the repository is back online.

Motivation:

Why do we need this?

Today, compensate messages are not sent to offline components, which can leave the databases of different cells inconsistent, since a compensate message can be a status change rather than a delete operation.

With this feature, we will resend both the original and the compensate message once the component (queue or repository) is back online.

Tech Details

When a message fails, both the original message and its compensate message are added to the corresponding slice in BufferedCache, as shown below:

BufferedCache Structure:

type BufferedCache struct {
    data     []Message     // buffered messages waiting to be resent
    rwMutex  sync.RWMutex  // guards data against concurrent reads and writes
    cacheTtl int32         // time-to-live for buffered messages, taken from the config file
}

For instance, suppose message001 fails to reach the queue but succeeds in reaching the repository. A compensate message is then sent to the repository because the queue could not process the original. Both the original message and the compensate message are kept in BufferedCache so they can be resent when the queue is available again (only if the error code is not 15 or 6).

Failed messages that received one of the gRPC error codes below are not eligible to be cached, as illustrated in the sketch after the list.

For example, if a message already exists in the database, there is no point in resending it.

Likewise, if a message is corrupted, it should not be cached or resent.

15 = DATA_LOSS
6 = ALREADY_EXISTS
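
A minimal sketch of that eligibility check, assuming the gRPC status is available on the returned error; the isCacheable helper is an assumption, not existing code, and the codes come from the standard google.golang.org/grpc packages:

import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// isCacheable reports whether a failed message should be buffered for resend.
// DATA_LOSS (15) means the message is corrupted and ALREADY_EXISTS (6) means
// it is already in the database, so neither is worth resending.
func isCacheable(err error) bool {
    switch status.Code(err) {
    case codes.DataLoss, codes.AlreadyExists:
        return false
    default:
        return true
    }
}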

The same applies to repository messages: if message002 fails to reach the repository but succeeds in reaching the queue, a compensate message is sent to the queue because the repository could not process the original. Both the original message and the compensate message are kept in BufferedCache so they can be resent when the repository is available again.

Obs.: If both the repository and the queue are down, this feature does not apply.

Message structure in the slice:

type Message struct {
    request   *protos.CommandRequest // original or compensate request to resend
    createdAt time.Time              // insertion time, used for TTL eviction
}
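
A minimal sketch of how an original or compensate message could be appended under the write lock; the Add method name is an assumption:

// Add buffers a request (original or compensate) together with its insertion
// time, taking the write lock so concurrent readers never see a partial update.
func (c *BufferedCache) Add(req *protos.CommandRequest) {
    c.rwMutex.Lock()
    defer c.rwMutex.Unlock()
    c.data = append(c.data, Message{request: req, createdAt: time.Now()})
}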

BufferedCache has two processors: one for QueueDLQProcessing and one for RepositoryDLQProcessing.

These processors will run as two separate goroutines, each responsible for reading and dispatching messages as well as clearing the slice and evicting messages whose TTL (taken from the config file) has expired. A sketch of the eviction step follows.
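
A minimal sketch of the TTL eviction step each goroutine could run periodically; the evictExpired name is an assumption, and cacheTtl is assumed to be expressed in seconds:

// evictExpired drops messages that have been buffered longer than the TTL.
func (c *BufferedCache) evictExpired() {
    c.rwMutex.Lock()
    defer c.rwMutex.Unlock()
    ttl := time.Duration(c.cacheTtl) * time.Second
    kept := c.data[:0]
    for _, m := range c.data {
        if time.Since(m.createdAt) < ttl {
            kept = append(kept, m)
        }
    }
    c.data = kept
}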

QueueDlqProcessor and RepoDlqProcessor

These processors will process the messages from the cache using a backoff strategy.
(The queue processor will only retry with backoff if the error code is not 15 or 6.)

The QueueDlq and RepoDlq processors will be started in the background at startup, as sketched below.
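
A hedged sketch of what one of these background processors could look like, using exponential backoff; the send callback and the drain/putBack helpers (sketched in the concurrency section below) are assumptions, not existing code:

// runQueueDlqProcessor resends buffered messages with exponential backoff
// until the queue accepts them; non-retryable errors (15, 6) are dropped.
func runQueueDlqProcessor(cache *BufferedCache, send func(*protos.CommandRequest) error) {
    backoff := time.Second
    const maxBackoff = time.Minute
    for {
        time.Sleep(backoff)
        cache.evictExpired()
        pending := cache.drain() // take the current slice under the write lock
        var failed []Message
        for _, m := range pending {
            if err := send(m.request); err != nil && isCacheable(err) {
                failed = append(failed, m) // component still down: keep for next round
            }
        }
        cache.putBack(failed)
        if len(failed) > 0 {
            backoff *= 2
            if backoff > maxBackoff {
                backoff = maxBackoff
            }
        } else {
            backoff = time.Second // component answered: reset the backoff
        }
    }
}

At startup these would be launched in the background, e.g. go runQueueDlqProcessor(...) and an equivalent go runRepoDlqProcessor(...).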

Since there are concurrent scenarios (the processors reading, new writes, and eviction deleting records), the proposal below describes how to implement the locking logic.

We will use sync.RWMutex to protect the slices from concurrent reads and writes. This is an optimized locking strategy: the lock is only contended when the queue or the repository is down, so the normal flow is not blocked. We have evaluated the impact of the mutex on system performance, and the benefits outweigh the drawbacks in this context, because the write path operates only when a component is down, which already reduces the amount of message processing the system is doing. Even if the replicator experiences temporary throttling, it should not significantly impact overall system performance.
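
A sketch of that locking pattern, where the processor takes everything out under the write lock and dispatches outside of it so the lock is held only briefly; drain and putBack are assumed helper names:

// drain removes and returns all buffered messages so they can be dispatched
// outside the lock.
func (c *BufferedCache) drain() []Message {
    c.rwMutex.Lock()
    defer c.rwMutex.Unlock()
    out := c.data
    c.data = nil
    return out
}

// putBack restores the messages that still could not be delivered.
func (c *BufferedCache) putBack(msgs []Message) {
    if len(msgs) == 0 {
        return
    }
    c.rwMutex.Lock()
    defer c.rwMutex.Unlock()
    c.data = append(c.data, msgs...)
}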
