Skip to content

Instantly share code, notes, and snippets.

@makasim
Last active August 21, 2025 20:55
Show Gist options
  • Select an option

  • Save makasim/4bd5e115147d2321b59b302a576a4eee to your computer and use it in GitHub Desktop.

Select an option

Save makasim/4bd5e115147d2321b59b302a576a4eee to your computer and use it in GitHub Desktop.
Weak-pointer-based labels compressor

Every time we create a new key, we increment usedBy on every label included in that key. When the key is garbage collected (which we only hold via a weak pointer and detect in the periodic cleanup func), we decrement usedBy on each of its labels and remove the key. When a label's usedBy reaches zero, nobody uses the label any more and it can be safely removed.

A key can decompress itself by dereferencing its unsafe pointers; see the Key.Decompress method.

This is basically a workaround for Go's lack of customizable equality for slices (a slice cannot be used as a map key). Once Go supports that, the code could be simplified a lot.

package promutil
import (
"sync"
"time"
"unsafe"
"weak"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
// Key is a compressed label set returned by LabelsCompressorV2.Compress.
//
// Every element of ptrs is an unsafe.Pointer to a *labelEntry created by the
// compressor. Because unsafe.Pointer is traced by the GC, holding a Key keeps
// the referenced entries reachable.
type Key struct {
	ptrs []unsafe.Pointer
}

// Decompress returns a fresh slice with a copy of every label referenced by k,
// in the same order the labels were passed to Compress.
//
// It panics if k contains a nil pointer.
func (k *Key) Decompress() []prompb.Label {
	labels := make([]prompb.Label, 0, len(k.ptrs))
	for _, p := range k.ptrs {
		if p == nil {
			panic("nil label pointer")
		}
		// ptrs point at labelEntry values, not at prompb.Label: labelEntry starts
		// with the usedBy counter, so casting straight to *prompb.Label would
		// reinterpret the wrong memory. Go through the entry instead.
		labels = append(labels, (*labelEntry)(p).label)
	}
	return labels
}
// labelEntry is a single deduplicated label shared by every Key that contains it.
type labelEntry struct {
	// usedBy counts references from registered label sets (one per occurrence
	// in a set). doCleanup drops the entry from the labels map at zero.
	usedBy int
	label  prompb.Label
}

// keyEntry is the bookkeeping for one registered label set: a weak reference to
// the Key handed out to callers, plus the label entries whose usedBy this set
// accounts for. ptrs is kept here (not only inside the Key) so doCleanup can
// release the entries after the Key itself has been garbage collected.
type keyEntry struct {
	key  weak.Pointer[Key]
	ptrs []unsafe.Pointer
}

// LabelsCompressorV2 deduplicates labels and label sets.
//
// Equal labels share a single labelEntry, and equal label sets (same labels in
// the same order) share a single *Key for as long as some caller keeps that Key
// reachable. The compressor references Keys only weakly; a background goroutine
// started by NewLabelsCompressorV2 periodically forgets sets whose Key was
// garbage collected and drops labels no longer used by any set.
type LabelsCompressorV2 struct {
	mux sync.Mutex

	// labels maps a label value to its shared entry.
	// NOTE(review): requires prompb.Label to be comparable (a struct of
	// strings) — confirm against lib/prompb.
	labels map[prompb.Label]*labelEntry

	// keys maps the raw bytes of a set's entry pointers (see ptrsBytes) to the
	// set's bookkeeping. Neither the map key nor keyEntry holds a strong
	// reference to the Key, so the Key can be collected once callers drop it.
	keys map[string]*keyEntry

	stopCh   chan struct{}
	stopOnce sync.Once
}

// NewLabelsCompressorV2 returns an empty compressor and starts its background
// cleanup goroutine, which runs until Stop is called.
func NewLabelsCompressorV2() *LabelsCompressorV2 {
	lc := &LabelsCompressorV2{
		labels: make(map[prompb.Label]*labelEntry),
		keys:   make(map[string]*keyEntry),
		stopCh: make(chan struct{}),
	}
	go lc.cleanup()
	return lc
}

// Stop terminates the background cleanup goroutine. It is safe to call more
// than once. The compressor remains usable afterwards, but sets whose Key was
// collected are no longer reclaimed.
func (lc *LabelsCompressorV2) Stop() {
	lc.stopOnce.Do(func() { close(lc.stopCh) })
}

// Compress returns the Key for labels, creating and registering it if needed.
//
// Callers passing equal labels in the same order receive the same *Key while
// that Key is still reachable. New labels are stored via cloneLabel, presumably
// so the compressor never retains the caller's string memory — confirm.
func (lc *LabelsCompressorV2) Compress(labels []prompb.Label) *Key {
	lc.mux.Lock()
	defer lc.mux.Unlock()

	ptrs := make([]unsafe.Pointer, len(labels))
	for i := range labels {
		le, ok := lc.labels[labels[i]]
		if !ok {
			le = &labelEntry{label: cloneLabel(labels[i])}
			// Key the map by the stored clone, never by the caller's strings.
			lc.labels[le.label] = le
		}
		ptrs[i] = unsafe.Pointer(le)
	}

	id := ptrsBytes(ptrs)
	// m[string(b)] lookups do not allocate.
	if ke, ok := lc.keys[string(id)]; ok {
		if k := ke.key.Value(); k != nil {
			return k
		}
		// The previous Key for this set was collected, but doCleanup has not run
		// yet. Its usedBy increments are still in place and already count this
		// set once, so hand out a fresh Key without incrementing again.
		k := &Key{ptrs: ke.ptrs}
		ke.key = weak.Make(k)
		return k
	}

	for _, p := range ptrs {
		(*labelEntry)(p).usedBy++
	}
	k := &Key{ptrs: ptrs}
	lc.keys[string(id)] = &keyEntry{key: weak.Make(k), ptrs: ptrs}
	return k
}

// ptrsBytes views ptrs as raw bytes without copying. Sets built from the same
// entries in the same order yield equal bytes, which makes the result usable as
// a map key after string conversion. The current Go runtime does not move heap
// objects, and keyEntry.ptrs keeps every referenced entry alive while its keys
// map entry exists, so an address inside a stored key cannot be reused by a
// different entry.
func ptrsBytes(ptrs []unsafe.Pointer) []byte {
	if len(ptrs) == 0 {
		return nil
	}
	size := len(ptrs) * int(unsafe.Sizeof(ptrs[0]))
	return unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(ptrs))), size)
}

// cleanup runs doCleanup every 5 minutes until Stop is called.
func (lc *LabelsCompressorV2) cleanup() {
	t := time.NewTicker(5 * time.Minute)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			lc.doCleanup()
		case <-lc.stopCh:
			return
		}
	}
}

// doCleanup forgets every registered set whose Key has been garbage collected.
// It decrements usedBy on the set's entries and removes entries that reach zero
// from the labels map.
func (lc *LabelsCompressorV2) doCleanup() {
	lc.mux.Lock()
	defer lc.mux.Unlock()
	for id, ke := range lc.keys {
		if ke.key.Value() != nil {
			continue
		}
		for _, p := range ke.ptrs {
			le := (*labelEntry)(p)
			le.usedBy--
			// Only delete the map slot if it still refers to this very entry.
			if le.usedBy <= 0 && lc.labels[le.label] == le {
				delete(lc.labels, le.label)
			}
		}
		// Deleting during range is permitted for Go maps.
		delete(lc.keys, id)
	}
}
package promutil
import (
"reflect"
"sync"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/cespare/xxhash/v2"
)
// hashBBP pools the scratch buffers compress uses to build hash input.
var hashBBP bytesutil.ByteBufferPool

// LabelsCompressorV2 maps every distinct label to a 64-bit id computed by
// hashing it, so label sets can be stored as varint-encoded id lists.
type LabelsCompressorV2 struct {
	// mux serializes inserts into labelsToId, so concurrent callers clone and
	// store each new label only once.
	mux sync.Mutex

	// labelsToId holds id (uint64) -> prompb.Label. Despite its name it is
	// keyed by the id, not by the label.
	labelsToId sync.Map
}
// Compress appends to dst the encoded form of labels and returns the result:
// a varint label count followed by one varint id per label. An empty set is
// encoded as a single zero byte.
func (lc *LabelsCompressorV2) Compress(dst []byte, labels []prompb.Label) []byte {
	n := len(labels)
	if n == 0 {
		// Fast path
		return append(dst, 0)
	}

	buf := encoding.GetUint64s(n + 1)
	ids := buf.A
	ids[0] = uint64(n) // leading count, followed by the ids
	lc.compress(ids[1:], labels)

	dst = encoding.MarshalVarUint64s(dst, ids)
	encoding.PutUint64s(buf)
	return dst
}
// compress stores in dst[i] the id of labels[i], registering in lc.labelsToId
// every label that has not been seen before. dst must have room for at least
// len(labels) ids.
//
// The id is the xxhash of (varint len(Name), Name, Value). The length prefix
// makes the hash input unambiguous: without it {Name:"ab", Value:"c"} and
// {Name:"a", Value:"bc"} hash the same bytes, share one id, and decompress
// returns the wrong label for one of them.
func (lc *LabelsCompressorV2) compress(dst []uint64, labels []prompb.Label) {
	if len(labels) == 0 {
		return
	}

	var maxSize int
	for i := range labels {
		maxSize = max(maxSize, len(labels[i].Name)+len(labels[i].Value))
	}
	bb := hashBBP.Get()
	defer hashBBP.Put(bb)
	// +10 reserves room for the uvarint length prefix (at most 10 bytes).
	bb.Grow(maxSize + 10)

	_ = dst[len(labels)-1] // single bounds check for the loop below
	for i := range labels {
		bb.Reset()
		bb.B = encoding.MarshalVarUint64(bb.B, uint64(len(labels[i].Name)))
		bb.Write(s2b(labels[i].Name))
		bb.Write(s2b(labels[i].Value))
		id := xxhash.Sum64(bb.B)

		if _, ok := lc.labelsToId.Load(id); !ok {
			// Re-check under mux so concurrent callers clone and store a new
			// label only once.
			lc.mux.Lock()
			if _, ok = lc.labelsToId.Load(id); !ok {
				lc.labelsToId.Store(id, cloneLabel(labels[i]))
			}
			lc.mux.Unlock()
		}
		// NOTE(review): a genuine 64-bit hash collision between two distinct
		// labels still makes them share an id; decompress would then return
		// whichever label was stored first.
		dst[i] = id
	}
}
// Decompress appends to dst the labels encoded in src by Compress and returns
// the extended slice. Malformed input is reported through logger.Panicf.
func (lc *LabelsCompressorV2) Decompress(dst []prompb.Label, src []byte) []prompb.Label {
	count, hdrSize := encoding.UnmarshalVarUint64(src)
	if hdrSize <= 0 {
		logger.Panicf("BUG: cannot unmarshal labels length from uvarint")
	}
	rest := src[hdrSize:]

	if count == 0 {
		// Fast path: an empty set carries nothing after the count.
		if len(rest) > 0 {
			logger.Panicf("BUG: unexpected non-empty tail left; len(tail)=%d; tail=%X", len(rest), rest)
		}
		return dst
	}

	ids := encoding.GetUint64s(int(count))
	rest, err := encoding.UnmarshalVarUint64s(ids.A, rest)
	if err != nil {
		logger.Panicf("BUG: cannot unmarshal label indexes: %s", err)
	}
	if len(rest) > 0 {
		logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(rest), rest)
	}

	dst = lc.decompress(dst, ids.A)
	encoding.PutUint64s(ids)
	return dst
}
// decompress appends to dst the label registered for each id in src, in order.
// An unknown id is reported through logger.Panicf.
func (lc *LabelsCompressorV2) decompress(dst []prompb.Label, src []uint64) []prompb.Label {
	for i := range src {
		v, found := lc.labelsToId.Load(src[i])
		if !found {
			logger.Panicf("BUG: missing label for idx=%d", src[i])
		}
		dst = append(dst, v.(prompb.Label))
	}
	return dst
}
//func (lc *LabelsCompressorV2) cleanupLoop() {
// // ticker should be 3x bigger than any aggr interval
// t := time.NewTicker(time.Hour)
// defer t.Stop()
// for {
// select {
// case <-t.C:
// lc.cleanup()
// }
// }
//}
//
//func (lc *LabelsCompressorV2) cleanup() {
// lc.mux.Lock()
// defer lc.mux.Unlock()
//
// prevIdToLabels := lc.prevIdToLabels
// lc.prevIdToLabels = lc.idToLabels
// lc.idToLabels = make(map[uint64]*prompb.Label)
//
// for _, label := range prevIdToLabels {
// delete(lc.labelsToId, *label)
// }
//}
// s2b returns a byte slice that aliases the bytes of s without copying.
// The result must never be modified, since Go strings are immutable.
//
// NOTE(review): reflect.StringHeader/SliceHeader are deprecated since Go 1.21;
// unsafe.Slice(unsafe.StringData(s), len(s)) is the replacement once this file
// no longer needs its reflect import.
func s2b(s string) (b []byte) {
	n := len(s)
	hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
	hdr.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
	hdr.Len, hdr.Cap = n, n
	return b
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment