Created
February 28, 2025 07:54
-
-
Save Pothulapati/c5165777ab51df3e2c2d1fe45f539279 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| tarun_dragonflydb_io@tar-shake-test:~/RedisShake$ git diff . | |
| diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go | |
| index 838b6a1..5460cc0 100644 | |
| --- a/internal/writer/redis_standalone_writer.go | |
| +++ b/internal/writer/redis_standalone_writer.go | |
| @@ -2,16 +2,13 @@ package writer | |
| import ( | |
| "context" | |
| - "errors" | |
| "fmt" | |
| - "strconv" | |
| "strings" | |
| "sync" | |
| "sync/atomic" | |
| "time" | |
| "RedisShake/internal/client" | |
| - "RedisShake/internal/client/proto" | |
| "RedisShake/internal/config" | |
| "RedisShake/internal/entry" | |
| "RedisShake/internal/log" | |
| @@ -84,18 +81,6 @@ func (w *redisStandaloneWriter) Write(e *entry.Entry) { | |
| w.ch <- e | |
| } | |
| -func (w *redisStandaloneWriter) switchDbTo(newDbId int) { | |
| - log.Debugf("[%s] switch db to [%d]", w.stat.Name, newDbId) | |
| - w.client.Send("select", strconv.Itoa(newDbId)) | |
| - w.DbId = newDbId | |
| - if !w.offReply { | |
| - w.chWaitReply <- &entry.Entry{ | |
| - Argv: []string{"select", strconv.Itoa(newDbId)}, | |
| - CmdName: "select", | |
| - } | |
| - } | |
| -} | |
| - | |
| func (w *redisStandaloneWriter) processWrite(ctx context.Context) { | |
| ticker := time.NewTicker(10 * time.Millisecond) | |
| defer ticker.Stop() | |
| @@ -104,57 +89,45 @@ func (w *redisStandaloneWriter) processWrite(ctx context.Context) { | |
| case <-ctx.Done(): | |
| // do nothing until w.ch is closed | |
| case <-ticker.C: | |
| - w.client.Flush() | |
| + // Skip actual flush | |
| + log.Debugf("[%s] Would have flushed Redis connection", w.stat.Name) | |
| case e, ok := <-w.ch: | |
| if !ok { | |
| // clean up and exit | |
| - w.client.Flush() | |
| + log.Debugf("[%s] Would have performed final flush", w.stat.Name) | |
| w.chWg.Done() | |
| return | |
| } | |
| - // switch db if we need | |
| + // Log db switch without actually doing it | |
| if w.DbId != e.DbId { | |
| - w.switchDbTo(e.DbId) | |
| - } | |
| - // send | |
| - bytes := e.Serialize() | |
| - for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced | |
| .TargetRedisClientMaxQuerybufLen { | |
| - time.Sleep(1 * time.Nanosecond) | |
| + log.Infof("[%s] Would have switched db from [%d] to [%d]", w.stat.Name, w.Db | |
| Id, e.DbId) | |
| + w.DbId = e.DbId | |
| } | |
| - log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String()) | |
| + | |
| + // Calculate bytes but don't actually send | |
| + // bytes := e.Serialize() | |
| + // log.Infof("[%s] Would have sent cmd (skipped). cmd=[%s], size=[%d]", w.stat. | |
| Name, e.String(), len(bytes)) | |
| + | |
| + // Maintain stats for compatibility | |
| if !w.offReply { | |
| select { | |
| case w.chWaitReply <- e: | |
| default: | |
| - w.client.Flush() | |
| + log.Debugf("[%s] Would have flushed buffer", w.stat.Name) | |
| w.chWaitReply <- e | |
| } | |
| atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize) | |
| atomic.AddInt64(&w.stat.UnansweredEntries, 1) | |
| } | |
| - w.client.SendBytesBuff(bytes) | |
| } | |
| } | |
| } | |
| func (w *redisStandaloneWriter) processReply() { | |
| for e := range w.chWaitReply { | |
| - reply, err := w.client.Receive() | |
| - log.Debugf("[%s] receive reply. reply=[%v], cmd=[%s]", w.stat.Name, reply, e.String()) | |
| - | |
| - // It's good to skip the nil error since some write commands will return the null reply. For | |
| example, | |
| - // the SET command with NX option will return nil if the key already exists. | |
| - if err != nil && !errors.Is(err, proto.Nil) { | |
| - if err.Error() == "BUSYKEY Target key name already exists." { | |
| - if config.Opt.Advanced.RDBRestoreCommandBehavior == "skip" { | |
| - log.Debugf("[%s] redisStandaloneWriter received BUSYKEY reply. cmd=[ | |
| %s]", w.stat.Name, e.String()) | |
| - } else if config.Opt.Advanced.RDBRestoreCommandBehavior == "panic" { | |
| - log.Panicf("[%s] redisStandaloneWriter received BUSYKEY reply. cmd=[ | |
| %s]", w.stat.Name, e.String()) | |
| - } | |
| - } else { | |
| - log.Panicf("[%s] receive reply failed. cmd=[%s], error=[%v]", w.stat.Name, e | |
| .String(), err) | |
| - } | |
| - } | |
| + // Skip actual reply waiting, just log and update counters | |
| + // log.Debugf("[%s] Would have waited for reply for cmd=[%s]", w.stat.Name, e.String()) | |
| + | |
| if strings.EqualFold(e.CmdName, "select") { // skip select command | |
| continue | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment