Last active
February 3, 2026 07:18
-
-
Save tedmax100/0a4a03fc3269b452f11344787c644491 to your computer and use it in GitHub Desktop.
identity_demo_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Copyright The OpenTelemetry Authors | |
| // SPDX-License-Identifier: Apache-2.0 | |
| // 檔案位置: processor/deltatocumulativeprocessor/internal/identity_demo_test.go | |
| package internal | |
| import ( | |
| "testing" | |
| "github.com/stretchr/testify/assert" | |
| "github.com/stretchr/testify/require" | |
| "go.opentelemetry.io/collector/pdata/pcommon" | |
| "go.opentelemetry.io/collector/pdata/pmetric" | |
| "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" | |
| "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" | |
| ) | |
| // TestStreamIdentity_SameAttributes_SameStream 驗證相同 attributes 會產生相同的 stream identity | |
| // 即使 timestamp 和 value 不同 | |
| func TestStreamIdentity_SameAttributes_SameStream(t *testing.T) { | |
| res := newResource("my-service", "host-1") | |
| scope := newScope("otelcol/prometheus", "v0.90.0") | |
| metric := newSumMetric("http_requests_total", "1") | |
| // DataPoint 1: timestamp=1100, value=100 | |
| dp1 := metric.Sum().DataPoints().AppendEmpty() | |
| dp1.Attributes().PutStr("method", "GET") | |
| dp1.Attributes().PutStr("status", "200") | |
| dp1.SetStartTimestamp(1000) | |
| dp1.SetTimestamp(1100) | |
| dp1.SetIntValue(100) | |
| // DataPoint 2: 相同 attributes,但不同 timestamp 和 value | |
| dp2 := metric.Sum().DataPoints().AppendEmpty() | |
| dp2.Attributes().PutStr("method", "GET") | |
| dp2.Attributes().PutStr("status", "200") | |
| dp2.SetStartTimestamp(2000) // 不同 | |
| dp2.SetTimestamp(2100) // 不同 | |
| dp2.SetIntValue(200) // 不同 | |
| metricID := identity.OfResourceMetric(res, scope, metric) | |
| stream1 := identity.OfStream(metricID, dp1) | |
| stream2 := identity.OfStream(metricID, dp2) | |
| // 關鍵驗證:相同 attributes = 相同 stream | |
| assert.Equal(t, stream1, stream2, | |
| "datapoints with same attributes should have same stream identity, "+ | |
| "regardless of timestamp/value differences") | |
| } | |
| // TestStreamIdentity_DifferentAttributes_DifferentStream 驗證不同 attributes 會產生不同的 stream | |
| func TestStreamIdentity_DifferentAttributes_DifferentStream(t *testing.T) { | |
| res := newResource("my-service", "host-1") | |
| scope := newScope("otelcol/prometheus", "v0.90.0") | |
| metric := newSumMetric("http_requests_total", "1") | |
| metricID := identity.OfResourceMetric(res, scope, metric) | |
| testCases := []struct { | |
| name string | |
| attrs1 map[string]string | |
| attrs2 map[string]string | |
| }{ | |
| { | |
| name: "different status label", | |
| attrs1: map[string]string{"method": "GET", "status": "200"}, | |
| attrs2: map[string]string{"method": "GET", "status": "500"}, | |
| }, | |
| { | |
| name: "different method label", | |
| attrs1: map[string]string{"method": "GET", "status": "200"}, | |
| attrs2: map[string]string{"method": "POST", "status": "200"}, | |
| }, | |
| { | |
| name: "extra label", | |
| attrs1: map[string]string{"method": "GET"}, | |
| attrs2: map[string]string{"method": "GET", "handler": "/api"}, | |
| }, | |
| { | |
| name: "missing label", | |
| attrs1: map[string]string{"method": "GET", "status": "200"}, | |
| attrs2: map[string]string{"method": "GET"}, | |
| }, | |
| } | |
| for _, tc := range testCases { | |
| t.Run(tc.name, func(t *testing.T) { | |
| dp1 := pmetric.NewNumberDataPoint() | |
| for k, v := range tc.attrs1 { | |
| dp1.Attributes().PutStr(k, v) | |
| } | |
| dp2 := pmetric.NewNumberDataPoint() | |
| for k, v := range tc.attrs2 { | |
| dp2.Attributes().PutStr(k, v) | |
| } | |
| stream1 := identity.OfStream(metricID, dp1) | |
| stream2 := identity.OfStream(metricID, dp2) | |
| assert.NotEqual(t, stream1, stream2, | |
| "datapoints with different attributes should have different stream identities") | |
| }) | |
| } | |
| } | |
| // TestStreamIdentity_DifferentResource_DifferentStream 驗證不同 Resource 會產生不同的 stream | |
| func TestStreamIdentity_DifferentResource_DifferentStream(t *testing.T) { | |
| res1 := newResource("my-service", "host-1") | |
| res2 := newResource("my-service", "host-2") // 不同的 host | |
| scope := newScope("otelcol/prometheus", "v0.90.0") | |
| metric := newSumMetric("http_requests_total", "1") | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.Attributes().PutStr("method", "GET") | |
| metricID1 := identity.OfResourceMetric(res1, scope, metric) | |
| metricID2 := identity.OfResourceMetric(res2, scope, metric) | |
| stream1 := identity.OfStream(metricID1, dp) | |
| stream2 := identity.OfStream(metricID2, dp) | |
| assert.NotEqual(t, stream1, stream2, | |
| "same metric from different resources (hosts) should have different stream identities") | |
| } | |
| // TestStreamIdentity_DifferentMetricName_DifferentStream 驗證不同 metric name 會產生不同的 stream | |
| func TestStreamIdentity_DifferentMetricName_DifferentStream(t *testing.T) { | |
| res := newResource("my-service", "host-1") | |
| scope := newScope("otelcol/prometheus", "v0.90.0") | |
| metric1 := newSumMetric("http_requests_total", "1") | |
| metric2 := newSumMetric("http_request_duration_seconds", "s") | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.Attributes().PutStr("method", "GET") | |
| metricID1 := identity.OfResourceMetric(res, scope, metric1) | |
| metricID2 := identity.OfResourceMetric(res, scope, metric2) | |
| stream1 := identity.OfStream(metricID1, dp) | |
| stream2 := identity.OfStream(metricID2, dp) | |
| assert.NotEqual(t, stream1, stream2, | |
| "different metric names should produce different stream identities") | |
| } | |
| // TestStreamIdentity_Collision_MultiplePodsWithoutInstanceLabel 展示多 Pod 發送相同 series 會導致 collision | |
| // 這就是 ErrOlderStart 的常見原因 | |
| func TestStreamIdentity_Collision_MultiplePodsWithoutInstanceLabel(t *testing.T) { | |
| scope := newScope("otelcol/prometheus", "v0.90.0") | |
| metric := newSumMetric("payment_requests_total", "1") | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.Attributes().PutStr("method", "credit_card") | |
| t.Run("without pod name - collision occurs", func(t *testing.T) { | |
| // 錯誤示範:Resource 沒有包含 pod name | |
| resPodA := pcommon.NewResource() | |
| resPodA.Attributes().PutStr("service.name", "payment-service") | |
| // 沒有 k8s.pod.name! | |
| resPodB := pcommon.NewResource() | |
| resPodB.Attributes().PutStr("service.name", "payment-service") | |
| // 沒有 k8s.pod.name! | |
| metricIDA := identity.OfResourceMetric(resPodA, scope, metric) | |
| metricIDB := identity.OfResourceMetric(resPodB, scope, metric) | |
| streamA := identity.OfStream(metricIDA, dp) | |
| streamB := identity.OfStream(metricIDB, dp) | |
| // 這會導致 collision!兩個 Pod 的數據會被當成同一個 stream | |
| assert.Equal(t, streamA, streamB, | |
| "without instance labels, different pods produce the SAME stream identity - this causes ErrOlderStart!") | |
| }) | |
| t.Run("with pod name - no collision", func(t *testing.T) { | |
| // 正確示範:Resource 包含 pod name | |
| resPodA := pcommon.NewResource() | |
| resPodA.Attributes().PutStr("service.name", "payment-service") | |
| resPodA.Attributes().PutStr("k8s.pod.name", "payment-service-abc123") | |
| resPodB := pcommon.NewResource() | |
| resPodB.Attributes().PutStr("service.name", "payment-service") | |
| resPodB.Attributes().PutStr("k8s.pod.name", "payment-service-xyz789") | |
| metricIDA := identity.OfResourceMetric(resPodA, scope, metric) | |
| metricIDB := identity.OfResourceMetric(resPodB, scope, metric) | |
| streamA := identity.OfStream(metricIDA, dp) | |
| streamB := identity.OfStream(metricIDB, dp) | |
| // 正確:不同 Pod 產生不同 stream,各自獨立累加 | |
| assert.NotEqual(t, streamA, streamB, | |
| "with instance labels, different pods produce different stream identities - each accumulates independently") | |
| }) | |
| } | |
| // TestStreamIdentity_Components 驗證 stream identity 的各個組成部分 | |
| func TestStreamIdentity_Components(t *testing.T) { | |
| baseRes := newResource("my-service", "host-1") | |
| baseScope := newScope("otelcol/prometheus", "v0.90.0") | |
| baseMetric := newSumMetric("http_requests_total", "1") | |
| baseDP := pmetric.NewNumberDataPoint() | |
| baseDP.Attributes().PutStr("method", "GET") | |
| baseMetricID := identity.OfResourceMetric(baseRes, baseScope, baseMetric) | |
| baseStream := identity.OfStream(baseMetricID, baseDP) | |
| t.Run("changing resource.attributes changes stream", func(t *testing.T) { | |
| diffRes := newResource("other-service", "host-1") // different service.name | |
| metricID := identity.OfResourceMetric(diffRes, baseScope, baseMetric) | |
| stream := identity.OfStream(metricID, baseDP) | |
| assert.NotEqual(t, baseStream, stream) | |
| }) | |
| t.Run("changing scope.name changes stream", func(t *testing.T) { | |
| diffScope := newScope("otelcol/otlp", "v0.90.0") // different scope name | |
| metricID := identity.OfResourceMetric(baseRes, diffScope, baseMetric) | |
| stream := identity.OfStream(metricID, baseDP) | |
| assert.NotEqual(t, baseStream, stream) | |
| }) | |
| t.Run("changing scope.version changes stream", func(t *testing.T) { | |
| diffScope := newScope("otelcol/prometheus", "v0.91.0") // different version | |
| metricID := identity.OfResourceMetric(baseRes, diffScope, baseMetric) | |
| stream := identity.OfStream(metricID, baseDP) | |
| assert.NotEqual(t, baseStream, stream) | |
| }) | |
| t.Run("changing metric.name changes stream", func(t *testing.T) { | |
| diffMetric := newSumMetric("http_errors_total", "1") // different name | |
| metricID := identity.OfResourceMetric(baseRes, baseScope, diffMetric) | |
| stream := identity.OfStream(metricID, baseDP) | |
| assert.NotEqual(t, baseStream, stream) | |
| }) | |
| t.Run("changing metric.unit changes stream", func(t *testing.T) { | |
| diffMetric := newSumMetric("http_requests_total", "ms") // different unit | |
| metricID := identity.OfResourceMetric(baseRes, baseScope, diffMetric) | |
| stream := identity.OfStream(metricID, baseDP) | |
| assert.NotEqual(t, baseStream, stream) | |
| }) | |
| t.Run("changing datapoint.attributes changes stream", func(t *testing.T) { | |
| diffDP := pmetric.NewNumberDataPoint() | |
| diffDP.Attributes().PutStr("method", "POST") // different attribute value | |
| stream := identity.OfStream(baseMetricID, diffDP) | |
| assert.NotEqual(t, baseStream, stream) | |
| }) | |
| } | |
| // TestStreamIdentity_NotAffectedBy 驗證哪些欄位不影響 stream identity | |
| func TestStreamIdentity_NotAffectedBy(t *testing.T) { | |
| res := newResource("my-service", "host-1") | |
| scope := newScope("otelcol/prometheus", "v0.90.0") | |
| metric := newSumMetric("http_requests_total", "1") | |
| metricID := identity.OfResourceMetric(res, scope, metric) | |
| // 基準 datapoint | |
| baseDP := pmetric.NewNumberDataPoint() | |
| baseDP.Attributes().PutStr("method", "GET") | |
| baseDP.SetStartTimestamp(1000) | |
| baseDP.SetTimestamp(1100) | |
| baseDP.SetIntValue(100) | |
| baseStream := identity.OfStream(metricID, baseDP) | |
| t.Run("timestamp does not affect identity", func(t *testing.T) { | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.Attributes().PutStr("method", "GET") | |
| dp.SetStartTimestamp(1000) | |
| dp.SetTimestamp(9999) // different timestamp | |
| dp.SetIntValue(100) | |
| stream := identity.OfStream(metricID, dp) | |
| assert.Equal(t, baseStream, stream, "timestamp should not affect stream identity") | |
| }) | |
| t.Run("start_timestamp does not affect identity", func(t *testing.T) { | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.Attributes().PutStr("method", "GET") | |
| dp.SetStartTimestamp(5000) // different start timestamp | |
| dp.SetTimestamp(1100) | |
| dp.SetIntValue(100) | |
| stream := identity.OfStream(metricID, dp) | |
| assert.Equal(t, baseStream, stream, "start_timestamp should not affect stream identity") | |
| }) | |
| t.Run("value does not affect identity", func(t *testing.T) { | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.Attributes().PutStr("method", "GET") | |
| dp.SetStartTimestamp(1000) | |
| dp.SetTimestamp(1100) | |
| dp.SetIntValue(99999) // different value | |
| stream := identity.OfStream(metricID, dp) | |
| assert.Equal(t, baseStream, stream, "value should not affect stream identity") | |
| }) | |
| } | |
| // TestStreamIdentity_HashConsistency 驗證 hash 的一致性 | |
| func TestStreamIdentity_HashConsistency(t *testing.T) { | |
| res := newResource("my-service", "host-1") | |
| scope := newScope("otelcol/prometheus", "v0.90.0") | |
| metric := newSumMetric("http_requests_total", "1") | |
| metricID := identity.OfResourceMetric(res, scope, metric) | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.Attributes().PutStr("method", "GET") | |
| dp.Attributes().PutStr("status", "200") | |
| stream := identity.OfStream(metricID, dp) | |
| // 多次計算應該得到相同的 hash | |
| for range 100 { | |
| dp2 := pmetric.NewNumberDataPoint() | |
| dp2.Attributes().PutStr("method", "GET") | |
| dp2.Attributes().PutStr("status", "200") | |
| stream2 := identity.OfStream(metricID, dp2) | |
| require.Equal(t, stream, stream2, "hash should be consistent across multiple calculations") | |
| } | |
| } | |
| // === Helper functions === | |
| func newResource(serviceName, hostName string) pcommon.Resource { | |
| res := pcommon.NewResource() | |
| res.Attributes().PutStr("service.name", serviceName) | |
| res.Attributes().PutStr("host.name", hostName) | |
| return res | |
| } | |
| func newScope(name, version string) pcommon.InstrumentationScope { | |
| scope := pcommon.NewInstrumentationScope() | |
| scope.SetName(name) | |
| scope.SetVersion(version) | |
| return scope | |
| } | |
| func newSumMetric(name, unit string) pmetric.Metric { | |
| metric := pmetric.NewMetric() | |
| metric.SetName(name) | |
| metric.SetUnit(unit) | |
| sum := metric.SetEmptySum() | |
| sum.SetIsMonotonic(true) | |
| sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) | |
| return metric | |
| } | |
| // ============================================================================= | |
| // Delta Aggregation Error Tests (ErrOlderStart & ErrOutOfOrder) | |
| // ============================================================================= | |
| // noopAggregate 是一個不做任何事的聚合函數,用於測試 | |
| func noopAggregate(_, _ pmetric.NumberDataPoint) error { | |
| return nil | |
| } | |
| // TestDeltaAggregate_FirstSample 驗證第一筆數據會直接存入 state | |
| func TestDeltaAggregate_FirstSample(t *testing.T) { | |
| state := pmetric.NewNumberDataPoint() | |
| // state.Timestamp() == 0 表示是空的 | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.SetStartTimestamp(1000) | |
| dp.SetTimestamp(1100) | |
| dp.SetIntValue(100) | |
| err := delta.Aggregate(state, dp, noopAggregate) | |
| require.NoError(t, err, "first sample should not return error") | |
| assert.Equal(t, pcommon.Timestamp(1000), state.StartTimestamp()) | |
| assert.Equal(t, pcommon.Timestamp(1100), state.Timestamp()) | |
| assert.Equal(t, int64(100), state.IntValue()) | |
| } | |
| // TestDeltaAggregate_NormalSequence 驗證正常的數據序列 | |
| func TestDeltaAggregate_NormalSequence(t *testing.T) { | |
| state := pmetric.NewNumberDataPoint() | |
| // 第一筆: StartTimestamp=1000, Timestamp=1100 | |
| dp1 := pmetric.NewNumberDataPoint() | |
| dp1.SetStartTimestamp(1000) | |
| dp1.SetTimestamp(1100) | |
| dp1.SetIntValue(100) | |
| err := delta.Aggregate(state, dp1, noopAggregate) | |
| require.NoError(t, err) | |
| // 第二筆: StartTimestamp=1100 (接續上一筆), Timestamp=1200 | |
| dp2 := pmetric.NewNumberDataPoint() | |
| dp2.SetStartTimestamp(1100) // >= state.StartTimestamp (1000) ✓ | |
| dp2.SetTimestamp(1200) // > state.Timestamp (1100) ✓ | |
| dp2.SetIntValue(50) | |
| err = delta.Aggregate(state, dp2, noopAggregate) | |
| require.NoError(t, err, "normal sequence should not return error") | |
| assert.Equal(t, pcommon.Timestamp(1200), state.Timestamp(), "state timestamp should be updated") | |
| } | |
| // TestDeltaAggregate_ErrOutOfOrder 驗證 out-of-order 錯誤的觸發 | |
| func TestDeltaAggregate_ErrOutOfOrder(t *testing.T) { | |
| testCases := []struct { | |
| name string | |
| stateStart pcommon.Timestamp | |
| stateTimestamp pcommon.Timestamp | |
| dpStart pcommon.Timestamp | |
| dpTimestamp pcommon.Timestamp | |
| expectError bool | |
| }{ | |
| { | |
| name: "dp.Timestamp < state.Timestamp → ErrOutOfOrder", | |
| stateStart: 1000, | |
| stateTimestamp: 1200, | |
| dpStart: 1000, // >= state.StartTimestamp ✓ (不會觸發 ErrOlderStart) | |
| dpTimestamp: 1100, // < state.Timestamp ✗ | |
| expectError: true, | |
| }, | |
| { | |
| name: "dp.Timestamp == state.Timestamp → ErrOutOfOrder", | |
| stateStart: 1000, | |
| stateTimestamp: 1200, | |
| dpStart: 1000, // >= state.StartTimestamp ✓ | |
| dpTimestamp: 1200, // == state.Timestamp ✗ | |
| expectError: true, | |
| }, | |
| { | |
| name: "dp.Timestamp > state.Timestamp → OK", | |
| stateStart: 1000, | |
| stateTimestamp: 1200, | |
| dpStart: 1000, // >= state.StartTimestamp ✓ | |
| dpTimestamp: 1300, // > state.Timestamp ✓ | |
| expectError: false, | |
| }, | |
| } | |
| for _, tc := range testCases { | |
| t.Run(tc.name, func(t *testing.T) { | |
| // 設置 state | |
| state := pmetric.NewNumberDataPoint() | |
| state.SetStartTimestamp(tc.stateStart) | |
| state.SetTimestamp(tc.stateTimestamp) | |
| state.SetIntValue(100) | |
| // 設置新的 datapoint | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.SetStartTimestamp(tc.dpStart) | |
| dp.SetTimestamp(tc.dpTimestamp) | |
| dp.SetIntValue(50) | |
| err := delta.Aggregate(state, dp, noopAggregate) | |
| if tc.expectError { | |
| require.Error(t, err) | |
| // 驗證錯誤類型和內容 | |
| var outOfOrder delta.ErrOutOfOrder | |
| require.ErrorAs(t, err, &outOfOrder, | |
| "expected ErrOutOfOrder, got %T", err) | |
| assert.Equal(t, tc.stateTimestamp, outOfOrder.Last, | |
| "ErrOutOfOrder.Last should be state's timestamp") | |
| assert.Equal(t, tc.dpTimestamp, outOfOrder.Sample, | |
| "ErrOutOfOrder.Sample should be dp's timestamp") | |
| } else { | |
| require.NoError(t, err) | |
| } | |
| }) | |
| } | |
| } | |
| // TestDeltaAggregate_ErrOlderStart 驗證 older start 錯誤的觸發 | |
| func TestDeltaAggregate_ErrOlderStart(t *testing.T) { | |
| testCases := []struct { | |
| name string | |
| stateStart pcommon.Timestamp | |
| stateTimestamp pcommon.Timestamp | |
| dpStart pcommon.Timestamp | |
| dpTimestamp pcommon.Timestamp | |
| expectError bool | |
| }{ | |
| { | |
| name: "dp.StartTimestamp < state.StartTimestamp → ErrOlderStart", | |
| stateStart: 1000, | |
| stateTimestamp: 1200, | |
| dpStart: 500, // < state.StartTimestamp ✗ | |
| dpTimestamp: 600, // 這個不重要,因為會先觸發 ErrOlderStart | |
| expectError: true, | |
| }, | |
| { | |
| name: "dp.StartTimestamp == state.StartTimestamp → OK (check next condition)", | |
| stateStart: 1000, | |
| stateTimestamp: 1200, | |
| dpStart: 1000, // == state.StartTimestamp ✓ | |
| dpTimestamp: 1300, // > state.Timestamp ✓ | |
| expectError: false, | |
| }, | |
| { | |
| name: "dp.StartTimestamp > state.StartTimestamp → OK (check next condition)", | |
| stateStart: 1000, | |
| stateTimestamp: 1200, | |
| dpStart: 1100, // > state.StartTimestamp ✓ | |
| dpTimestamp: 1300, // > state.Timestamp ✓ | |
| expectError: false, | |
| }, | |
| } | |
| for _, tc := range testCases { | |
| t.Run(tc.name, func(t *testing.T) { | |
| // 設置 state | |
| state := pmetric.NewNumberDataPoint() | |
| state.SetStartTimestamp(tc.stateStart) | |
| state.SetTimestamp(tc.stateTimestamp) | |
| state.SetIntValue(100) | |
| // 設置新的 datapoint | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.SetStartTimestamp(tc.dpStart) | |
| dp.SetTimestamp(tc.dpTimestamp) | |
| dp.SetIntValue(50) | |
| err := delta.Aggregate(state, dp, noopAggregate) | |
| if tc.expectError { | |
| require.Error(t, err) | |
| // 驗證錯誤類型和內容 | |
| var olderStart delta.ErrOlderStart | |
| require.ErrorAs(t, err, &olderStart, | |
| "expected ErrOlderStart, got %T", err) | |
| assert.Equal(t, tc.stateStart, olderStart.Start, | |
| "ErrOlderStart.Start should be state's start timestamp") | |
| assert.Equal(t, tc.dpStart, olderStart.Sample, | |
| "ErrOlderStart.Sample should be dp's start timestamp") | |
| } else { | |
| require.NoError(t, err) | |
| } | |
| }) | |
| } | |
| } | |
| // TestDeltaAggregate_ErrorPriority 驗證錯誤的優先順序 | |
| // ErrOlderStart 會先於 ErrOutOfOrder 被檢查 | |
| func TestDeltaAggregate_ErrorPriority(t *testing.T) { | |
| // 設置 state | |
| state := pmetric.NewNumberDataPoint() | |
| state.SetStartTimestamp(1000) | |
| state.SetTimestamp(1200) | |
| // 這個 dp 同時滿足兩個錯誤條件: | |
| // - dp.StartTimestamp (500) < state.StartTimestamp (1000) → ErrOlderStart | |
| // - dp.Timestamp (600) < state.Timestamp (1200) → ErrOutOfOrder | |
| dp := pmetric.NewNumberDataPoint() | |
| dp.SetStartTimestamp(500) // < 1000, 會觸發 ErrOlderStart | |
| dp.SetTimestamp(600) // < 1200, 也會觸發 ErrOutOfOrder (但不會被執行到) | |
| err := delta.Aggregate(state, dp, noopAggregate) | |
| require.Error(t, err) | |
| // 驗證是 ErrOlderStart 而不是 ErrOutOfOrder | |
| var olderStart delta.ErrOlderStart | |
| require.ErrorAs(t, err, &olderStart, | |
| "ErrOlderStart should be returned first") | |
| // ErrOutOfOrder 不應該被返回 | |
| var outOfOrder delta.ErrOutOfOrder | |
| assert.NotErrorAs(t, err, &outOfOrder, | |
| "ErrOutOfOrder should NOT be returned when ErrOlderStart applies") | |
| } | |
| // TestDeltaAggregate_RealWorldScenario_MultiplePodsCollision 模擬多 Pod 發送相同 series 的場景 | |
| func TestDeltaAggregate_RealWorldScenario_MultiplePodsCollision(t *testing.T) { | |
| t.Log(` | |
| 場景:兩個 Pod (A 和 B) 發送相同的 metric series (因為沒有 instance label) | |
| 由於 stream identity 相同,它們的數據會被送到同一個 state 進行聚合 | |
| 時間線: | |
| Pod A 啟動於 T=1000,發送數據 | |
| Pod B 啟動於 T=500 (較早),但數據較晚到達 collector | |
| 結果:Pod B 的數據因為 StartTimestamp < state.StartTimestamp 而被丟棄 | |
| `) | |
| // Shared state (因為兩個 Pod 的 stream identity 相同) | |
| state := pmetric.NewNumberDataPoint() | |
| // Pod A 的數據先到達 | |
| dpPodA := pmetric.NewNumberDataPoint() | |
| dpPodA.SetStartTimestamp(1000) // Pod A 啟動時間 | |
| dpPodA.SetTimestamp(1100) | |
| dpPodA.SetIntValue(100) | |
| err := delta.Aggregate(state, dpPodA, noopAggregate) | |
| require.NoError(t, err, "Pod A's first sample should succeed") | |
| t.Logf("After Pod A: state.StartTimestamp=%d, state.Timestamp=%d", | |
| state.StartTimestamp(), state.Timestamp()) | |
| // Pod B 的數據較晚到達,但它的 StartTimestamp 較早 | |
| dpPodB := pmetric.NewNumberDataPoint() | |
| dpPodB.SetStartTimestamp(500) // Pod B 啟動時間 (較早) | |
| dpPodB.SetTimestamp(600) | |
| dpPodB.SetIntValue(50) | |
| err = delta.Aggregate(state, dpPodB, noopAggregate) | |
| // Pod B 的數據會被拒絕 | |
| require.Error(t, err) | |
| var olderStart delta.ErrOlderStart | |
| require.ErrorAs(t, err, &olderStart) | |
| t.Logf("Pod B rejected: %v", err) | |
| t.Log("這就是為什麼需要在 resource attributes 中加入 instance label (如 k8s.pod.name)") | |
| } | |
| // TestDeltaAggregate_RealWorldScenario_NetworkDelay 模擬網路延遲導致的 out-of-order | |
| func TestDeltaAggregate_RealWorldScenario_NetworkDelay(t *testing.T) { | |
| t.Log(` | |
| 場景:同一個來源的數據因為網路延遲而亂序到達 | |
| 時間線: | |
| T=1100 的數據先發送 | |
| T=1200 的數據後發送 | |
| 但因為網路問題,T=1200 先到達,T=1100 後到達 | |
| 結果:T=1100 的數據因為 Timestamp <= state.Timestamp 而被丟棄 | |
| `) | |
| state := pmetric.NewNumberDataPoint() | |
| // T=1200 的數據先到達 | |
| dp1 := pmetric.NewNumberDataPoint() | |
| dp1.SetStartTimestamp(1000) | |
| dp1.SetTimestamp(1200) // 較晚的數據先到達 | |
| dp1.SetIntValue(200) | |
| err := delta.Aggregate(state, dp1, noopAggregate) | |
| require.NoError(t, err) | |
| t.Logf("After first arrival: state.Timestamp=%d", state.Timestamp()) | |
| // T=1100 的數據後到達 (因為網路延遲) | |
| dp2 := pmetric.NewNumberDataPoint() | |
| dp2.SetStartTimestamp(1000) // 相同的 series | |
| dp2.SetTimestamp(1100) // < state.Timestamp (1200) | |
| dp2.SetIntValue(100) | |
| err = delta.Aggregate(state, dp2, noopAggregate) | |
| // 這筆數據會被拒絕 | |
| require.Error(t, err) | |
| var outOfOrder delta.ErrOutOfOrder | |
| require.ErrorAs(t, err, &outOfOrder) | |
| t.Logf("Delayed data rejected: %v", err) | |
| assert.Equal(t, pcommon.Timestamp(1200), outOfOrder.Last) | |
| assert.Equal(t, pcommon.Timestamp(1100), outOfOrder.Sample) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment