@samber
Last active October 20, 2025 11:31
func MyCustomOperator[T, R any](project func(ctx context.Context, item T, index int64) (context.Context, R)) func(ro.Observable[T]) ro.Observable[R] {
	return func(source ro.Observable[T]) ro.Observable[R] {
		return ro.NewUnsafeObservableWithContext(func(subscriberCtx context.Context, destination ro.Observer[R]) ro.Teardown {
			// initialize external lib here (on subscription)
			sub := source.SubscribeWithContext(
				subscriberCtx,
				ro.NewObserverWithContext(
					func(ctx context.Context, value T) {
						// handle message then pass message to the downstream operators:
						// destination.NextWithContext(ctx, outputValue)
					},
					destination.ErrorWithContext,
					destination.CompleteWithContext,
				),
			)
			return func() {
				sub.Unsubscribe()
				// close lib on cancellation/error/completion
			}
		})
	}
}
@l0rem1psum

Thanks for the example. However, after some testing, it looks like the initialization and cleanup are not done in the same goroutine as the processing. Here's my test code. I'd appreciate it if you could let me know what I did wrong. Thank you:

package main

import (
	"bytes"
	"context"
	"fmt"
	"runtime"
	"strconv"
	"time"

	"github.com/samber/ro"
)

func main() {
	obs := ro.Pipe4(
		ro.Just("value-1", "value-2"),
		ro.ObserveOn[string](2),
		MyCustomOperator[string, string]("operator-1"),
		ro.ObserveOn[string](2),
		MyCustomOperator[string, string]("operator-2"),
	)

	sub := obs.Subscribe(nil)
	defer sub.Unsubscribe()

	// Wait for completion
	time.Sleep(4 * time.Second)
}

func MyCustomOperator[T, R any](operatorID string) func(ro.Observable[T]) ro.Observable[R] {
	return func(source ro.Observable[T]) ro.Observable[R] {
		return ro.NewUnsafeObservableWithContext(func(subscriberCtx context.Context, destination ro.Observer[R]) ro.Teardown {
			// initialize external lib here (on subscription)
			runtime.LockOSThread()
			fmt.Println("Initializing operartor", operatorID, "on GoroutineID:", getGoroutineID())

			sub := source.SubscribeWithContext(
				subscriberCtx,
				ro.NewObserverWithContext(
					func(ctx context.Context, value T) {
						fmt.Println("Operator", operatorID, "processing value:", value, "on GoroutineID:", getGoroutineID())
						destination.Next(any(value).(R))
					},
					destination.ErrorWithContext,
					destination.CompleteWithContext,
				),
			)

			return func() {
				sub.Unsubscribe()

				fmt.Println("Cleaning up operator", operatorID, "on GoroutineID:", getGoroutineID())
				runtime.UnlockOSThread()
				// close lib on cancellation/error/completion
			}
		})
	}
}

func getGoroutineID() uint64 {
	buf := make([]byte, 64)
	n := runtime.Stack(buf, false)
	buf = buf[:n]
	buf = bytes.TrimPrefix(buf, []byte("goroutine "))
	buf = buf[:bytes.IndexByte(buf, ' ')]
	id, _ := strconv.ParseUint(string(buf), 10, 64)
	return id
}

Sample output:

Initializing operartor operator-2 on GoroutineID: 1
Initializing operartor operator-1 on GoroutineID: 1
Operator operator-1 processing value: value-1 on GoroutineID: 7
Operator operator-1 processing value: value-2 on GoroutineID: 7
Operator operator-2 processing value: value-1 on GoroutineID: 6
Operator operator-2 processing value: value-2 on GoroutineID: 6
Cleaning up operator operator-1 on GoroutineID: 1
Cleaning up operator operator-2 on GoroutineID: 1

@samber
Author

samber commented Oct 20, 2025

1- Yes, initialization and unsubscription stay on the main goroutine. The ObserveOn operator moves the downstream processing to a different goroutine, while SubscribeOn moves the upstream to a different goroutine.
2- In your example, you should use destination.NextWithContext(ctx, value) instead of destination.Next(value), because Next(value) breaks context propagation.
3- For an operator that returns the same type it receives, use the following prototype: func MyCustomOperator[T any](operatorID string) func(ro.Observable[T]) ro.Observable[T]. That way you don't need to cast the value.
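
To illustrate point 1 without depending on the library, here is a minimal standard-library sketch of the hand-off pattern. It is an assumption-laden stand-in, not how ro implements ObserveOn: the `subscribe` function and `report` type are hypothetical names, and the goroutine ID helper is the same debugging trick used in the test code above. Setup and teardown run on the caller, while values cross a buffered channel and are processed on a worker goroutine.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"strconv"
)

// goroutineID parses the current goroutine's ID from its stack header.
// Debugging aid only; Go does not expose goroutine IDs officially.
func goroutineID() uint64 {
	buf := make([]byte, 64)
	buf = buf[:runtime.Stack(buf, false)]
	buf = bytes.TrimPrefix(buf, []byte("goroutine "))
	if i := bytes.IndexByte(buf, ' '); i >= 0 {
		buf = buf[:i]
	}
	id, _ := strconv.ParseUint(string(buf), 10, 64)
	return id
}

// report records on which goroutine each phase ran (hypothetical type).
type report struct {
	setup, process, teardown uint64
	values                   []string
}

// subscribe is a hypothetical stand-in for a pipeline with a hand-off stage:
// setup and teardown run on the caller, values are processed on a worker.
func subscribe(values []string, bufferSize int) report {
	var r report
	r.setup = goroutineID() // "initialization" runs on the subscribing goroutine

	queue := make(chan string, bufferSize)
	done := make(chan struct{})
	go func() {
		defer close(done)
		r.process = goroutineID() // processing runs on the worker goroutine
		for v := range queue {
			r.values = append(r.values, v)
		}
	}()

	for _, v := range values {
		queue <- v
	}
	close(queue)
	<-done // wait for the worker before reading its results

	r.teardown = goroutineID() // "cleanup" runs on the caller again
	return r
}

func main() {
	r := subscribe([]string{"value-1", "value-2"}, 2)
	fmt.Println("setup and teardown share a goroutine:", r.setup == r.teardown)
	fmt.Println("processing runs elsewhere:", r.process != r.setup)
	fmt.Println("values:", r.values)
}
```

This mirrors the shape of the sample output: one goroutine ID for initialization and cleanup, another for processing.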

If you want to call runtime.LockOSThread() and runtime.UnlockOSThread() in the same goroutine, you should do this:

obs := ro.Pipe4(
    ro.Just("value-1", "value-2"),
    ro.SubscribeOn[string](2),
    ro.TapOnSubscribe(func() {
        runtime.LockOSThread()
    }),
    MyCustomOperator[string, string]("operator-1"),
    ro.TapOnFinalize(func() {
        runtime.UnlockOSThread()
    }),
    ro.SubscribeOn[string](2),
    ...
)
