|
package promutil |
|
|
|
import ( |
|
"reflect" |
|
"sync" |
|
"unsafe" |
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" |
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" |
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" |
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" |
|
"github.com/cespare/xxhash/v2" |
|
) |
|
|
|
var hashBBP = bytesutil.ByteBufferPool{} |
|
|
|
type LabelsCompressorV2 struct { |
|
mux sync.Mutex |
|
//nextIdx atomic.Uint64 |
|
|
|
labelsToId sync.Map // map[uint64]prompb.Label |
|
//idToLabels sync.Map // map[uint64]*prompb.Label |
|
|
|
//prevIdToLabels map[uint64]*prompb.Label |
|
} |
|
|
|
func (lc *LabelsCompressorV2) Compress(dst []byte, labels []prompb.Label) []byte { |
|
if len(labels) == 0 { |
|
// Fast path |
|
return append(dst, 0) |
|
} |
|
|
|
a := encoding.GetUint64s(len(labels) + 1) |
|
a.A[0] = uint64(len(labels)) |
|
lc.compress(a.A[1:], labels) |
|
dst = encoding.MarshalVarUint64s(dst, a.A) |
|
encoding.PutUint64s(a) |
|
return dst |
|
} |
|
|
|
func (lc *LabelsCompressorV2) compress(dst []uint64, labels []prompb.Label) { |
|
if len(labels) == 0 { |
|
return |
|
} |
|
|
|
//rLocked := true |
|
//lc.mux.RLock() |
|
//defer func() { |
|
// if rLocked { |
|
// lc.mux.RUnlock() |
|
// } else { |
|
// lc.mux.Unlock() |
|
// } |
|
//}() |
|
|
|
var maxSize int |
|
for i := range labels { |
|
maxSize = max(maxSize, len(labels[i].Name)+len(labels[i].Value)) |
|
} |
|
|
|
bb := hashBBP.Get() |
|
defer hashBBP.Put(bb) |
|
bb.Grow(maxSize) |
|
|
|
_ = dst[len(labels)-1] |
|
for i := range labels { |
|
bb.Reset() |
|
bb.Write(s2b(labels[i].Name)) |
|
bb.Write(s2b(labels[i].Value)) |
|
id := xxhash.Sum64(bb.B) |
|
|
|
if _, ok := lc.labelsToId.Load(id); !ok { |
|
//if rLocked { |
|
// lc.mux.RUnlock() |
|
// lc.mux.Lock() |
|
// rLocked = false |
|
//} |
|
//if lc.labelsToId == nil { |
|
// lc.labelsToId = make(map[prompb.Label]uint64) |
|
//} |
|
//if lc.idToLabels == nil { |
|
// lc.idToLabels = make(map[uint64]*prompb.Label) |
|
//} |
|
// |
|
//idx := lc.nextIdx.Add(1) |
|
lc.mux.Lock() |
|
if _, ok = lc.labelsToId.Load(id); !ok { |
|
labelCopy := cloneLabel(labels[i]) |
|
lc.labelsToId.Store(id, labelCopy) |
|
} |
|
lc.mux.Unlock() |
|
} |
|
|
|
dst[i] = id |
|
} |
|
} |
|
|
|
func (lc *LabelsCompressorV2) Decompress(dst []prompb.Label, src []byte) []prompb.Label { |
|
labelsLen, nSize := encoding.UnmarshalVarUint64(src) |
|
if nSize <= 0 { |
|
logger.Panicf("BUG: cannot unmarshal labels length from uvarint") |
|
} |
|
tail := src[nSize:] |
|
if labelsLen == 0 { |
|
// fast path - nothing to decode |
|
if len(tail) > 0 { |
|
logger.Panicf("BUG: unexpected non-empty tail left; len(tail)=%d; tail=%X", len(tail), tail) |
|
} |
|
return dst |
|
} |
|
|
|
a := encoding.GetUint64s(int(labelsLen)) |
|
var err error |
|
tail, err = encoding.UnmarshalVarUint64s(a.A, tail) |
|
if err != nil { |
|
logger.Panicf("BUG: cannot unmarshal label indexes: %s", err) |
|
} |
|
if len(tail) > 0 { |
|
logger.Panicf("BUG: unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail) |
|
} |
|
dst = lc.decompress(dst, a.A) |
|
encoding.PutUint64s(a) |
|
return dst |
|
} |
|
|
|
func (lc *LabelsCompressorV2) decompress(dst []prompb.Label, src []uint64) []prompb.Label { |
|
for _, idx := range src { |
|
label0, ok := lc.labelsToId.Load(idx) |
|
if !ok { |
|
logger.Panicf("BUG: missing label for idx=%d", idx) |
|
} |
|
dst = append(dst, label0.(prompb.Label)) |
|
} |
|
return dst |
|
} |
|
|
|
//func (lc *LabelsCompressorV2) cleanupLoop() { |
|
// // ticker should be 3x bigger than any aggr interval |
|
// t := time.NewTicker(time.Hour) |
|
// defer t.Stop() |
|
// for { |
|
// select { |
|
// case <-t.C: |
|
// lc.cleanup() |
|
// } |
|
// } |
|
//} |
|
// |
|
//func (lc *LabelsCompressorV2) cleanup() { |
|
// lc.mux.Lock() |
|
// defer lc.mux.Unlock() |
|
// |
|
// prevIdToLabels := lc.prevIdToLabels |
|
// lc.prevIdToLabels = lc.idToLabels |
|
// lc.idToLabels = make(map[uint64]*prompb.Label) |
|
// |
|
// for _, label := range prevIdToLabels { |
|
// delete(lc.labelsToId, *label) |
|
// } |
|
//} |
|
|
|
func s2b(s string) (b []byte) { |
|
strh := (*reflect.StringHeader)(unsafe.Pointer(&s)) |
|
sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) |
|
sh.Data = strh.Data |
|
sh.Len = strh.Len |
|
sh.Cap = strh.Len |
|
return b |
|
} |