Last active
October 20, 2025 11:31
-
-
Save samber/db9ba8ea0a25f3cfce9d19e904ff2d8b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| func MyCustomOperator[T, R any](project func(ctx context.Context, item T, index int64) (context.Context, R)) func(ro.Observable[T]) Observable[R] { | |
| return func(source Observable[T]) Observable[R] { | |
| return ro.NewUnsafeObservableWithContext(func(subscriberCtx context.Context, destination ro.Observer[R]) ro.Teardown { | |
| // initialize external lib here (on subscription) | |
| sub := source.SubscribeWithContext( | |
| subscriberCtx, | |
| ro.NewObserverWithContext( | |
| func(ctx context.Context, value T) { | |
| // handle message then pass message to the downstream operators: | |
| // destination.Next(ctx, outputValue) | |
| }, | |
| destination.ErrorWithContext, | |
| destination.CompleteWithContext, | |
| ), | |
| ) | |
| return func() { | |
| sub.Unsubscribe() | |
| // close lib on cancellation/error/completion | |
| } | |
| }) | |
| } | |
| } |
Author
1- Yes, the initialization/unsubscription sticks to the main goroutine. The ObserveOn operator sends the downstream processing into a different goroutine, while SubscribeOn sends the upstream to a different goroutine.
2- In your example, you should use destination.NextWithContext(ctx, value) instead of destination.Next(value), because it breaks the context propagation.
3- In such an operator that returns the same type, you should have the following prototype: func MyCustomOperator[T any](operatorID string) func(ro.Observable[T]) ro.Observable[T]. It will allow you not to cast the value.
If you want to call runtime.LockOSThread() and runtime.UnlockOSThread() in the same goroutine, you should do this:
obs := ro.Pipe4(
ro.Just("value-1", "value-2"),
ro.SubscribeOn[string](2),
ro.TapOnSubscribe(func() {
runtime.LockOSThread()
}),
MyCustomOperator[string, string]("operator-1"),
ro.TapOnFinalize(func() {
runtime.UnlockOSThread()
}),
ro.SubscribeOn[string](2),
...
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for the example. However, after some testing, it does not feel like the initialization and cleanup in done in the same goroutine as the processing. Here's my test code. Appreciate if you could let me know what I did wrong. Thank you:
Sample output: