This feature aims to save messages that cannot be sent while a component is offline and to resend them when the component comes back online.
For instance, if the queue is offline for some reason, the replicator should keep the messages it needs to send to the queue and resend them when the queue is back online.
Likewise, if the repository is offline for some reason, the replicator should keep the messages it needs to send to the repository and resend them when the repository is back online.
We don't send compensate messages to offline components, which can cause the databases across cells to diverge, since a compensate message may be a status change rather than a delete operation.
With this feature, we will try to resend both the original message and the compensate message when the component (queue or repository) is back online.
When a message is marked as Failed, both the original message and a compensate message are added to the corresponding slice in BufferedCache, as shown below:
BufferedCache Structure:
type BufferedCache struct {
    data     []Message
    rwMutex  sync.RWMutex
    cacheTtl int32
}

For instance, suppose message001 fails to reach the queue but succeeds in reaching the repository. A compensate message is sent to the repository because the queue could not process the original. The original message and the compensate message are then kept in BufferedCache so they can be resent when the queue is available again (only if the error code is not 15 or 6, as explained below).
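As a minimal sketch, appending to the cache might look like the method below, which takes the write lock before mutating the slice (the Add method name is an assumption, not the actual API; it relies on the BufferedCache and Message types defined in this document and the standard time package):

// Add appends a failed message to the cache under the write lock.
// Hypothetical method name; the real API may differ.
func (c *BufferedCache) Add(req *protos.CommandRequest) {
    c.rwMutex.Lock()
    defer c.rwMutex.Unlock()
    c.data = append(c.data, Message{
        request:   req,
        createdAt: time.Now(),
    })
}

The caller would invoke Add twice per failure: once for the original message and once for its compensate message.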
Messages that have failed with the gRPC error codes below are not eligible to be cached.
For example, if a message already exists in the database, it makes no sense to try to resend it.
Likewise, if a message is corrupted, it should not be considered eligible for caching or for being resent.
15 = DATA_LOSS
6 = ALREADY_EXISTS
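As a sketch, assuming these failures surface as gRPC status errors, the eligibility check might look like this (isEligibleForCache is a hypothetical helper, not an existing function):

import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// isEligibleForCache reports whether a failed message should be buffered
// for a later resend. DATA_LOSS (15) and ALREADY_EXISTS (6) are permanent
// failures, so retrying them is pointless.
func isEligibleForCache(err error) bool {
    switch status.Code(err) {
    case codes.DataLoss, codes.AlreadyExists:
        return false
    default:
        return true
    }
}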
The same applies to repository messages: if message002 fails to reach the repository but succeeds in reaching the queue, a compensate message is sent to the queue because the repository could not process the original. The original message and the compensate message are then kept in BufferedCache so they can be resent when the repository is available again.
Note: if both the repository and the queue are down, this feature does not apply.
Message structure in the slice:

type Message struct {
    request   *protos.CommandRequest
    createdAt time.Time
}

BufferedCache has two processors: one for QueueDLQProcessing and one for RepositoryDLQProcessing.
These processors run as two separate goroutines; each is responsible for reading and dispatching messages, as well as clearing the slice by evicting messages that have expired, based on the TTL setting from the config file.
The processors work through the cached messages using a backoff strategy, as sketched below.
(The queue processor only retries a message under the backoff strategy if its error code is not 15 or 6.)
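A sketch of the two processors as separate goroutines with a simple exponential backoff; startProcessors, runDLQProcessor, sendToQueue, sendToRepository, and resendAll (sketched further below under the locking discussion) are all hypothetical names:

// startProcessors launches one goroutine per destination, so the queue and
// repository caches are drained independently.
func startProcessors(queueCache, repoCache *BufferedCache) {
    go runDLQProcessor(queueCache, sendToQueue)     // QueueDLQProcessing
    go runDLQProcessor(repoCache, sendToRepository) // RepositoryDLQProcessing
}

// runDLQProcessor retries the buffered messages with exponential backoff,
// resetting the delay once the destination accepts everything again.
func runDLQProcessor(cache *BufferedCache, send func(*protos.CommandRequest) error) {
    backoff := time.Second
    const maxBackoff = time.Minute
    for {
        time.Sleep(backoff)
        if cache.resendAll(send) {
            backoff = time.Second // destination is healthy again
        } else if backoff < maxBackoff {
            backoff *= 2
        }
    }
}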
Since concurrent scenarios can occur, such as a processor reading while writes happen or eviction deletes records, the proposal for implementing the logic is as follows:
We use sync.RWMutex, which prevents concurrent reads and writes on the slices. It is an optimized locking strategy: the lock is only contended when the queue or repository is down, so the normal flow does not lock. We have evaluated the impact of using a mutex on system performance, and the benefits outweigh the drawbacks in this context. We assume the write processor operates only when the queue is down, which also reduces the system's responsibility to process messages. Even if the replicator experiences temporary throttling, it should not significantly impact overall system performance.
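A sketch of the resend-and-evict pass under this locking strategy (resendAll and the send callback are hypothetical, and the TTL is assumed to be expressed in seconds):

// resendAll attempts to dispatch every cached message in one pass. It takes
// the write lock because it both resends and evicts; in the healthy flow the
// slice is empty, so this lock is never contended.
func (c *BufferedCache) resendAll(send func(*protos.CommandRequest) error) bool {
    c.rwMutex.Lock()
    defer c.rwMutex.Unlock()

    ttl := time.Duration(c.cacheTtl) * time.Second
    kept := c.data[:0] // filter in place to avoid reallocating
    allSent := true
    for _, m := range c.data {
        if time.Since(m.createdAt) > ttl {
            continue // TTL expired: evict without resending
        }
        if err := send(m.request); err != nil {
            if !isEligibleForCache(err) {
                continue // error code 15 or 6: drop, never retry
            }
            kept = append(kept, m) // still failing: keep for the next pass
            allSent = false
        }
    }
    c.data = kept
    return allSent
}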