Created
April 1, 2025 22:46
-
-
Save achal7/e12382a491009e1a2b39da895224ce37 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| module Orders.Service | |
| open Orders.Domain | |
| open Akka.Actor | |
/// Write-side messages understood by the order actors.
type OrderCommand =
    /// Create the order `orderId` on behalf of `customerId`.
    | CreateOrder of customerId: string * orderId: string
    /// Move `order` to the requested `status`.
    | UpdateOrder of order: Order * status: OrderStatus
    /// Cancel `order`.
    | CancelOrder of order: Order
    /// Confirm the order identified by `id`.
    | ConfirmOrder of id: OrderId
/// Read-side messages understood by the order actors.
type OrderQuery =
    /// Ask for the current state of one order.
    | GetOrder of OrderId
    /// Ask for every known order.
    | GetAllOrders
/// Asynchronous facade over the order workflow.
/// Every operation yields an `OrderProcessResult` wrapped in `Async`.
type IOrderService =
    /// Creates the order `orderId` for `customerId`.
    abstract Create: customerId: string * orderId: string -> Async<OrderProcessResult>
    /// Parameterless variant of `Create`; exposed as a property.
    abstract Create2: Async<OrderProcessResult>
    /// Changes the status of `order`.
    abstract Update: order: Order * status: OrderStatus -> Async<OrderProcessResult>
    /// Cancels `order`.
    abstract Cancel: order: Order -> Async<OrderProcessResult>
/// Builds an `IOrderService` whose operations are delegated to the actor
/// behind `orderManagerRef` using the ask pattern.
///
/// `Create`, `Update` and `Cancel` send the matching `OrderCommand` and expose
/// the typed reply as an `Async`. `Create2` has no inputs to build a command
/// from and still raises "Not Implemented" when it is read.
let makeOrderService (orderManagerRef: IActorRef) : IOrderService =
    // Single place where a command is turned into an awaited, typed reply.
    let ask (command: OrderCommand) : Async<OrderProcessResult> =
        orderManagerRef.Ask<OrderProcessResult>(command) |> Async.AwaitTask

    { new IOrderService with
        member _.Create(customerId, orderId) =
            ask (OrderCommand.CreateOrder(customerId, orderId))

        member _.Create2 = failwith "Not Implemented"

        member _.Update(order, status) =
            ask (OrderCommand.UpdateOrder(order, status))

        member _.Cancel(order) = ask (OrderCommand.CancelOrder order) }
| module OrdersFeature.Actor | |
| open Akka.FSharp | |
| open Orders.Domain | |
| open Orders.Service | |
| open Akka.Actor | |
| open Akka.FSharp | |
| open System.Collections.Concurrent | |
| open System | |
| open Akka.Persistence | |
| open Akka.Persistence.Journal | |
| open System.Text.Json | |
| open System.IO | |
/// Event-sourced actor that owns the state of the single order `orderId`.
///
/// A command is checked against the current state; when it is accepted the
/// resulting `OrderEvent` is persisted and, only once the journal has stored
/// it, the event is applied to the in-memory state and the reply is sent.
/// Rejected commands are answered immediately. Recovery replays events through
/// the same `applyEvent` function, so live and recovered state agree.
type OrderActor(orderId: string) as self =
    inherit PersistentActor()

    /// Current order; `None` until an `OrderCreated`/`OrderUpdated` event
    /// (or a snapshot) has been applied.
    let mutable state: Order option = None

    let jsonFile = sprintf "order-%s.json" orderId

    /// Copies a persisted event into the per-order JSON file, newest first.
    /// `EventStorage` is defined outside this section of the file.
    let mirrorToFile (evt: OrderEvent) =
        let existingEvents = EventStorage.loadEvents<OrderEvent> jsonFile
        EventStorage.saveEvents jsonFile (evt :: existingEvents)

    /// Sends a reply with an explicit `OrderProcessResult` type. `<!` takes an
    /// `obj`, so an unannotated `Error ...` would be boxed as
    /// `Result<obj, _>` and fail the cast on the asking side.
    let reply (target: IActorRef) (result: OrderProcessResult) = target <! result

    /// Single state transition used by both recovery and command handling.
    /// A cancelled order is kept with `Status = Cancelled`, which is what the
    /// live handler already did; recovery previously dropped it to `None`.
    let applyEvent (evt: OrderEvent) =
        match evt, state with
        | (OrderCreated order | OrderUpdated order), _ -> state <- Some order
        | OrderCancelled id, Some order when order.OrderId = id ->
            state <- Some { order with Status = OrderStatus.Cancelled }
        | OrderConfirmed id, Some order when order.OrderId = id ->
            state <- Some { order with Status = OrderStatus.Delivered }
        | (OrderCancelled _ | OrderConfirmed _), _ ->
            failwithf "Event %A does not match the current state of order %s" evt orderId

    /// Persists `evt`; after the journal accepted it the event is applied,
    /// optionally mirrored to the JSON file, and `result` is sent to `sender`.
    let persistAndReply (sender: IActorRef) (mirror: bool) (evt: OrderEvent) (result: OrderProcessResult) =
        self.Persist(
            evt,
            fun (persisted: OrderEvent) ->
                applyEvent persisted

                if mirror then
                    mirrorToFile persisted

                reply sender result
        )

    override _.PersistenceId = sprintf "order-%s" orderId

    // -------------------------
    // Recovery: replay persisted events
    // -------------------------
    override _.ReceiveRecover(msg: obj) =
        match msg with
        | :? OrderEvent as evt ->
            applyEvent evt
            true
        | :? SnapshotOffer as snapshotOffer ->
            match snapshotOffer.Snapshot with
            | :? Order as order ->
                state <- Some order
                true
            | _ -> false
        | :? RecoveryCompleted ->
            printfn $"Recovery completed for order {orderId}"
            true
        | _ -> false

    // -------------------------
    // Command handling: persist events in response to commands.
    // Every message that is answered is reported as handled (true); only
    // messages of an unknown type fall through to Unhandled.
    // -------------------------
    override _.ReceiveCommand(msg: obj) =
        // Captured up front so the persist callbacks reply to the right actor.
        let sender = PersistentActor.Context.Sender

        match msg with
        | :? OrderCommand as command ->
            match command with
            | OrderCommand.CreateOrder(customerId, id) when id = orderId ->
                match state with
                | Some _ ->
                    printfn "Already created"
                    reply sender (Error(OrderProcessErrors.ValidationError [ OrderIdAlreadyExists id ]))
                | None ->
                    match orderWorkflow (customerId, id) with
                    | Ok order as created -> persistAndReply sender true (OrderCreated order) created
                    | failure -> reply sender failure
            | OrderCommand.UpdateOrder(order, status) when order.OrderId = orderId ->
                match state with
                | None -> reply sender (Error(OrderProcessErrors.OrderNotFound orderId))
                | Some current ->
                    // The actor's own state is authoritative, not the copy
                    // carried by the command.
                    match updateOrder (current, status) with
                    | Ok updated as result -> persistAndReply sender false (OrderUpdated updated) result
                    | failure -> reply sender failure
            | OrderCommand.CancelOrder order when order.OrderId = orderId ->
                match state with
                | None -> reply sender (Error(OrderProcessErrors.OrderNotFound orderId))
                | Some current ->
                    match cancelOrder current with
                    | Ok cancelled as result ->
                        persistAndReply sender false (OrderCancelled cancelled.OrderId) result
                    | failure -> reply sender failure
            | OrderCommand.ConfirmOrder id when id = orderId ->
                match state with
                | None -> reply sender (Error(OrderProcessErrors.OrderNotFound orderId))
                | Some order ->
                    let confirmed = { order with Status = OrderStatus.Delivered }
                    persistAndReply sender true (OrderConfirmed confirmed.OrderId) (Ok confirmed)
            | _ ->
                // Command addressed to a different order id.
                reply sender (Error(OrderProcessErrors.OrderNotFound orderId))

            true
        | :? OrderQuery as query ->
            match query with
            | GetOrder requestedId ->
                match state with
                | Some order -> reply sender (Ok order)
                | None -> reply sender (Error(OrderProcessErrors.OrderNotFound requestedId))
            | GetAllOrders ->
                // A single-order actor cannot answer a collection query.
                reply sender (Error OrderProcessErrors.ApplicationError)

            true
        | _ ->
            printfn "Unknown message"
            false

    override _.Unhandled(message: obj) : unit =
        printfn $"Unhandled message {message}"
        base.Unhandled(message)

    /// Props used by the manager to spawn one actor per order id.
    static member Props(orderId: string) =
        Akka.Actor.Props.Create<OrderActor>(orderId)
| module OrdersFeature.OrderManager | |
| open Akka.FSharp | |
| open Orders.Domain | |
| open Orders.Service | |
| open Akka.Actor | |
| open System.Collections.Concurrent | |
| open OrdersFeature.Actor | |
| open OrdersFeature.Projection | |
/// Parent actor that owns one `OrderActor` child per order id and routes
/// messages to it.
///
/// * `OrderCommand` - forwarded to the child for the command's order id,
///   spawning the child on first use.
/// * `GetOrder` - forwarded when a child is registered for the id, otherwise
///   answered with a typed `Error (OrderNotFound id)`.
/// * `GetAllOrders` - answered from `getAllOrders ()`.
/// * a plain `string` - treated as an order id whose child should be stopped;
///   always answered with "done".
let OrderManager (mailbox: Actor<_>) =
    // Child OrderActor references keyed by order id.
    let orderActors = ConcurrentDictionary<string, IActorRef>()

    let getOrCreateOrderActor (orderId: string) =
        orderActors.GetOrAdd(
            orderId,
            fun id ->
                let child = mailbox.Context.ActorOf(OrderActor.Props(id), $"order-{id}")
                mailbox.Context.Watch(child) |> ignore
                child
        )

    /// Order id a command is addressed to.
    let targetOrderId (command: OrderCommand) =
        match command with
        | OrderCommand.CreateOrder(_, orderId) -> orderId
        | OrderCommand.UpdateOrder(order, _) -> order.OrderId
        | OrderCommand.CancelOrder order -> order.OrderId
        | OrderCommand.ConfirmOrder id -> id

    /// Stops the child registered for `orderId`, if there is one. Nothing is
    /// spawned for an id that was never used.
    let stopOrderActor (orderId: string) =
        match orderActors.TryRemove(orderId) with
        | true, actorRef ->
            printfn "Stopping order actor %s" actorRef.Path.Name
            actorRef <! PoisonPill.Instance
        | _ -> ()

    /// Drops the registration of a watched child that has terminated.
    let forgetActor (stopped: IActorRef) =
        for entry in orderActors do
            if entry.Value.Equals stopped then
                orderActors.TryRemove(entry.Key) |> ignore

    let rec loop () =
        actor {
            let! (msg: obj) = mailbox.Receive()

            match msg with
            | :? OrderCommand as command ->
                // Forward keeps the original sender, so the child replies to the caller.
                (getOrCreateOrderActor (targetOrderId command)).Forward(msg)
            | :? OrderQuery as query ->
                match query with
                | GetOrder orderId ->
                    match orderActors.TryGetValue(orderId) with
                    | true, actorRef -> actorRef.Forward(msg)
                    | _ ->
                        // Typed so that callers asking for a Result can cast the reply.
                        let notFound: OrderProcessResult =
                            Error(OrderProcessErrors.OrderNotFound orderId)

                        mailbox.Sender() <! notFound
                | GetAllOrders ->
                    let res: Order list = getAllOrders ()
                    mailbox.Sender() <! res
            | :? string as orderId ->
                stopOrderActor orderId
                mailbox.Sender() <! "done"
            | :? Terminated as terminated -> forgetActor terminated.ActorRef
            | _ -> printfn $"Manager received unknown message of type: {msg.GetType().FullName}"

            return! loop ()
        }

    loop ()
| namespace Orders | |
| open System.Text.Json | |
| open System.Text.Json.Serialization | |
| open FSharp.SystemTextJson | |
/// Order domain model: identifiers, states, events, errors and the pure
/// functions that implement the create / update / cancel rules.
module Domain =
    type OrderId = string
    type CustomerId = string

    /// Lifecycle states of an order.
    [<JsonFSharpConverter>]
    type OrderStatus =
        | Pending
        | Processing
        | Shipped
        | Delivered
        | Cancelled

    type Order =
        { OrderId: OrderId
          CustomerId: CustomerId
          Status: OrderStatus }

    /// Facts recorded about an order.
    [<JsonFSharpConverter>]
    type OrderEvent =
        | OrderCreated of Order
        | OrderUpdated of Order
        | OrderCancelled of OrderId
        | OrderConfirmed of OrderId

    [<JsonFSharpConverter>]
    type ValidationError =
        | CustomerNotFound of string
        | OrderIdAlreadyExists of string
        | RuleViolation of string

    [<JsonFSharpConverter>]
    type OrderProcessErrors =
        | OrderNotFound of OrderId
        | ValidationError of ValidationError list
        | ApplicationError

    type OrderProcessResult = Result<Order, OrderProcessErrors>

    // Function shapes: the first string of each pair is the customer id,
    // the second the order id.
    type ValidateOrder = string * string -> ValidationError list
    type CreateOrder = string * string -> ValidateOrder -> OrderProcessResult
    type UpdateOrder = Order * OrderStatus -> OrderProcessResult
    type CancelOrder = Order -> OrderProcessResult

    /// Checks the ids of a new order and returns every violation found
    /// (an empty list means the input is valid). Null, empty and
    /// whitespace-only ids are all rejected.
    let validateOrder: ValidateOrder =
        fun (customerId, orderId) ->
            [ if System.String.IsNullOrWhiteSpace customerId then
                  ValidationError.CustomerNotFound customerId
              if System.String.IsNullOrWhiteSpace orderId then
                  ValidationError.RuleViolation "Order Id is empty" ]

    /// Creates a `Pending` order when `validate` reports no errors, otherwise
    /// returns the errors wrapped in `OrderProcessErrors.ValidationError`.
    let createOrder: CreateOrder =
        fun (customerId, orderId) validate ->
            match validate (customerId, orderId) with
            | [] ->
                Ok
                    { OrderId = orderId
                      CustomerId = customerId
                      Status = Pending }
            | validationErrors -> Error(OrderProcessErrors.ValidationError validationErrors)

    /// Moves `order` to `status`. An order that has already been shipped can
    /// no longer be updated; the check is on the order's current status, not
    /// on the requested one, so `Shipped` itself is a reachable target.
    let updateOrder: UpdateOrder =
        fun (order, status) ->
            if order.Status = Shipped then
                Error(OrderProcessErrors.ValidationError [ ValidationError.RuleViolation "Order already shipped" ])
            else
                Ok { order with Status = status }

    /// Marks `order` as `Cancelled`; cancelling twice is a rule violation.
    let cancelOrder: CancelOrder =
        fun (order) ->
            if order.Status = Cancelled then
                Error(OrderProcessErrors.ValidationError [ ValidationError.RuleViolation "Order already cancelled" ])
            else
                Ok { order with Status = Cancelled }

    /// `createOrder` wired to the default `validateOrder`.
    let orderWorkflow =
        fun (customerId, orderId) -> createOrder (customerId, orderId) validateOrder
| module OrdersFeature.Main | |
| open Akka.FSharp | |
| open Orders.Domain | |
| open Orders.Service | |
| open Akka.Actor | |
| open Akka.FSharp | |
| open System.Collections.Concurrent | |
| open System | |
| open Akka.Persistence | |
| open OrdersFeature.OrderManager | |
| open Akka.Configuration | |
/// Prints the id, customer and status of `order` to stdout, followed by a
/// blank line.
let printOrder (order: Order) =
    printfn "Order Details:"
    // OrderId / CustomerId are plain string abbreviations, so the fields are
    // printed directly instead of being passed through the type names.
    printfn " ID: %s" order.OrderId
    printfn " Customer: %s" order.CustomerId
    printfn " Status: %A" order.Status
    printfn ""
/// Writes a human-readable description of `error` to stdout:
/// one line per validation error, or a single line for the other cases.
let printError (error: OrderProcessErrors) =
    match error with
    | ApplicationError -> printfn "Application Error"
    | OrderNotFound message -> printfn "Not Found: %s" message
    | ValidationError errors ->
        printfn "Validation Errors:"

        for e in errors do
            printfn " %s" (e.ToString())
/// Id of the most recently returned order; the demo menu uses it as the
/// target of the "get" and "confirm" options.
let mutable orderId: OrderId option = None

/// Prints the outcome of an order operation. A successful result is printed
/// and its id remembered in `orderId`; a failure is reported through
/// `printError` so the actual reason is visible.
let handleResult (result: OrderProcessResult) =
    match result with
    | Ok order ->
        printOrder order
        orderId <- Some order.OrderId
    | Error error -> printError error
/// Shows the demo menu once, runs the chosen action against `manager` and
/// blocks until it has finished.
///
/// Returns `false` when the user chose "7" (exit) and `true` otherwise, so the
/// caller can loop on the result.
let runOrderDemo (manager: IActorRef) =
    async {
        let customerId = "customer" + DateTime.Now.Ticks.ToString("x")
        printfn "=== Order API Demo ==="
        printfn ""
        printfn "=========================================="
        printfn "Select an option:"
        printfn "1. Create a new order"
        printfn "2. Get current order state"
        printfn "3. Confirm order"
        printfn "4. Get final order state"
        printfn "5. Try to get a non-existent order"
        printfn "6. List all orders"
        printfn "7. Exit"
        printfn "=========================================="
        printf "Enter your choice (1-7): "

        // ReadLine yields null at end of input; treat that as an empty choice.
        let choice =
            match Console.ReadLine() with
            | null -> ""
            | line -> line.Trim()

        match choice with
        | "1" ->
            // Create a new order under a user-supplied id.
            printfn "Enter order id..."
            let newOrderId = Console.ReadLine()
            let! createResult = manager <? (OrderCommand.CreateOrder(customerId, newOrderId))
            handleResult createResult
        | "2"
        | "4" ->
            // Show the state of the order remembered from the last success.
            match orderId with
            | Some currentId ->
                let! (getResult: OrderProcessResult) = manager <? (GetOrder currentId)
                handleResult getResult
            | None -> printfn "No order id found"
        | "3" ->
            // Confirm the remembered order.
            match orderId with
            | Some currentId ->
                printfn "Confirming order..."
                let! (confirmResult: OrderProcessResult) = manager <? (ConfirmOrder currentId)
                handleResult confirmResult
            | None -> printfn "No order id found"
        | "5" ->
            // Look up an arbitrary (typically unknown) order id.
            printfn "Enter order id..."
            let requestedId = Console.ReadLine()
            let! notFoundResult = manager <? (GetOrder requestedId)
            handleResult notFoundResult
        | "6" ->
            // List everything the projection currently knows.
            let res: Order list = Projection.getAllOrders ()
            res |> List.iter printOrder
        | "7" -> printfn "Exiting demo..."
        | _ -> printfn "Invalid choice. Please try again."

        // Derived from the choice instead of a mutable flag, which could not
        // be captured across the async continuations above.
        return choice <> "7"
    }
    |> Async.RunSynchronously
/// Creates the "ServerActorSystem" actor system with the SQL (SQLite) journal,
/// snapshot store and read journal configured inline.
///
/// The journal tags every `Orders.Domain+OrderEvent` through
/// `OrderTaggingEventAdapter`. The two former flat-key lines that set the
/// journal class to `DefaultJournal` were removed: the nested `journal.sql`
/// block below assigns `class` again, so they never took effect.
// NOTE(review): the database paths are absolute, machine-specific locations.
let createAkkaSystem () =
    let config =
        ConfigurationFactory.ParseString(
            """
            akka {
              persistence {
                journal {
                  plugin = "akka.persistence.journal.sql"
                  sql {
                    class = "Akka.Persistence.Sql.Journal.SqlWriteJournal, Akka.Persistence.Sql"
                    connection-string = "Data Source=D:\\RnD\\SCP\\src\\Client\\SCP.Cons\\Data\\journal.db;Version=3;"
                    provider-name = "SQLite"
                    dialect = "sqlite"
                    event-adapters {
                      tagging = "OrdersFeature.Projection+OrderTaggingEventAdapter, SCP.Cons"
                    }
                    event-adapter-bindings {
                      "Orders.Domain+OrderEvent, SCP.Cons" = tagging
                    }
                  }
                }
                snapshot-store {
                  plugin = "akka.persistence.snapshot-store.sql"
                  sql {
                    class = "Akka.Persistence.Sql.Snapshot.SqlSnapshotStore, Akka.Persistence.Sql"
                    connection-string = "Data Source=D:\\RnD\\SCP\\src\\Client\\SCP.Cons\\Data\\snapshots.db;Version=3;"
                    provider-name = "SQLite"
                    dialect = "sqlite"
                  }
                }
              }
              persistence.query {
                journal.sql{
                  class = "Akka.Persistence.Sql.Query.SqlReadJournalProvider, Akka.Persistence.Sql"
                  write-plugin = "akka.persistence.journal.sql"
                  refresh-interval = 1s
                  buffer-size = 500
                }
              }
            }
            """
        )

    ActorSystem.Create("ServerActorSystem", config)
/// Entry point of the console demo: starts the actor system, the order
/// manager and the projection, then repeats the menu until the user exits.
/// The actor system is terminated on the way out, also when the loop fails.
let runDemo () =
    let system = createAkkaSystem ()

    try
        let orderManagerRef = spawn system "orderManager" OrderManager
        Projection.startProjection system
        let mutable continueRunning = true

        while continueRunning do
            printfn "Press any key to continue..."
            Console.ReadKey() |> ignore
            Console.Clear()
            continueRunning <- runOrderDemo orderManagerRef
    finally
        // Wait for the shutdown so the process does not exit mid-termination.
        system.Terminate().Wait()
| // Helper to pretty print order information | |
| module OrdersFeature.Projection | |
| open Akka.Actor | |
| open Akka.FSharp | |
| open Akka.Persistence.Query | |
| open Akka.Streams | |
| open Akka.Streams.Dsl | |
| open System.Collections.Concurrent | |
| open Orders.Domain // Assumes your domain types (Order, OrderEvent, etc.) are defined here | |
| //open Akka.Persistence.Query.InMemory | |
| open Akka.Persistence.Journal | |
| open System.Collections.Immutable | |
/// Event adapter that wraps every outgoing event in a `Tagged` envelope
/// carrying the single tag "order"; events read back are passed through as-is.
type OrderTaggingEventAdapter() =
    interface IEventAdapter with
        // No manifest is written.
        member _.Manifest(_: obj) = ""

        member _.ToJournal(event: obj) =
            let orderTag = ImmutableHashSet<string>.Empty.Add("order")
            new Tagged(event, orderTag)

        member _.FromJournal(event: obj, _: string) : IEventSequence = EventSequence.Single event
/// Actor-hosted read model of all orders.
///
/// On start it subscribes to the events tagged "order" and folds them into
/// `orderStates`. The string message "GetAllOrders" is answered with the
/// current orders as an `Order list`; any other message is reported as
/// unhandled.
type ProjectionActor() =
    inherit UntypedActor()

    // Written by the stream callback and read by OnReceive.
    let orderStates = ConcurrentDictionary<string, Order>()
    let materializer = ActorMaterializer.Create(ReceiveActor.Context.System)

    /// Applies one journal event to the read model; non-order events are ignored.
    let handleEvent (envelope: EventEnvelope) =
        match envelope.Event with
        | :? OrderEvent as orderEvent ->
            match orderEvent with
            | OrderCreated order
            | OrderUpdated order -> orderStates.[order.OrderId] <- order
            | OrderCancelled orderId -> orderStates.TryRemove(orderId) |> ignore
            | OrderConfirmed orderId ->
                match orderStates.TryGetValue(orderId) with
                | true, order ->
                    orderStates.[orderId] <-
                        { order with
                            Status = OrderStatus.Delivered }
                | _ -> ()
        | _ -> ()

    override this.PreStart() =
        // The read journal lives under the *query* plugin id, the same id
        // startProjection uses; the write-journal id does not name a read
        // journal provider.
        let readJournal =
            PersistenceQuery
                .Get(ReceiveActor.Context.System)
                .ReadJournalFor<Akka.Persistence.Sql.Query.SqlReadJournal>("akka.persistence.query.journal.sql")

        let source = readJournal.EventsByTag("order", NoOffset.Instance)
        source.RunForeach(handleEvent, materializer) |> ignore

    override this.PostStop() =
        // Stop the event stream together with the actor.
        materializer.Shutdown()
        orderStates.Clear()

    override this.OnReceive message =
        match message with
        | :? string as command when command = "GetAllOrders" ->
            let orders: Order list = orderStates.Values |> Seq.toList
            ReceiveActor.Context.Sender <! orders
        | other -> this.Unhandled other
| // match envelope.Event with | |
| // | :? OrderEvent as orderEvent -> | |
| // // Update the read model based on the event | |
| // () | |
| // | _ -> () | |
/// Module-level read model keyed by order id; filled by `startProjection`.
let orderStates = ConcurrentDictionary<string, Order>()

/// Snapshot of every order currently held in `orderStates`.
let getAllOrders () = List.ofSeq orderStates.Values
/// Starts the stream that keeps the module-level `orderStates` read model in
/// sync with the journal: every event tagged "order" is applied as it arrives.
/// The call returns immediately; a failure of the stream is written to stdout
/// instead of being dropped with the discarded task.
let startProjection (system: ActorSystem) =
    let materializer = ActorMaterializer.Create(system)

    let readJournal =
        PersistenceQuery
            .Get(system)
            .ReadJournalFor<Akka.Persistence.Sql.Query.SqlReadJournal>("akka.persistence.query.journal.sql")

    /// Applies one journal event to `orderStates`; non-order events are ignored.
    let applyEnvelope (envelope: EventEnvelope) =
        match envelope.Event with
        | :? OrderEvent as orderEvent ->
            match orderEvent with
            | OrderCreated order
            | OrderUpdated order -> orderStates.[order.OrderId] <- order
            | OrderCancelled cancelledId -> orderStates.TryRemove(cancelledId) |> ignore
            | OrderConfirmed confirmedId ->
                match orderStates.TryGetValue(confirmedId) with
                | true, order ->
                    orderStates.[confirmedId] <-
                        { order with
                            Status = OrderStatus.Delivered }
                | _ -> ()
        | _ -> ()

    readJournal
        .EventsByTag("order", NoOffset.Instance)
        .RunForeach(applyEnvelope, materializer)
        .ContinueWith(
            System.Action<System.Threading.Tasks.Task>(fun finished ->
                printfn "Order projection stream failed: %A" finished.Exception),
            System.Threading.Tasks.TaskContinuationOptions.OnlyOnFaulted
        )
    |> ignore
/// Functional-API projection actor.
///
/// At start it reads the events currently stored under the tag "order" and
/// folds them into a private `orderStates` map; the string message
/// "GetAllOrders" is answered with the orders collected so far.
// NOTE(review): CurrentEventsByTag is kept from the original; events
// persisted after start-up will presumably not reach this map - confirm
// whether EventsByTag was intended.
let orderProjection (mailbox: Actor<_>) =
    // In-memory store for orders
    let orderStates = ConcurrentDictionary<string, Order>()

    // The read journal is registered under the query plugin id (the id
    // startProjection uses), not under the write-journal id.
    let readJournal =
        PersistenceQuery
            .Get(mailbox.Context.System)
            .ReadJournalFor<Akka.Persistence.Sql.Query.SqlReadJournal>("akka.persistence.query.journal.sql")

    /// Applies one journal event to `orderStates`; non-order events are ignored.
    let applyEnvelope (envelope: EventEnvelope) =
        match envelope.Event with
        | :? OrderEvent as orderEvent ->
            match orderEvent with
            | OrderCreated order
            | OrderUpdated order -> orderStates.[order.OrderId] <- order
            | OrderCancelled orderId -> orderStates.TryRemove(orderId) |> ignore
            | OrderConfirmed orderId ->
                match orderStates.TryGetValue(orderId) with
                | true, order ->
                    orderStates.[orderId] <-
                        { order with
                            Status = OrderStatus.Delivered }
                | _ -> ()
        | _ -> ()

    // Source of the events tagged "order".
    let source = readJournal.CurrentEventsByTag("order", NoOffset.Instance)
    // Materializer for running the stream.
    let materializer = mailbox.Context.System.Materializer()

    // Fill the read model; previously the stream only printed each event, so
    // "GetAllOrders" always answered with an empty list.
    source.RunForeach(applyEnvelope, materializer) |> ignore

    let rec loop () =
        actor {
            let! (msg: obj) = mailbox.Receive()

            match msg with
            | :? string as command when command = "GetAllOrders" ->
                mailbox.Sender() <! (orderStates.Values |> Seq.toList)
            | _ -> ()

            return! loop ()
        }

    loop ()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment