Skip to content

Instantly share code, notes, and snippets.

@drahtzieher
Created June 27, 2025 15:55
Show Gist options
  • Select an option

  • Save drahtzieher/c12f3934883b6c8cabf0ab3d53ca8ac7 to your computer and use it in GitHub Desktop.

Select an option

Save drahtzieher/c12f3934883b6c8cabf0ab3d53ca8ac7 to your computer and use it in GitHub Desktop.
Nats Jetstream KV session store for SCS (with cbor encoder)
package sessionstore
import (
"context"
"fmt"
"time"
"github.com/fxamacker/cbor/v2"
"github.com/nats-io/nats.go/jetstream"
)
type JetStreamStore struct {
js jetstream.JetStream
kv jetstream.KeyValue
}
func CreateSessionBucket(js jetstream.JetStream, bucketName string) (jetstream.KeyValue, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
config := jetstream.KeyValueConfig{
Bucket: bucketName,
Description: "Session store for web application sessions",
History: 1,
TTL: 0,
Storage: jetstream.MemoryStorage,
LimitMarkerTTL: 24 * time.Hour,
}
kv, err := js.CreateOrUpdateKeyValue(ctx, config)
if err != nil {
return nil, fmt.Errorf("failed to create/update KeyValue bucket: %w", err)
}
return kv, nil
}
func New(js jetstream.JetStream, kv jetstream.KeyValue) *JetStreamStore {
return &JetStreamStore{
js: js,
kv: kv,
}
}
func (s *JetStreamStore) Find(token string) ([]byte, bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.FindCtx(ctx, token)
}
func (s *JetStreamStore) FindCtx(ctx context.Context, token string) ([]byte, bool, error) {
entry, err := s.kv.Get(ctx, token)
if err != nil {
if err == jetstream.ErrKeyNotFound {
return nil, false, nil
}
return nil, false, fmt.Errorf("failed to get session data: %w", err)
}
var sessionData []byte
if err := cbor.Unmarshal(entry.Value(), &sessionData); err != nil {
return nil, false, fmt.Errorf("failed to unmarshal session data: %w", err)
}
return sessionData, true, nil
}
func (s *JetStreamStore) Commit(token string, data []byte, expiry time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.CommitCtx(ctx, token, data, expiry)
}
func (s *JetStreamStore) CommitCtx(ctx context.Context, token string, data []byte, expiry time.Time) error {
encodedData, err := cbor.Marshal(data)
if err != nil {
return fmt.Errorf("failed to marshal session data: %w", err)
}
ttl := time.Until(expiry)
if ttl <= 0 {
return fmt.Errorf("session expiry time is in the past")
}
_, err = s.kv.Create(ctx, token, encodedData, jetstream.KeyTTL(ttl))
if err != nil {
if err == jetstream.ErrKeyExists {
entry, getErr := s.kv.Get(ctx, token)
if getErr != nil {
return fmt.Errorf("failed to get existing session for update: %w", getErr)
}
_, err = s.kv.Update(ctx, token, encodedData, entry.Revision())
if err != nil {
return fmt.Errorf("failed to update session: %w", err)
}
} else {
return fmt.Errorf("failed to create session: %w", err)
}
}
return nil
}
func (s *JetStreamStore) Delete(token string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.DeleteCtx(ctx, token)
}
func (s *JetStreamStore) DeleteCtx(ctx context.Context, token string) error {
err := s.kv.Purge(ctx, token)
if err != nil && err != jetstream.ErrKeyNotFound {
return fmt.Errorf("failed to delete session: %w", err)
}
return nil
}
func (s *JetStreamStore) AllCtx(ctx context.Context) (map[string][]byte, error) {
lister, err := s.kv.ListKeys(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list keys: %w", err)
}
results := make(map[string][]byte)
for key := range lister.Keys() {
entry, err := s.kv.Get(ctx, key)
if err != nil {
if err == jetstream.ErrKeyNotFound {
continue
}
fmt.Printf("Error retrieving session %s: %v\n", key, err)
continue
}
var sessionData []byte
if err := cbor.Unmarshal(entry.Value(), &sessionData); err != nil {
fmt.Printf("Error unmarshaling session %s: %v\n", key, err)
continue
}
results[key] = sessionData
}
lister.Stop()
return results, nil
}
func (s *JetStreamStore) Status(ctx context.Context) (jetstream.KeyValueStatus, error) {
return s.kv.Status(ctx)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment