Skip to content

Instantly share code, notes, and snippets.

@Pothulapati
Created February 28, 2025 07:54
Show Gist options
  • Select an option

  • Save Pothulapati/c5165777ab51df3e2c2d1fe45f539279 to your computer and use it in GitHub Desktop.

Select an option

Save Pothulapati/c5165777ab51df3e2c2d1fe45f539279 to your computer and use it in GitHub Desktop.
tarun_dragonflydb_io@tar-shake-test:~/RedisShake$ git diff .
diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go
index 838b6a1..5460cc0 100644
--- a/internal/writer/redis_standalone_writer.go
+++ b/internal/writer/redis_standalone_writer.go
@@ -2,16 +2,13 @@ package writer
import (
"context"
- "errors"
"fmt"
- "strconv"
"strings"
"sync"
"sync/atomic"
"time"
"RedisShake/internal/client"
- "RedisShake/internal/client/proto"
"RedisShake/internal/config"
"RedisShake/internal/entry"
"RedisShake/internal/log"
@@ -84,18 +81,6 @@ func (w *redisStandaloneWriter) Write(e *entry.Entry) {
w.ch <- e
}
-func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
- log.Debugf("[%s] switch db to [%d]", w.stat.Name, newDbId)
- w.client.Send("select", strconv.Itoa(newDbId))
- w.DbId = newDbId
- if !w.offReply {
- w.chWaitReply <- &entry.Entry{
- Argv: []string{"select", strconv.Itoa(newDbId)},
- CmdName: "select",
- }
- }
-}
-
func (w *redisStandaloneWriter) processWrite(ctx context.Context) {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
@@ -104,57 +89,45 @@ func (w *redisStandaloneWriter) processWrite(ctx context.Context) {
case <-ctx.Done():
// do nothing until w.ch is closed
case <-ticker.C:
- w.client.Flush()
+ // Skip actual flush
+ log.Debugf("[%s] Would have flushed Redis connection", w.stat.Name)
case e, ok := <-w.ch:
if !ok {
// clean up and exit
- w.client.Flush()
+ log.Debugf("[%s] Would have performed final flush", w.stat.Name)
w.chWg.Done()
return
}
- // switch db if we need
+ // Log db switch without actually doing it
if w.DbId != e.DbId {
- w.switchDbTo(e.DbId)
- }
- // send
- bytes := e.Serialize()
- for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
- time.Sleep(1 * time.Nanosecond)
+ log.Infof("[%s] Would have switched db from [%d] to [%d]", w.stat.Name, w.DbId, e.DbId)
+ w.DbId = e.DbId
}
- log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
+
+ // Calculate bytes but don't actually send
+ // bytes := e.Serialize()
+ // log.Infof("[%s] Would have sent cmd (skipped). cmd=[%s], size=[%d]", w.stat.Name, e.String(), len(bytes))
+
+ // Maintain stats for compatibility
if !w.offReply {
select {
case w.chWaitReply <- e:
default:
- w.client.Flush()
+ log.Debugf("[%s] Would have flushed buffer", w.stat.Name)
w.chWaitReply <- e
}
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
}
- w.client.SendBytesBuff(bytes)
}
}
}
func (w *redisStandaloneWriter) processReply() {
for e := range w.chWaitReply {
- reply, err := w.client.Receive()
- log.Debugf("[%s] receive reply. reply=[%v], cmd=[%s]", w.stat.Name, reply, e.String())
-
- // It's good to skip the nil error since some write commands will return the null reply. For example,
- // the SET command with NX option will return nil if the key already exists.
- if err != nil && !errors.Is(err, proto.Nil) {
- if err.Error() == "BUSYKEY Target key name already exists." {
- if config.Opt.Advanced.RDBRestoreCommandBehavior == "skip" {
- log.Debugf("[%s] redisStandaloneWriter received BUSYKEY reply. cmd=[%s]", w.stat.Name, e.String())
- } else if config.Opt.Advanced.RDBRestoreCommandBehavior == "panic" {
- log.Panicf("[%s] redisStandaloneWriter received BUSYKEY reply. cmd=[%s]", w.stat.Name, e.String())
- }
- } else {
- log.Panicf("[%s] receive reply failed. cmd=[%s], error=[%v]", w.stat.Name, e.String(), err)
- }
- }
+ // Skip actual reply waiting, just log and update counters
+ // log.Debugf("[%s] Would have waited for reply for cmd=[%s]", w.stat.Name, e.String())
+
if strings.EqualFold(e.CmdName, "select") { // skip select command
continue
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment