Created
June 27, 2025 15:55
-
-
Save drahtzieher/c12f3934883b6c8cabf0ab3d53ca8ac7 to your computer and use it in GitHub Desktop.
Nats Jetstream KV session store for SCS (with cbor encoder)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package sessionstore | |
| import ( | |
| "context" | |
| "fmt" | |
| "time" | |
| "github.com/fxamacker/cbor/v2" | |
| "github.com/nats-io/nats.go/jetstream" | |
| ) | |
| type JetStreamStore struct { | |
| js jetstream.JetStream | |
| kv jetstream.KeyValue | |
| } | |
| func CreateSessionBucket(js jetstream.JetStream, bucketName string) (jetstream.KeyValue, error) { | |
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | |
| defer cancel() | |
| config := jetstream.KeyValueConfig{ | |
| Bucket: bucketName, | |
| Description: "Session store for web application sessions", | |
| History: 1, | |
| TTL: 0, | |
| Storage: jetstream.MemoryStorage, | |
| LimitMarkerTTL: 24 * time.Hour, | |
| } | |
| kv, err := js.CreateOrUpdateKeyValue(ctx, config) | |
| if err != nil { | |
| return nil, fmt.Errorf("failed to create/update KeyValue bucket: %w", err) | |
| } | |
| return kv, nil | |
| } | |
| func New(js jetstream.JetStream, kv jetstream.KeyValue) *JetStreamStore { | |
| return &JetStreamStore{ | |
| js: js, | |
| kv: kv, | |
| } | |
| } | |
| func (s *JetStreamStore) Find(token string) ([]byte, bool, error) { | |
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
| defer cancel() | |
| return s.FindCtx(ctx, token) | |
| } | |
| func (s *JetStreamStore) FindCtx(ctx context.Context, token string) ([]byte, bool, error) { | |
| entry, err := s.kv.Get(ctx, token) | |
| if err != nil { | |
| if err == jetstream.ErrKeyNotFound { | |
| return nil, false, nil | |
| } | |
| return nil, false, fmt.Errorf("failed to get session data: %w", err) | |
| } | |
| var sessionData []byte | |
| if err := cbor.Unmarshal(entry.Value(), &sessionData); err != nil { | |
| return nil, false, fmt.Errorf("failed to unmarshal session data: %w", err) | |
| } | |
| return sessionData, true, nil | |
| } | |
| func (s *JetStreamStore) Commit(token string, data []byte, expiry time.Time) error { | |
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
| defer cancel() | |
| return s.CommitCtx(ctx, token, data, expiry) | |
| } | |
| func (s *JetStreamStore) CommitCtx(ctx context.Context, token string, data []byte, expiry time.Time) error { | |
| encodedData, err := cbor.Marshal(data) | |
| if err != nil { | |
| return fmt.Errorf("failed to marshal session data: %w", err) | |
| } | |
| ttl := time.Until(expiry) | |
| if ttl <= 0 { | |
| return fmt.Errorf("session expiry time is in the past") | |
| } | |
| _, err = s.kv.Create(ctx, token, encodedData, jetstream.KeyTTL(ttl)) | |
| if err != nil { | |
| if err == jetstream.ErrKeyExists { | |
| entry, getErr := s.kv.Get(ctx, token) | |
| if getErr != nil { | |
| return fmt.Errorf("failed to get existing session for update: %w", getErr) | |
| } | |
| _, err = s.kv.Update(ctx, token, encodedData, entry.Revision()) | |
| if err != nil { | |
| return fmt.Errorf("failed to update session: %w", err) | |
| } | |
| } else { | |
| return fmt.Errorf("failed to create session: %w", err) | |
| } | |
| } | |
| return nil | |
| } | |
| func (s *JetStreamStore) Delete(token string) error { | |
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |
| defer cancel() | |
| return s.DeleteCtx(ctx, token) | |
| } | |
| func (s *JetStreamStore) DeleteCtx(ctx context.Context, token string) error { | |
| err := s.kv.Purge(ctx, token) | |
| if err != nil && err != jetstream.ErrKeyNotFound { | |
| return fmt.Errorf("failed to delete session: %w", err) | |
| } | |
| return nil | |
| } | |
| func (s *JetStreamStore) AllCtx(ctx context.Context) (map[string][]byte, error) { | |
| lister, err := s.kv.ListKeys(ctx) | |
| if err != nil { | |
| return nil, fmt.Errorf("failed to list keys: %w", err) | |
| } | |
| results := make(map[string][]byte) | |
| for key := range lister.Keys() { | |
| entry, err := s.kv.Get(ctx, key) | |
| if err != nil { | |
| if err == jetstream.ErrKeyNotFound { | |
| continue | |
| } | |
| fmt.Printf("Error retrieving session %s: %v\n", key, err) | |
| continue | |
| } | |
| var sessionData []byte | |
| if err := cbor.Unmarshal(entry.Value(), &sessionData); err != nil { | |
| fmt.Printf("Error unmarshaling session %s: %v\n", key, err) | |
| continue | |
| } | |
| results[key] = sessionData | |
| } | |
| lister.Stop() | |
| return results, nil | |
| } | |
| func (s *JetStreamStore) Status(ctx context.Context) (jetstream.KeyValueStatus, error) { | |
| return s.kv.Status(ctx) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment