Skip to content

Instantly share code, notes, and snippets.

@achal7
Created April 1, 2025 22:46
Show Gist options
  • Select an option

  • Save achal7/e12382a491009e1a2b39da895224ce37 to your computer and use it in GitHub Desktop.

Select an option

Save achal7/e12382a491009e1a2b39da895224ce37 to your computer and use it in GitHub Desktop.
module Orders.Service
open Orders.Domain
open Akka.Actor
/// Commands addressed to a single order. Every case carries enough data
/// (an order id, or an `Order` that contains one) to identify the target order.
type OrderCommand =
/// Create the order `orderId` on behalf of `customerId`.
| CreateOrder of customerId: string * orderId: string
/// Apply `status` to `order`.
| UpdateOrder of order: Order * status: OrderStatus
/// Cancel `order`.
| CancelOrder of order: Order
/// Confirm the order identified by `id`.
| ConfirmOrder of id: OrderId
/// Read-only requests about orders.
type OrderQuery =
/// Ask for the order with the given id.
| GetOrder of OrderId
/// Ask for every known order.
| GetAllOrders
/// Asynchronous facade over order operations; every member yields an
/// `Async<OrderProcessResult>`.
type IOrderService =
/// Creates an order from a customer id and an order id.
abstract member Create: customerId: string * orderId: string -> Async<OrderProcessResult>
/// Parameterless property of the same result type.
/// NOTE(review): its intended purpose is not evident from this file — confirm before relying on it.
abstract member Create2: Async<OrderProcessResult>
/// Requests that `status` be applied to `order`.
abstract member Update: order: Order * status: OrderStatus -> Async<OrderProcessResult>
/// Requests cancellation of `order`.
abstract member Cancel: order: Order -> Async<OrderProcessResult>
/// Builds an `IOrderService` backed by the order manager actor.
///
/// `Create`, `Update` and `Cancel` each send the matching `OrderCommand` to
/// `orderManagerRef` with the ask pattern and expose the typed reply as an `Async`.
/// `Update` and `Cancel` used to throw "Not Implemented"; they now use the same
/// round trip as `Create`. `Create2` has no inputs from which a command could be
/// built and still raises "Not Implemented" when accessed.
///
/// NOTE(review): `Ask` is called without a timeout, so the returned `Async` only
/// completes when the actor replies.
let makeOrderService (orderManagerRef: IActorRef) : IOrderService =
    // One shared round trip: ask with a command, await the typed reply.
    let ask (command: OrderCommand) : Async<OrderProcessResult> =
        orderManagerRef.Ask<OrderProcessResult>(command) |> Async.AwaitTask

    { new IOrderService with
        member _.Create(customerId, orderId) =
            ask (OrderCommand.CreateOrder(customerId, orderId))

        member _.Create2 = failwith "Not Implemented"

        member _.Update(order, status) =
            ask (OrderCommand.UpdateOrder(order, status))

        member _.Cancel(order) = ask (OrderCommand.CancelOrder order) }
module OrdersFeature.Actor
open Akka.FSharp
open Orders.Domain
open Orders.Service
open Akka.Actor
open Akka.FSharp
open System.Collections.Concurrent
open System
open Akka.Persistence
open Akka.Persistence.Journal
open System.Text.Json
open System.IO
/// Persistent actor that owns the state of exactly one order, identified by `orderId`.
///
/// Commands (`OrderCommand`) are run through the domain functions, turned into
/// `OrderEvent`s and persisted. The in-memory state is changed only by `applyEvent`,
/// which is used both for freshly persisted events and for replayed ones, so the
/// live state and the recovered state always follow the same rules.
///
/// Every command and query is answered with a value typed as `OrderProcessResult`,
/// which is what callers using `Ask<OrderProcessResult>` / `<?` expect to receive.
type OrderActor(orderId: string) as self =
    inherit PersistentActor()

    // Current order; None until it is created, and again after it is cancelled.
    let mutable state: Order option = None
    let jsonFile = sprintf "order-%s.json" orderId

    /// The single place where an event changes the state.
    /// A cancel/confirm event that does not match the current state is reported and
    /// skipped instead of throwing: an exception during replay would make the actor
    /// fail every future recovery as well.
    let applyEvent (evt: OrderEvent) =
        match evt with
        | OrderCreated order
        | OrderUpdated order -> state <- Some order
        | OrderCancelled cancelledId ->
            match state with
            | Some order when order.OrderId = cancelledId -> state <- None
            | _ -> printfn "Ignoring %A for order %s: no matching state" evt orderId
        | OrderConfirmed confirmedId ->
            match state with
            | Some order when order.OrderId = confirmedId ->
                state <- Some { order with Status = OrderStatus.Delivered }
            | _ -> printfn "Ignoring %A for order %s: no matching state" evt orderId

    /// Prepends `evt` to the events already stored in this order's JSON file.
    let mirrorToFile (evt: OrderEvent) =
        let existingEvents = EventStorage.loadEvents<OrderEvent> jsonFile
        EventStorage.saveEvents jsonFile (evt :: existingEvents)

    /// Persists `evt`; once it is stored, applies it to the state and runs `afterApply`.
    let persistEvent (evt: OrderEvent) (afterApply: OrderEvent -> unit) =
        self.Persist(
            evt,
            fun persisted ->
                applyEvent persisted
                afterApply persisted
        )

    /// Sends `result` to `target`. The annotation matters: an unannotated `Error ...`
    /// would be boxed as `Result<obj, _>` and could not be cast by the asker.
    let reply (target: IActorRef) (result: OrderProcessResult) = target <! result

    let replyError (target: IActorRef) (error: OrderProcessErrors) = reply target (Error error)

    override _.PersistenceId = sprintf "order-%s" orderId

    // -------------------------
    // Recovery: replay persisted events and snapshots
    // -------------------------
    override _.ReceiveRecover(msg: obj) =
        match msg with
        | :? OrderEvent as evt ->
            applyEvent evt
            true
        | :? SnapshotOffer as offer ->
            match offer.Snapshot with
            | :? Order as order ->
                state <- Some order
                true
            | _ -> false
        | :? RecoveryCompleted ->
            printfn $"Recovery completed for order {orderId}"
            true
        | _ -> false

    // -------------------------
    // Command handling: persist events in response to commands.
    // Returns true for every message that was answered, so that answered messages
    // are not additionally reported through Unhandled.
    // NOTE(review): replies are sent right after Persist is requested, i.e. before
    // the journal has confirmed the write — confirm that this is acceptable.
    // -------------------------
    override _.ReceiveCommand(msg: obj) =
        let sender = PersistentActor.Context.Sender

        match msg with
        | :? OrderCommand as command ->
            match command with
            | OrderCommand.CreateOrder(customerId, id) when id = orderId ->
                match state with
                | Some _ ->
                    printfn "Already created"
                    replyError sender (OrderProcessErrors.ValidationError [ OrderIdAlreadyExists id ])
                | None ->
                    let result: OrderProcessResult = orderWorkflow (customerId, id)
                    result |> Result.iter (fun order -> persistEvent (OrderCreated order) mirrorToFile)
                    reply sender result
            | OrderCommand.UpdateOrder(order, status) when order.OrderId = orderId ->
                match state with
                | Some _ ->
                    // Only an existing order can be updated.
                    let result: OrderProcessResult = updateOrder (order, status)
                    result |> Result.iter (fun updated -> persistEvent (OrderUpdated updated) ignore)
                    reply sender result
                | None -> replyError sender (OrderProcessErrors.OrderNotFound orderId)
            | OrderCommand.CancelOrder(order) when order.OrderId = orderId ->
                match state with
                | Some _ ->
                    let result: OrderProcessResult = cancelOrder order
                    // applyEvent clears the state, exactly as replaying OrderCancelled does.
                    result
                    |> Result.iter (fun cancelled -> persistEvent (OrderCancelled cancelled.OrderId) ignore)
                    reply sender result
                | None -> replyError sender (OrderProcessErrors.OrderNotFound orderId)
            | OrderCommand.ConfirmOrder(id: OrderId) when id = orderId ->
                match state with
                | Some order ->
                    // applyEvent moves the stored order to Delivered once the event is persisted.
                    persistEvent (OrderConfirmed order.OrderId) mirrorToFile
                    reply sender (Ok { order with Status = OrderStatus.Delivered })
                | None -> replyError sender (OrderProcessErrors.OrderNotFound orderId)
            | _ ->
                // The command targets an order id other than this actor's.
                replyError sender (OrderProcessErrors.OrderNotFound orderId)

            true
        | :? OrderQuery as query ->
            match query with
            | GetOrder requestedId ->
                match state with
                | Some order -> reply sender (Ok order)
                | None -> replyError sender (OrderProcessErrors.OrderNotFound requestedId)
            | GetAllOrders ->
                // A single-order actor cannot answer a query about all orders.
                replyError sender OrderProcessErrors.ApplicationError

            true
        | _ ->
            printfn "Unknown message"
            false

    override _.Unhandled(message: obj) : unit =
        printfn $"Unhandled message {message}"
        base.Unhandled(message)

    /// Props for creating an `OrderActor` for `orderId`.
    static member Props(orderId: string) =
        Akka.Actor.Props.Create<OrderActor>(orderId)
module OrdersFeature.OrderManager
open Akka.FSharp
open Orders.Domain
open Orders.Service
open Akka.Actor
open System.Collections.Concurrent
open OrdersFeature.Actor
open OrdersFeature.Projection
/// Parent actor that routes order messages to one child `OrderActor` per order id.
///
/// * `OrderCommand`: forwarded to the child for the command's order id; the child is
///   created on first use.
/// * `GetOrder`: forwarded only if a child is already registered for that id;
///   otherwise the sender receives `Error (OrderNotFound id)` typed as
///   `OrderProcessResult`, which is the type callers ask for.
/// * `GetAllOrders`: answered from the projection read model (`getAllOrders`).
/// * a `string`: treated as an order id whose child should be stopped; the sender
///   always receives "done".
/// * `Terminated`: the stopped child is removed from the registry.
///
/// NOTE(review): a child that was just sent a PoisonPill may still own its actor name
/// for a short time; creating the same order id again immediately may therefore fail
/// with a name conflict — confirm whether callers can hit this.
let OrderManager (mailbox: Actor<_>) =
    // Registry of live children, keyed by order id.
    let orderActors = ConcurrentDictionary<string, IActorRef>()

    let getOrCreateOrderActor (orderId: string) =
        orderActors.GetOrAdd(
            orderId,
            fun id ->
                let child = mailbox.Context.ActorOf(OrderActor.Props(id), $"order-{id}")
                // Watched so that a Terminated message arrives when the child stops.
                mailbox.Context.Watch(child) |> ignore
                child
        )

    // Drops every registry entry that still points at the stopped actor.
    let forgetChild (stopped: IActorRef) =
        for entry in orderActors do
            if entry.Value.Equals(stopped) then
                orderActors.TryRemove(entry.Key) |> ignore

    let rec loop () =
        actor {
            let! (msg: obj) = mailbox.Receive()

            match msg with
            | :? OrderCommand as command ->
                let targetId =
                    match command with
                    | OrderCommand.CreateOrder(_, orderId) -> orderId
                    | OrderCommand.UpdateOrder(order, _) -> order.OrderId
                    | OrderCommand.CancelOrder(order) -> order.OrderId
                    | OrderCommand.ConfirmOrder(id: OrderId) -> id

                // Forward keeps the original sender, so the child replies to the caller directly.
                (getOrCreateOrderActor targetId).Forward(msg)
            | :? OrderQuery as query ->
                match query with
                | GetOrder orderId ->
                    match orderActors.TryGetValue(orderId) with
                    | true, actorRef -> actorRef.Forward(msg)
                    | _ ->
                        let notFound: OrderProcessResult =
                            Error(OrderProcessErrors.OrderNotFound orderId)

                        mailbox.Sender() <! notFound
                | GetAllOrders ->
                    let res: Order list = getAllOrders ()
                    mailbox.Sender() <! res
            | :? Terminated as terminated -> forgetChild terminated.ActorRef
            | :? string as orderId ->
                // Stop the child only if one exists; never create an actor just to stop it.
                match orderActors.TryRemove(orderId) with
                | true, actorRef ->
                    printfn "Stopping order actor %s" actorRef.Path.Name
                    actorRef <! PoisonPill.Instance
                | _ -> ()

                mailbox.Sender() <! "done"
            | _ -> printfn $"Manager received unknown message of type: {msg.GetType().FullName}"

            return! loop ()
        }

    loop ()
namespace Orders
open System.Text.Json
open System.Text.Json.Serialization
open FSharp.SystemTextJson
/// Order domain model: identifiers, statuses, events, errors and the pure functions
/// that create, update and cancel orders.
module Domain =
    type OrderId = string
    type CustomerId = string

    /// Lifecycle status of an order.
    [<JsonFSharpConverter>]
    type OrderStatus =
        | Pending
        | Processing
        | Shipped
        | Delivered
        | Cancelled

    type Order =
        { OrderId: OrderId
          CustomerId: CustomerId
          Status: OrderStatus }

    /// Facts recorded about an order.
    [<JsonFSharpConverter>]
    type OrderEvent =
        | OrderCreated of Order
        | OrderUpdated of Order
        | OrderCancelled of OrderId
        | OrderConfirmed of OrderId

    [<JsonFSharpConverter>]
    type ValidationError =
        | CustomerNotFound of string
        | OrderIdAlreadyExists of string
        | RuleViolation of string

    [<JsonFSharpConverter>]
    type OrderProcessErrors =
        | OrderNotFound of OrderId
        | ValidationError of ValidationError list
        | ApplicationError

    type OrderProcessResult = Result<Order, OrderProcessErrors>
    type ValidateOrder = string * string -> ValidationError list
    type CreateOrder = string * string -> ValidateOrder -> OrderProcessResult
    type UpdateOrder = Order * OrderStatus -> OrderProcessResult
    type CancelOrder = Order -> OrderProcessResult

    /// Checks the `(customerId, orderId)` pair and returns every problem found;
    /// an empty list means the pair is valid.
    /// A null, empty or whitespace-only id counts as missing. Both ids are checked
    /// independently, so a caller sees all problems at once.
    let validateOrder: ValidateOrder =
        fun (customerId, orderId) ->
            [ if System.String.IsNullOrWhiteSpace customerId then
                  ValidationError.CustomerNotFound customerId
              if System.String.IsNullOrWhiteSpace orderId then
                  ValidationError.RuleViolation "Order Id is empty" ]

    /// Creates a `Pending` order when `validate` reports no problems; otherwise
    /// returns the problems wrapped in `OrderProcessErrors.ValidationError`.
    let createOrder: CreateOrder =
        fun (customerId, orderId) validate ->
            match validate (customerId, orderId) with
            | [] ->
                Ok
                    { OrderId = orderId
                      CustomerId = customerId
                      Status = Pending }
            | validationErrors -> Error(OrderProcessErrors.ValidationError validationErrors)

    /// Returns `order` with its status replaced by `status`.
    /// An order whose current status is `Shipped` is rejected with "Order already shipped".
    /// The check is on the order's current status (like `cancelOrder`), not on the
    /// requested one; checking the requested status made it impossible to ship anything.
    let updateOrder: UpdateOrder =
        fun (order, status) ->
            if order.Status = Shipped then
                Error(OrderProcessErrors.ValidationError [ ValidationError.RuleViolation "Order already shipped" ])
            else
                Ok { order with Status = status }

    /// Returns `order` with status `Cancelled`; an order that is already cancelled is
    /// rejected with "Order already cancelled".
    let cancelOrder: CancelOrder =
        fun (order) ->
            if order.Status = Cancelled then
                Error(OrderProcessErrors.ValidationError [ ValidationError.RuleViolation "Order already cancelled" ])
            else
                Ok { order with Status = Cancelled }

    /// Creation workflow used by the order actor: `createOrder` with `validateOrder`.
    let orderWorkflow =
        fun (customerId, orderId) -> createOrder (customerId, orderId) validateOrder
module OrdersFeature.Main
open Akka.FSharp
open Orders.Domain
open Orders.Service
open Akka.Actor
open Akka.FSharp
open System.Collections.Concurrent
open System
open Akka.Persistence
open OrdersFeature.OrderManager
open Akka.Configuration
/// Prints the id, customer and status of `order` to stdout, followed by a blank line.
/// `OrderId` and `CustomerId` are type abbreviations of `string`, not functions, so
/// the fields are passed to `%s` directly.
let printOrder (order: Order) =
    printfn "Order Details:"
    printfn " ID: %s" order.OrderId
    printfn " Customer: %s" order.CustomerId
    printfn " Status: %A" order.Status
    printfn ""
/// Writes a human-readable description of `error` to stdout:
/// one header plus one line per entry for validation errors, a single line otherwise.
let printError (error: OrderProcessErrors) =
    match error with
    | ApplicationError -> printfn "Application Error"
    | OrderNotFound message -> printfn "Not Found: %s" message
    | ValidationError errors ->
        printfn "Validation Errors:"

        for e in errors do
            printfn " %s" (e.ToString())
/// Id of the order most recently returned by a successful operation; the demo menu
/// uses it as the target of the "get" and "confirm" options.
let mutable orderId: OrderId option = None

/// Prints the outcome of an order operation.
/// On success the order is printed and remembered in `orderId`; on failure the error
/// details are printed with `printError` instead of being hidden behind a generic line.
let handleResult (result: OrderProcessResult) =
    match result with
    | Ok order ->
        printOrder order
        orderId <- Some order.OrderId
    | Error error -> printError error
/// Shows the demo menu once, executes the chosen action against `manager`, and
/// returns `false` when the user chose to exit (option 7), `true` otherwise.
///
/// Each menu branch returns its own result rather than assigning a mutable local,
/// because a mutable local must not be captured across async continuations.
/// Blocks the calling thread until the chosen action has completed.
let runOrderDemo (manager: IActorRef) =
    async {
        printfn "=== Order API Demo ==="
        printfn ""
        printfn "=========================================="
        printfn "Select an option:"
        printfn "1. Create a new order"
        printfn "2. Get current order state"
        printfn "3. Confirm order"
        printfn "4. Get final order state"
        printfn "5. Try to get a non-existent order"
        printfn "6. List all orders"
        printfn "7. Exit"
        printfn "=========================================="
        printf "Enter your choice (1-7): "

        match Console.ReadLine() with
        | "1" ->
            // Create a new order for a freshly generated customer id.
            let customerId = "customer" + DateTime.Now.Ticks.ToString("x")
            printfn "Enter order id..."
            let newOrderId = Console.ReadLine()

            let! (createResult: OrderProcessResult) =
                manager <? (OrderCommand.CreateOrder(customerId, newOrderId))

            handleResult createResult
            return true
        | "2"
        | "4" ->
            // Both options show the state of the most recently handled order.
            match orderId with
            | Some currentId ->
                let! (getResult: OrderProcessResult) = manager <? (GetOrder currentId)
                handleResult getResult
            | None -> printfn "No order id found"

            return true
        | "3" ->
            match orderId with
            | Some currentId ->
                // Announce before asking, so the message precedes the result.
                printfn "Confirming order..."
                let! (confirmResult: OrderProcessResult) = manager <? (ConfirmOrder currentId)
                handleResult confirmResult
            | None -> printfn "No order id found"

            return true
        | "5" ->
            // Look up an arbitrary id typed by the user.
            printfn "Enter order id..."
            let requestedId = Console.ReadLine()
            let! (lookupResult: OrderProcessResult) = manager <? (GetOrder requestedId)
            handleResult lookupResult
            return true
        | "6" ->
            // List every order currently held by the projection read model.
            Projection.getAllOrders () |> List.iter printOrder
            return true
        | "7" ->
            printfn "Exiting demo..."
            return false
        | _ ->
            printfn "Invalid choice. Please try again."
            return true
    }
    |> Async.RunSynchronously
/// Creates the "ServerActorSystem" actor system from the inline HOCON below, which
/// configures the SQL journal (with the order tagging event adapter), the SQL
/// snapshot store and the SQL read journal.
let createAkkaSystem () =
    // Plugin classes and connection settings for Akka.Persistence.
    let hocon =
        """
akka.persistence.journal.plugin = "akka.persistence.journal.sql"
akka.persistence.journal.sql.class = "Akka.Persistence.Sql.Journal.DefaultJournal, Akka.Persistence.Sql"
akka {
persistence {
journal {
plugin = "akka.persistence.journal.sql"
sql {
class = "Akka.Persistence.Sql.Journal.SqlWriteJournal, Akka.Persistence.Sql"
connection-string = "Data Source=D:\\RnD\\SCP\\src\\Client\\SCP.Cons\\Data\\journal.db;Version=3;"
provider-name = "SQLite"
dialect = "sqlite"
event-adapters {
tagging = "OrdersFeature.Projection+OrderTaggingEventAdapter, SCP.Cons"
}
event-adapter-bindings {
"Orders.Domain+OrderEvent, SCP.Cons" = tagging
}
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.sql"
sql {
class = "Akka.Persistence.Sql.Snapshot.SqlSnapshotStore, Akka.Persistence.Sql"
connection-string = "Data Source=D:\\RnD\\SCP\\src\\Client\\SCP.Cons\\Data\\snapshots.db;Version=3;"
provider-name = "SQLite"
dialect = "sqlite"
}
}
}
persistence.query {
journal.sql{
class = "Akka.Persistence.Sql.Query.SqlReadJournalProvider, Akka.Persistence.Sql"
write-plugin = "akka.persistence.journal.sql"
refresh-interval = 1s
buffer-size = 500
}
}
}
"""

    ActorSystem.Create("ServerActorSystem", ConfigurationFactory.ParseString(hocon))
/// Entry point of the console demo: starts the actor system, the order manager and
/// the projection, then shows the menu until the user chooses to exit.
/// The actor system is terminated when the loop ends or an exception escapes, so
/// its threads and connections are released instead of being left running.
let runDemo () =
    let system = createAkkaSystem ()

    try
        let orderManagerRef = spawn system "orderManager" OrderManager
        Projection.startProjection system
        let mutable continueRunning = true

        while continueRunning do
            printfn "Press any key to continue..."
            Console.ReadKey() |> ignore
            Console.Clear()
            continueRunning <- runOrderDemo orderManagerRef
    finally
        // Wait for shutdown to finish before returning to the caller.
        system.Terminate().Wait()
// Helper to pretty print order information
module OrdersFeature.Projection
open Akka.Actor
open Akka.FSharp
open Akka.Persistence.Query
open Akka.Streams
open Akka.Streams.Dsl
open System.Collections.Concurrent
open Orders.Domain // Assumes your domain types (Order, OrderEvent, etc.) are defined here
//open Akka.Persistence.Query.InMemory
open Akka.Persistence.Journal
open System.Collections.Immutable
/// Event adapter that wraps every event written to the journal in a `Tagged`
/// envelope carrying the single tag "order". Events read back from the journal are
/// passed through unchanged, and no manifest is used.
type OrderTaggingEventAdapter() =
    interface IEventAdapter with
        member _.Manifest(_: obj) = ""

        member _.ToJournal(payload: obj) =
            let orderTag = ImmutableHashSet<string>.Empty.Add("order")
            Tagged(payload, orderTag) :> obj

        member _.FromJournal(payload: obj, _: string) : IEventSequence =
            EventSequence.Single(payload)
/// Actor that maintains an in-memory read model of orders from the journal events
/// tagged "order" and answers the string message "GetAllOrders" with the current
/// `Order list`.
///
/// The event stream runs outside the actor's message loop, which is why the read
/// model is a `ConcurrentDictionary`.
type ProjectionActor() =
    inherit UntypedActor()

    let orderStates = ConcurrentDictionary<string, Order>()
    let materializer = ActorMaterializer.Create(UntypedActor.Context.System)

    /// Applies one journal envelope to the read model; non-order payloads are ignored.
    let handleEvent (envelope: EventEnvelope) =
        match envelope.Event with
        | :? OrderEvent as orderEvent ->
            match orderEvent with
            | OrderCreated order
            | OrderUpdated order -> orderStates.[order.OrderId] <- order
            | OrderCancelled orderId -> orderStates.TryRemove(orderId) |> ignore
            | OrderConfirmed orderId ->
                match orderStates.TryGetValue(orderId) with
                | true, order -> orderStates.[orderId] <- { order with Status = OrderStatus.Delivered }
                | _ -> ()
        | _ -> ()

    override _.PreStart() =
        // The read journal lives under "akka.persistence.query.journal.sql" in the
        // configuration; "akka.persistence.journal.sql" is the write journal.
        let readJournal =
            PersistenceQuery
                .Get(UntypedActor.Context.System)
                .ReadJournalFor<Akka.Persistence.Sql.Query.SqlReadJournal>("akka.persistence.query.journal.sql")

        let source = readJournal.EventsByTag("order", NoOffset.Instance)
        source.RunForeach(handleEvent, materializer) |> ignore

    override _.PostStop() =
        // Stop the event stream together with the actor, then drop the read model.
        materializer.Shutdown()
        orderStates.Clear()

    override this.OnReceive(message: obj) =
        match message with
        | :? string as command when command = "GetAllOrders" ->
            let orders: Order list = orderStates.Values |> Seq.toList
            UntypedActor.Context.Sender <! orders
        | _ -> this.Unhandled(message)
/// Module-level read model: latest known `Order` per order id.
let orderStates = ConcurrentDictionary<string, Order>()

/// Returns the orders currently held in `orderStates` as a list.
let getAllOrders () = List.ofSeq orderStates.Values
/// Starts a stream over all journal events tagged "order" and applies each of them
/// to the module-level `orderStates` read model. Returns immediately; the task
/// returned by the stream is discarded.
let startProjection (system: ActorSystem) =
    let materializer = ActorMaterializer.Create(system)

    let journal =
        PersistenceQuery
            .Get(system)
            .ReadJournalFor<Akka.Persistence.Sql.Query.SqlReadJournal>("akka.persistence.query.journal.sql")

    // Folds one order event into the read model.
    let applyToReadModel (orderEvent: OrderEvent) =
        match orderEvent with
        | OrderCreated order
        | OrderUpdated order -> orderStates.[order.OrderId] <- order
        | OrderCancelled cancelledId -> orderStates.TryRemove(cancelledId) |> ignore
        | OrderConfirmed confirmedId ->
            match orderStates.TryGetValue(confirmedId) with
            | true, known -> orderStates.[confirmedId] <- { known with Status = OrderStatus.Delivered }
            | _ -> ()

    // Payloads that are not order events are skipped.
    let onEnvelope (envelope: EventEnvelope) =
        match envelope.Event with
        | :? OrderEvent as orderEvent -> applyToReadModel orderEvent
        | _ -> ()

    journal.EventsByTag("order", NoOffset.Instance).RunForeach(onEnvelope, materializer)
    |> ignore
/// Functional-actor variant of the order projection.
///
/// On start it reads the journal events currently tagged "order", prints each one,
/// and folds it into a private in-memory read model. The string message
/// "GetAllOrders" is answered with the orders in that read model.
///
/// Two defects were fixed here: the read journal was looked up under the write
/// journal's id, and the read model was never populated, so "GetAllOrders" always
/// returned an empty list.
///
/// NOTE(review): `CurrentEventsByTag` completes after the events that exist at start,
/// so later events are not reflected — confirm whether a live `EventsByTag` stream
/// (as used by `startProjection`) is wanted instead.
let orderProjection (mailbox: Actor<_>) =
    // In-memory store for orders, private to this actor instance.
    let orderStates = ConcurrentDictionary<string, Order>()

    // The read journal is configured under "akka.persistence.query.journal.sql".
    let readJournal =
        PersistenceQuery
            .Get(mailbox.Context.System)
            .ReadJournalFor<Akka.Persistence.Sql.Query.SqlReadJournal>("akka.persistence.query.journal.sql")

    let source = readJournal.CurrentEventsByTag("order", NoOffset.Instance)
    let materializer = mailbox.Context.System.Materializer()

    // Folds one order event into the read model.
    let applyToReadModel (orderEvent: OrderEvent) =
        match orderEvent with
        | OrderCreated order
        | OrderUpdated order -> orderStates.[order.OrderId] <- order
        | OrderCancelled orderId -> orderStates.TryRemove(orderId) |> ignore
        | OrderConfirmed orderId ->
            match orderStates.TryGetValue(orderId) with
            | true, order -> orderStates.[orderId] <- { order with Status = OrderStatus.Delivered }
            | _ -> ()

    let onEnvelope (envelope: EventEnvelope) =
        printfn "!!! Received event----->: %A" envelope.Event

        match envelope.Event with
        | :? OrderEvent as orderEvent -> applyToReadModel orderEvent
        | _ -> ()

    source.RunForeach(onEnvelope, materializer) |> ignore

    // Actor loop: only "GetAllOrders" is answered; everything else is ignored.
    let rec loop () =
        actor {
            let! (msg: obj) = mailbox.Receive()

            match msg with
            | :? string as command when command = "GetAllOrders" ->
                mailbox.Sender() <! (orderStates.Values |> Seq.toList)
            | _ -> ()

            return! loop ()
        }

    loop ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment