Skip to content

Instantly share code, notes, and snippets.

@tedmax100
Last active February 3, 2026 07:18
Show Gist options
  • Select an option

  • Save tedmax100/0a4a03fc3269b452f11344787c644491 to your computer and use it in GitHub Desktop.

Select an option

Save tedmax100/0a4a03fc3269b452f11344787c644491 to your computer and use it in GitHub Desktop.
identity_demo_test.go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// 檔案位置: processor/deltatocumulativeprocessor/internal/identity_demo_test.go
package internal
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta"
)
// TestStreamIdentity_SameAttributes_SameStream 驗證相同 attributes 會產生相同的 stream identity
// 即使 timestamp 和 value 不同
func TestStreamIdentity_SameAttributes_SameStream(t *testing.T) {
res := newResource("my-service", "host-1")
scope := newScope("otelcol/prometheus", "v0.90.0")
metric := newSumMetric("http_requests_total", "1")
// DataPoint 1: timestamp=1100, value=100
dp1 := metric.Sum().DataPoints().AppendEmpty()
dp1.Attributes().PutStr("method", "GET")
dp1.Attributes().PutStr("status", "200")
dp1.SetStartTimestamp(1000)
dp1.SetTimestamp(1100)
dp1.SetIntValue(100)
// DataPoint 2: 相同 attributes,但不同 timestamp 和 value
dp2 := metric.Sum().DataPoints().AppendEmpty()
dp2.Attributes().PutStr("method", "GET")
dp2.Attributes().PutStr("status", "200")
dp2.SetStartTimestamp(2000) // 不同
dp2.SetTimestamp(2100) // 不同
dp2.SetIntValue(200) // 不同
metricID := identity.OfResourceMetric(res, scope, metric)
stream1 := identity.OfStream(metricID, dp1)
stream2 := identity.OfStream(metricID, dp2)
// 關鍵驗證:相同 attributes = 相同 stream
assert.Equal(t, stream1, stream2,
"datapoints with same attributes should have same stream identity, "+
"regardless of timestamp/value differences")
}
// TestStreamIdentity_DifferentAttributes_DifferentStream 驗證不同 attributes 會產生不同的 stream
func TestStreamIdentity_DifferentAttributes_DifferentStream(t *testing.T) {
res := newResource("my-service", "host-1")
scope := newScope("otelcol/prometheus", "v0.90.0")
metric := newSumMetric("http_requests_total", "1")
metricID := identity.OfResourceMetric(res, scope, metric)
testCases := []struct {
name string
attrs1 map[string]string
attrs2 map[string]string
}{
{
name: "different status label",
attrs1: map[string]string{"method": "GET", "status": "200"},
attrs2: map[string]string{"method": "GET", "status": "500"},
},
{
name: "different method label",
attrs1: map[string]string{"method": "GET", "status": "200"},
attrs2: map[string]string{"method": "POST", "status": "200"},
},
{
name: "extra label",
attrs1: map[string]string{"method": "GET"},
attrs2: map[string]string{"method": "GET", "handler": "/api"},
},
{
name: "missing label",
attrs1: map[string]string{"method": "GET", "status": "200"},
attrs2: map[string]string{"method": "GET"},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
dp1 := pmetric.NewNumberDataPoint()
for k, v := range tc.attrs1 {
dp1.Attributes().PutStr(k, v)
}
dp2 := pmetric.NewNumberDataPoint()
for k, v := range tc.attrs2 {
dp2.Attributes().PutStr(k, v)
}
stream1 := identity.OfStream(metricID, dp1)
stream2 := identity.OfStream(metricID, dp2)
assert.NotEqual(t, stream1, stream2,
"datapoints with different attributes should have different stream identities")
})
}
}
// TestStreamIdentity_DifferentResource_DifferentStream 驗證不同 Resource 會產生不同的 stream
func TestStreamIdentity_DifferentResource_DifferentStream(t *testing.T) {
res1 := newResource("my-service", "host-1")
res2 := newResource("my-service", "host-2") // 不同的 host
scope := newScope("otelcol/prometheus", "v0.90.0")
metric := newSumMetric("http_requests_total", "1")
dp := pmetric.NewNumberDataPoint()
dp.Attributes().PutStr("method", "GET")
metricID1 := identity.OfResourceMetric(res1, scope, metric)
metricID2 := identity.OfResourceMetric(res2, scope, metric)
stream1 := identity.OfStream(metricID1, dp)
stream2 := identity.OfStream(metricID2, dp)
assert.NotEqual(t, stream1, stream2,
"same metric from different resources (hosts) should have different stream identities")
}
// TestStreamIdentity_DifferentMetricName_DifferentStream 驗證不同 metric name 會產生不同的 stream
func TestStreamIdentity_DifferentMetricName_DifferentStream(t *testing.T) {
res := newResource("my-service", "host-1")
scope := newScope("otelcol/prometheus", "v0.90.0")
metric1 := newSumMetric("http_requests_total", "1")
metric2 := newSumMetric("http_request_duration_seconds", "s")
dp := pmetric.NewNumberDataPoint()
dp.Attributes().PutStr("method", "GET")
metricID1 := identity.OfResourceMetric(res, scope, metric1)
metricID2 := identity.OfResourceMetric(res, scope, metric2)
stream1 := identity.OfStream(metricID1, dp)
stream2 := identity.OfStream(metricID2, dp)
assert.NotEqual(t, stream1, stream2,
"different metric names should produce different stream identities")
}
// TestStreamIdentity_Collision_MultiplePodsWithoutInstanceLabel 展示多 Pod 發送相同 series 會導致 collision
// 這就是 ErrOlderStart 的常見原因
func TestStreamIdentity_Collision_MultiplePodsWithoutInstanceLabel(t *testing.T) {
scope := newScope("otelcol/prometheus", "v0.90.0")
metric := newSumMetric("payment_requests_total", "1")
dp := pmetric.NewNumberDataPoint()
dp.Attributes().PutStr("method", "credit_card")
t.Run("without pod name - collision occurs", func(t *testing.T) {
// 錯誤示範:Resource 沒有包含 pod name
resPodA := pcommon.NewResource()
resPodA.Attributes().PutStr("service.name", "payment-service")
// 沒有 k8s.pod.name!
resPodB := pcommon.NewResource()
resPodB.Attributes().PutStr("service.name", "payment-service")
// 沒有 k8s.pod.name!
metricIDA := identity.OfResourceMetric(resPodA, scope, metric)
metricIDB := identity.OfResourceMetric(resPodB, scope, metric)
streamA := identity.OfStream(metricIDA, dp)
streamB := identity.OfStream(metricIDB, dp)
// 這會導致 collision!兩個 Pod 的數據會被當成同一個 stream
assert.Equal(t, streamA, streamB,
"without instance labels, different pods produce the SAME stream identity - this causes ErrOlderStart!")
})
t.Run("with pod name - no collision", func(t *testing.T) {
// 正確示範:Resource 包含 pod name
resPodA := pcommon.NewResource()
resPodA.Attributes().PutStr("service.name", "payment-service")
resPodA.Attributes().PutStr("k8s.pod.name", "payment-service-abc123")
resPodB := pcommon.NewResource()
resPodB.Attributes().PutStr("service.name", "payment-service")
resPodB.Attributes().PutStr("k8s.pod.name", "payment-service-xyz789")
metricIDA := identity.OfResourceMetric(resPodA, scope, metric)
metricIDB := identity.OfResourceMetric(resPodB, scope, metric)
streamA := identity.OfStream(metricIDA, dp)
streamB := identity.OfStream(metricIDB, dp)
// 正確:不同 Pod 產生不同 stream,各自獨立累加
assert.NotEqual(t, streamA, streamB,
"with instance labels, different pods produce different stream identities - each accumulates independently")
})
}
// TestStreamIdentity_Components 驗證 stream identity 的各個組成部分
func TestStreamIdentity_Components(t *testing.T) {
baseRes := newResource("my-service", "host-1")
baseScope := newScope("otelcol/prometheus", "v0.90.0")
baseMetric := newSumMetric("http_requests_total", "1")
baseDP := pmetric.NewNumberDataPoint()
baseDP.Attributes().PutStr("method", "GET")
baseMetricID := identity.OfResourceMetric(baseRes, baseScope, baseMetric)
baseStream := identity.OfStream(baseMetricID, baseDP)
t.Run("changing resource.attributes changes stream", func(t *testing.T) {
diffRes := newResource("other-service", "host-1") // different service.name
metricID := identity.OfResourceMetric(diffRes, baseScope, baseMetric)
stream := identity.OfStream(metricID, baseDP)
assert.NotEqual(t, baseStream, stream)
})
t.Run("changing scope.name changes stream", func(t *testing.T) {
diffScope := newScope("otelcol/otlp", "v0.90.0") // different scope name
metricID := identity.OfResourceMetric(baseRes, diffScope, baseMetric)
stream := identity.OfStream(metricID, baseDP)
assert.NotEqual(t, baseStream, stream)
})
t.Run("changing scope.version changes stream", func(t *testing.T) {
diffScope := newScope("otelcol/prometheus", "v0.91.0") // different version
metricID := identity.OfResourceMetric(baseRes, diffScope, baseMetric)
stream := identity.OfStream(metricID, baseDP)
assert.NotEqual(t, baseStream, stream)
})
t.Run("changing metric.name changes stream", func(t *testing.T) {
diffMetric := newSumMetric("http_errors_total", "1") // different name
metricID := identity.OfResourceMetric(baseRes, baseScope, diffMetric)
stream := identity.OfStream(metricID, baseDP)
assert.NotEqual(t, baseStream, stream)
})
t.Run("changing metric.unit changes stream", func(t *testing.T) {
diffMetric := newSumMetric("http_requests_total", "ms") // different unit
metricID := identity.OfResourceMetric(baseRes, baseScope, diffMetric)
stream := identity.OfStream(metricID, baseDP)
assert.NotEqual(t, baseStream, stream)
})
t.Run("changing datapoint.attributes changes stream", func(t *testing.T) {
diffDP := pmetric.NewNumberDataPoint()
diffDP.Attributes().PutStr("method", "POST") // different attribute value
stream := identity.OfStream(baseMetricID, diffDP)
assert.NotEqual(t, baseStream, stream)
})
}
// TestStreamIdentity_NotAffectedBy 驗證哪些欄位不影響 stream identity
func TestStreamIdentity_NotAffectedBy(t *testing.T) {
res := newResource("my-service", "host-1")
scope := newScope("otelcol/prometheus", "v0.90.0")
metric := newSumMetric("http_requests_total", "1")
metricID := identity.OfResourceMetric(res, scope, metric)
// 基準 datapoint
baseDP := pmetric.NewNumberDataPoint()
baseDP.Attributes().PutStr("method", "GET")
baseDP.SetStartTimestamp(1000)
baseDP.SetTimestamp(1100)
baseDP.SetIntValue(100)
baseStream := identity.OfStream(metricID, baseDP)
t.Run("timestamp does not affect identity", func(t *testing.T) {
dp := pmetric.NewNumberDataPoint()
dp.Attributes().PutStr("method", "GET")
dp.SetStartTimestamp(1000)
dp.SetTimestamp(9999) // different timestamp
dp.SetIntValue(100)
stream := identity.OfStream(metricID, dp)
assert.Equal(t, baseStream, stream, "timestamp should not affect stream identity")
})
t.Run("start_timestamp does not affect identity", func(t *testing.T) {
dp := pmetric.NewNumberDataPoint()
dp.Attributes().PutStr("method", "GET")
dp.SetStartTimestamp(5000) // different start timestamp
dp.SetTimestamp(1100)
dp.SetIntValue(100)
stream := identity.OfStream(metricID, dp)
assert.Equal(t, baseStream, stream, "start_timestamp should not affect stream identity")
})
t.Run("value does not affect identity", func(t *testing.T) {
dp := pmetric.NewNumberDataPoint()
dp.Attributes().PutStr("method", "GET")
dp.SetStartTimestamp(1000)
dp.SetTimestamp(1100)
dp.SetIntValue(99999) // different value
stream := identity.OfStream(metricID, dp)
assert.Equal(t, baseStream, stream, "value should not affect stream identity")
})
}
// TestStreamIdentity_HashConsistency 驗證 hash 的一致性
func TestStreamIdentity_HashConsistency(t *testing.T) {
res := newResource("my-service", "host-1")
scope := newScope("otelcol/prometheus", "v0.90.0")
metric := newSumMetric("http_requests_total", "1")
metricID := identity.OfResourceMetric(res, scope, metric)
dp := pmetric.NewNumberDataPoint()
dp.Attributes().PutStr("method", "GET")
dp.Attributes().PutStr("status", "200")
stream := identity.OfStream(metricID, dp)
// 多次計算應該得到相同的 hash
for range 100 {
dp2 := pmetric.NewNumberDataPoint()
dp2.Attributes().PutStr("method", "GET")
dp2.Attributes().PutStr("status", "200")
stream2 := identity.OfStream(metricID, dp2)
require.Equal(t, stream, stream2, "hash should be consistent across multiple calculations")
}
}
// === Helper functions ===
func newResource(serviceName, hostName string) pcommon.Resource {
res := pcommon.NewResource()
res.Attributes().PutStr("service.name", serviceName)
res.Attributes().PutStr("host.name", hostName)
return res
}
func newScope(name, version string) pcommon.InstrumentationScope {
scope := pcommon.NewInstrumentationScope()
scope.SetName(name)
scope.SetVersion(version)
return scope
}
func newSumMetric(name, unit string) pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName(name)
metric.SetUnit(unit)
sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
return metric
}
// =============================================================================
// Delta Aggregation Error Tests (ErrOlderStart & ErrOutOfOrder)
// =============================================================================
// noopAggregate 是一個不做任何事的聚合函數,用於測試
func noopAggregate(_, _ pmetric.NumberDataPoint) error {
return nil
}
// TestDeltaAggregate_FirstSample 驗證第一筆數據會直接存入 state
func TestDeltaAggregate_FirstSample(t *testing.T) {
state := pmetric.NewNumberDataPoint()
// state.Timestamp() == 0 表示是空的
dp := pmetric.NewNumberDataPoint()
dp.SetStartTimestamp(1000)
dp.SetTimestamp(1100)
dp.SetIntValue(100)
err := delta.Aggregate(state, dp, noopAggregate)
require.NoError(t, err, "first sample should not return error")
assert.Equal(t, pcommon.Timestamp(1000), state.StartTimestamp())
assert.Equal(t, pcommon.Timestamp(1100), state.Timestamp())
assert.Equal(t, int64(100), state.IntValue())
}
// TestDeltaAggregate_NormalSequence 驗證正常的數據序列
func TestDeltaAggregate_NormalSequence(t *testing.T) {
state := pmetric.NewNumberDataPoint()
// 第一筆: StartTimestamp=1000, Timestamp=1100
dp1 := pmetric.NewNumberDataPoint()
dp1.SetStartTimestamp(1000)
dp1.SetTimestamp(1100)
dp1.SetIntValue(100)
err := delta.Aggregate(state, dp1, noopAggregate)
require.NoError(t, err)
// 第二筆: StartTimestamp=1100 (接續上一筆), Timestamp=1200
dp2 := pmetric.NewNumberDataPoint()
dp2.SetStartTimestamp(1100) // >= state.StartTimestamp (1000) ✓
dp2.SetTimestamp(1200) // > state.Timestamp (1100) ✓
dp2.SetIntValue(50)
err = delta.Aggregate(state, dp2, noopAggregate)
require.NoError(t, err, "normal sequence should not return error")
assert.Equal(t, pcommon.Timestamp(1200), state.Timestamp(), "state timestamp should be updated")
}
// TestDeltaAggregate_ErrOutOfOrder 驗證 out-of-order 錯誤的觸發
func TestDeltaAggregate_ErrOutOfOrder(t *testing.T) {
testCases := []struct {
name string
stateStart pcommon.Timestamp
stateTimestamp pcommon.Timestamp
dpStart pcommon.Timestamp
dpTimestamp pcommon.Timestamp
expectError bool
}{
{
name: "dp.Timestamp < state.Timestamp → ErrOutOfOrder",
stateStart: 1000,
stateTimestamp: 1200,
dpStart: 1000, // >= state.StartTimestamp ✓ (不會觸發 ErrOlderStart)
dpTimestamp: 1100, // < state.Timestamp ✗
expectError: true,
},
{
name: "dp.Timestamp == state.Timestamp → ErrOutOfOrder",
stateStart: 1000,
stateTimestamp: 1200,
dpStart: 1000, // >= state.StartTimestamp ✓
dpTimestamp: 1200, // == state.Timestamp ✗
expectError: true,
},
{
name: "dp.Timestamp > state.Timestamp → OK",
stateStart: 1000,
stateTimestamp: 1200,
dpStart: 1000, // >= state.StartTimestamp ✓
dpTimestamp: 1300, // > state.Timestamp ✓
expectError: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// 設置 state
state := pmetric.NewNumberDataPoint()
state.SetStartTimestamp(tc.stateStart)
state.SetTimestamp(tc.stateTimestamp)
state.SetIntValue(100)
// 設置新的 datapoint
dp := pmetric.NewNumberDataPoint()
dp.SetStartTimestamp(tc.dpStart)
dp.SetTimestamp(tc.dpTimestamp)
dp.SetIntValue(50)
err := delta.Aggregate(state, dp, noopAggregate)
if tc.expectError {
require.Error(t, err)
// 驗證錯誤類型和內容
var outOfOrder delta.ErrOutOfOrder
require.ErrorAs(t, err, &outOfOrder,
"expected ErrOutOfOrder, got %T", err)
assert.Equal(t, tc.stateTimestamp, outOfOrder.Last,
"ErrOutOfOrder.Last should be state's timestamp")
assert.Equal(t, tc.dpTimestamp, outOfOrder.Sample,
"ErrOutOfOrder.Sample should be dp's timestamp")
} else {
require.NoError(t, err)
}
})
}
}
// TestDeltaAggregate_ErrOlderStart 驗證 older start 錯誤的觸發
func TestDeltaAggregate_ErrOlderStart(t *testing.T) {
testCases := []struct {
name string
stateStart pcommon.Timestamp
stateTimestamp pcommon.Timestamp
dpStart pcommon.Timestamp
dpTimestamp pcommon.Timestamp
expectError bool
}{
{
name: "dp.StartTimestamp < state.StartTimestamp → ErrOlderStart",
stateStart: 1000,
stateTimestamp: 1200,
dpStart: 500, // < state.StartTimestamp ✗
dpTimestamp: 600, // 這個不重要,因為會先觸發 ErrOlderStart
expectError: true,
},
{
name: "dp.StartTimestamp == state.StartTimestamp → OK (check next condition)",
stateStart: 1000,
stateTimestamp: 1200,
dpStart: 1000, // == state.StartTimestamp ✓
dpTimestamp: 1300, // > state.Timestamp ✓
expectError: false,
},
{
name: "dp.StartTimestamp > state.StartTimestamp → OK (check next condition)",
stateStart: 1000,
stateTimestamp: 1200,
dpStart: 1100, // > state.StartTimestamp ✓
dpTimestamp: 1300, // > state.Timestamp ✓
expectError: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// 設置 state
state := pmetric.NewNumberDataPoint()
state.SetStartTimestamp(tc.stateStart)
state.SetTimestamp(tc.stateTimestamp)
state.SetIntValue(100)
// 設置新的 datapoint
dp := pmetric.NewNumberDataPoint()
dp.SetStartTimestamp(tc.dpStart)
dp.SetTimestamp(tc.dpTimestamp)
dp.SetIntValue(50)
err := delta.Aggregate(state, dp, noopAggregate)
if tc.expectError {
require.Error(t, err)
// 驗證錯誤類型和內容
var olderStart delta.ErrOlderStart
require.ErrorAs(t, err, &olderStart,
"expected ErrOlderStart, got %T", err)
assert.Equal(t, tc.stateStart, olderStart.Start,
"ErrOlderStart.Start should be state's start timestamp")
assert.Equal(t, tc.dpStart, olderStart.Sample,
"ErrOlderStart.Sample should be dp's start timestamp")
} else {
require.NoError(t, err)
}
})
}
}
// TestDeltaAggregate_ErrorPriority 驗證錯誤的優先順序
// ErrOlderStart 會先於 ErrOutOfOrder 被檢查
func TestDeltaAggregate_ErrorPriority(t *testing.T) {
// 設置 state
state := pmetric.NewNumberDataPoint()
state.SetStartTimestamp(1000)
state.SetTimestamp(1200)
// 這個 dp 同時滿足兩個錯誤條件:
// - dp.StartTimestamp (500) < state.StartTimestamp (1000) → ErrOlderStart
// - dp.Timestamp (600) < state.Timestamp (1200) → ErrOutOfOrder
dp := pmetric.NewNumberDataPoint()
dp.SetStartTimestamp(500) // < 1000, 會觸發 ErrOlderStart
dp.SetTimestamp(600) // < 1200, 也會觸發 ErrOutOfOrder (但不會被執行到)
err := delta.Aggregate(state, dp, noopAggregate)
require.Error(t, err)
// 驗證是 ErrOlderStart 而不是 ErrOutOfOrder
var olderStart delta.ErrOlderStart
require.ErrorAs(t, err, &olderStart,
"ErrOlderStart should be returned first")
// ErrOutOfOrder 不應該被返回
var outOfOrder delta.ErrOutOfOrder
assert.NotErrorAs(t, err, &outOfOrder,
"ErrOutOfOrder should NOT be returned when ErrOlderStart applies")
}
// TestDeltaAggregate_RealWorldScenario_MultiplePodsCollision 模擬多 Pod 發送相同 series 的場景
func TestDeltaAggregate_RealWorldScenario_MultiplePodsCollision(t *testing.T) {
t.Log(`
場景:兩個 Pod (A 和 B) 發送相同的 metric series (因為沒有 instance label)
由於 stream identity 相同,它們的數據會被送到同一個 state 進行聚合
時間線:
Pod A 啟動於 T=1000,發送數據
Pod B 啟動於 T=500 (較早),但數據較晚到達 collector
結果:Pod B 的數據因為 StartTimestamp < state.StartTimestamp 而被丟棄
`)
// Shared state (因為兩個 Pod 的 stream identity 相同)
state := pmetric.NewNumberDataPoint()
// Pod A 的數據先到達
dpPodA := pmetric.NewNumberDataPoint()
dpPodA.SetStartTimestamp(1000) // Pod A 啟動時間
dpPodA.SetTimestamp(1100)
dpPodA.SetIntValue(100)
err := delta.Aggregate(state, dpPodA, noopAggregate)
require.NoError(t, err, "Pod A's first sample should succeed")
t.Logf("After Pod A: state.StartTimestamp=%d, state.Timestamp=%d",
state.StartTimestamp(), state.Timestamp())
// Pod B 的數據較晚到達,但它的 StartTimestamp 較早
dpPodB := pmetric.NewNumberDataPoint()
dpPodB.SetStartTimestamp(500) // Pod B 啟動時間 (較早)
dpPodB.SetTimestamp(600)
dpPodB.SetIntValue(50)
err = delta.Aggregate(state, dpPodB, noopAggregate)
// Pod B 的數據會被拒絕
require.Error(t, err)
var olderStart delta.ErrOlderStart
require.ErrorAs(t, err, &olderStart)
t.Logf("Pod B rejected: %v", err)
t.Log("這就是為什麼需要在 resource attributes 中加入 instance label (如 k8s.pod.name)")
}
// TestDeltaAggregate_RealWorldScenario_NetworkDelay 模擬網路延遲導致的 out-of-order
func TestDeltaAggregate_RealWorldScenario_NetworkDelay(t *testing.T) {
t.Log(`
場景:同一個來源的數據因為網路延遲而亂序到達
時間線:
T=1100 的數據先發送
T=1200 的數據後發送
但因為網路問題,T=1200 先到達,T=1100 後到達
結果:T=1100 的數據因為 Timestamp <= state.Timestamp 而被丟棄
`)
state := pmetric.NewNumberDataPoint()
// T=1200 的數據先到達
dp1 := pmetric.NewNumberDataPoint()
dp1.SetStartTimestamp(1000)
dp1.SetTimestamp(1200) // 較晚的數據先到達
dp1.SetIntValue(200)
err := delta.Aggregate(state, dp1, noopAggregate)
require.NoError(t, err)
t.Logf("After first arrival: state.Timestamp=%d", state.Timestamp())
// T=1100 的數據後到達 (因為網路延遲)
dp2 := pmetric.NewNumberDataPoint()
dp2.SetStartTimestamp(1000) // 相同的 series
dp2.SetTimestamp(1100) // < state.Timestamp (1200)
dp2.SetIntValue(100)
err = delta.Aggregate(state, dp2, noopAggregate)
// 這筆數據會被拒絕
require.Error(t, err)
var outOfOrder delta.ErrOutOfOrder
require.ErrorAs(t, err, &outOfOrder)
t.Logf("Delayed data rejected: %v", err)
assert.Equal(t, pcommon.Timestamp(1200), outOfOrder.Last)
assert.Equal(t, pcommon.Timestamp(1100), outOfOrder.Sample)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment