Last active
May 25, 2017 09:53
Concise content of "Akka in Action" book
- Scale up - more resources (CPU).
- Scale out - more servers in a cluster.
- Traditional scaling (threads, locks, RPC, etc.) is complex and inflexible.
- Actors - programming model for scaling up and out.
- Akka is centered on actors. Reactive Manifesto - base ideas of Akka.
- Actors are executed asynchronously.
- Actors can receive and send messages. Messages are immutable.
- No type safety (yet). Any message can be sent to the actor.
- Actor operations:
  - send: sending messages;
  - create: creating child actors;
  - become: moving between states like state machines;
  - supervise: keeping track of child actors.
- Actors are decoupled on three axes:
  - space/location: no expectation about where other components are located;
  - time: no expectation about when the work will be done;
  - interface: no expectation about what messages other components can understand.
- `ActorSystem` - container for all actors.
- New actor is created by `ActorSystem.actorOf` with `Props` object passed as argument.
- `Props` object describes how the actor should be created. It eventually calls the actor constructor.
- `ActorSystem` returns `ActorRef` for created actor (not `Actor` itself).
- `ActorRef` is used to send messages to the actor.
- Every actor has a name (unique per level in the hierarchy).
- `ActorPath` - path to the actor in the hierarchy (like URL). Path can be relative or absolute.
- Every actor has a mailbox - messages queue.
- The dispatcher pushes messages through the mailbox, making the actor process the next message.
- Example projects repo.
- You have to be familiar with sbt.
- httpie is a good tool to test HTTP servers.
- Actor messages are usually bundled together in the companion object.
- `Actor.sender()` is used to send reply to the message sender.
- `ActorRef.forward(message)` is used to forward the message (original sender of the message is not changed).
- Actors use `ActorContext.actorOf(props, name)` instead of `ActorSystem.actorOf(props, name)` to create child actors.
- `ActorContext.children` and `ActorContext.child(name)` return children of the actor.
- Heroku can be used to deploy actor applications.
- `git push` is used to deploy the project to Heroku.
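
The basics above (messages in the companion object, `Props`, `actorOf` returning an `ActorRef`, replying via `sender()`) can be sketched as follows. This is a minimal sketch, assuming the classic Akka actor API (`akka-actor` 2.4 or later on the classpath); `Greeter`, `Greet`, `Greeting` and `GreeterDemo` are hypothetical names, not taken from the book.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical actor, used only for illustration.
object Greeter {
  // Messages bundled together in the companion object.
  final case class Greet(name: String)
  final case class Greeting(text: String)

  // Props describes how the actor is created.
  def props: Props = Props(new Greeter)
}

class Greeter extends Actor {
  import Greeter._

  def receive: Receive = {
    // Reply to whoever sent the message.
    case Greet(name) => sender() ! Greeting(s"Hello, $name")
  }
}

object GreeterDemo {
  def run(): Greeter.Greeting = {
    val system = ActorSystem("demo")
    try {
      // actorOf returns an ActorRef, never the Greeter instance itself.
      val greeter: ActorRef = system.actorOf(Greeter.props, "greeter")
      implicit val timeout: Timeout = Timeout(3.seconds)
      // ask (?) is used here only to observe the reply from outside an actor.
      val reply = (greeter ? Greeter.Greet("Akka")).mapTo[Greeter.Greeting]
      Await.result(reply, 3.seconds)
    } finally system.terminate()
  }
}
```

Blocking with `Await` is acceptable in a demo like this one; inside an actor the reply would normally be handled as a message instead.
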
- Actors unit testing can be:
  - synchronous or asynchronous;
  - single-threaded, multithreaded or multiple JVM.
- Messages can be:
  - one-way: fire and forget;
  - two-way: request-response.
- Three variations of actor can be tested:
  - silent actor: changes the state on new message, but doesn't send any messages or produce any side effects; changes are not directly observable from the outside;
  - sending actor: sends message(s) to other actors; this includes: mutating copy actor, forwarding actor, transforming actor, filtering actor and sequencing actor;
  - side effecting actor: produces side effects on new message (writes to log, for example).
- Test classes should extend `TestKit`.
- `TestKit.testActor` is an internal actor that can be used as receiver in test environment.
- `TestKit` provides methods to assert messages received by `testActor`: `expectMsg`, `expectMsgPF`, `expectNoMsg`, `receiveWhile` and `ignoreMsg`.
- `TestProbe` class can be used to work with several test actors. `TestProbe` can be instantiated, no need to extend it.
- Silent actors in single thread are tested using `TestActorRef[SilentActor]` that provides direct access to the actor and its state with `underlyingActor`.
- Silent actors must support a `GetState` message to be testable in multithreaded environment.
- Sending actors are tested using `TestKit` methods (`expectMsg`, `ignoreMsg`, etc.).
- Side effecting actors are hard to test; it's usually better to add some extra functionality to the actor and make it send a message with the side-effect data to an optional receiver. After that side effecting actors can be tested like sending ones.
- Test class can extend `ImplicitSender` to implicitly replace `sender()` ref with `testActor`. Can be useful when testing two-way messages.
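
Testing a sending actor with a probe can look roughly like this. It is a minimal sketch, assuming `akka-actor` and `akka-testkit` are on the classpath; `Shouter` and `ShouterCheck` are hypothetical names, and a plain object is used instead of a test framework to keep the example self-contained.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestProbe
import scala.concurrent.duration._

// Hypothetical sending actor: replies with the upper-cased text.
class Shouter extends Actor {
  def receive: Receive = {
    case text: String => sender() ! text.toUpperCase
  }
}

object ShouterCheck {
  def run(): Unit = {
    implicit val system: ActorSystem = ActorSystem("test")
    try {
      val probe   = TestProbe()               // instantiated, not extended
      val shouter = system.actorOf(Props(new Shouter))
      probe.send(shouter, "ping")             // probe.ref becomes sender()
      probe.expectMsg(1.second, "PING")       // throws AssertionError on mismatch or timeout
    } finally system.terminate()
  }
}
```
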
- Let-it-crash principle.
- Fault avoidance strategies:
  - isolation: isolate failed component;
  - redundancy: backup components existence;
  - replacement: failed component can be easily replaced with backup;
  - reboot: failed component can be restarted;
  - suspend: calls to failed component should be suspended until backup is ready to process them;
  - separation of concerns: fault-recovery code separated from the normal processing.
- Two separate flows of application:
  - normal logic: regular actors;
  - fault-recovery: supervisors.
- Two states of an actor:
  - started;
  - terminated.
- Three events:
  - start;
  - stop;
  - restart.
- An actor can be stopped using `ActorSystem.stop(actor)`, `ActorContext.stop(actor)` or by sending `PoisonPill` message to it.
- Four hooks: `preStart`, `postStop`, `preRestart` (optionally calls `postStop`) and `postRestart` (optionally calls `preStart`).
- Failing message is passed to `preRestart` hook as a parameter.
- Restarted actor replaces crashed one, `ActorRef` automatically switches to the new instance.
- Stopped actor doesn't process messages and its `ActorRef` switches to special `deadLetters` `ActorRef`.
- Actor can monitor any other (not only its child) with `ActorContext.watch(actor)`. In this case the monitor receives `Terminated` message if monitored actor is stopped.
- The monitor doesn't receive any message if monitored actor is restarted.
- User space - supervisor hierarchy under the `/user` actor path.
- Most dangerous actors should be as low down the hierarchy as possible.
- Two ways to define a hierarchy of supervisors:
  - one supervisor creates all actors in the application and supervises them (only restart of actors can be used);
  - parent actor supervises its children and decides what to do with crashed child.
- Two predefined supervisor strategies:
  - default: stops actor when it failed to initialize or was killed, restarts in other cases;
  - stopping: stops actor on every exception.
- Predefined strategies catch `Exception` instances only (not `Throwable`).
- Unhandled exceptions are automatically escalated to the parent of the supervisor.
- Four decisions are available for a supervisor:
  - restart: child will be recreated from its props; the failing message is removed from the mailbox (can be reprocessed in restart hooks);
  - resume: error is ignored, same actor instance continues to process messages;
  - stop: child will be terminated, message processing stopped;
  - escalate: the problem will be escalated to the parent of the supervisor.
- Two strategies available for supervisor decisions:
  - `OneForOneStrategy`: applies the decision to the crashed child only;
  - `AllForOneStrategy`: applies the decision to all children of the supervisor.
- Both `OneForOneStrategy` and `AllForOneStrategy` have `maxNrOfRetries` and `withinTimeRange` parameters.
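
The four decisions and the `OneForOneStrategy` parameters can be put together as in the sketch below. It assumes the classic Akka actor API; `Worker`, `Parent` and the mapping of exception types to decisions are hypothetical choices made for illustration only.

```scala
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import scala.concurrent.duration._

// Hypothetical worker, used only for illustration.
class Worker extends Actor {
  def receive: Receive = {
    case n: Int => sender() ! (100 / n) // n == 0 throws ArithmeticException
  }
}

// Hypothetical parent that supervises its own child.
class Parent extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume   // ignore the error, keep the instance
      case _: IllegalStateException    => Restart  // fresh instance created from Props
      case _: IllegalArgumentException => Stop     // terminate the child
      case _: Exception                => Escalate // let this actor's supervisor decide
    }

  private val worker = context.actorOf(Props(new Worker), "worker")

  def receive: Receive = {
    case msg => worker.forward(msg) // forward keeps the original sender
  }
}
```

With this strategy a division by zero is resumed, so the same `Worker` instance keeps answering later messages.
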
- A future makes it possible to process the result of a function without ever waiting in the current thread for the result.
- A future is a read-only placeholder for a function result that will be available at some point in the future.
- Futures can be combined with other futures in many ways.
- Futures and actors can be used together.
- Scala futures are not a wrapper around Plain Old Java Futures (`java.util.concurrent.Future`).
- `Future.apply(block)` executes `block` on another thread.
- `Future.foreach` asynchronously processes the future result when it becomes available.
- `Future.map` and `Future.flatMap` call a passed function when the future contains a successful result and return a new `Future`.
- Implicit `ExecutionContext` must be provided to use futures. `scala.concurrent.ExecutionContext.Implicits.global` is a global execution context.
- The dispatcher of an actor system can be used as an `ExecutionContext`. Better than the global one.
- `Promise` is the write side of the Future/Promise model.
- A promise can only be completed once (`IllegalStateException` is thrown on a second attempt).
- `DefaultPromise[T]` extends both `Future[T]` and `Promise[T]`, thread-safe.
- `Future.onComplete` receives `Try` that can be `Success` or `Failure`.
- `Try` supports pattern matching.
- Fatal exceptions (like `OutOfMemoryError`) are never handled by a future. They are thrown straight through the JVM.
- `Future.onFailure` method can be used instead of `Future.onComplete` if only exceptions have to be processed (deprecated since Scala 2.12).
- `Future.recover` and `Future.recoverWith` methods are used to provide default future value in case the future is failed.
- Code block passed to `Future.recover` and `Future.recoverWith` methods is executed synchronously after the error has been returned. Keep this block simple.
- `Future.firstCompletedOf` returns first completed future (succeeded or failed).
- `Future.find` can be used to find first succeeded future.
- `Future.zip` combines two futures and returns a future of tuple.
- A for comprehension can be used instead of `Future.map`/`Future.flatMap` chains.
- `Future.sequence` converts sequence of futures to future of sequence (see also `Future.traverse`).
- `Future.fold` can be used to collect data from sequence of futures.
- `akka.pattern.ask` returns a future that wraps actor answer.
- `akka.pattern.pipe` sends the future result to an actor as a message when it becomes available.
- Calling `sender()` from the future body is not safe due to mutable nature of actors. Capture it in `val` or use `pipe`.
- The value contained in the future should be immutable to avoid sharing of mutable state.
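
Several of the combinators listed above can be tried with the Scala standard library alone. The following is a minimal sketch; `FutureSketch`, `price` and the pricing rule are hypothetical, and the global execution context is used only for brevity.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FutureSketch {
  // Hypothetical lookup, standing in for a slow remote call.
  def price(item: String): Future[Int] = Future {
    if (item == "unknown") throw new NoSuchElementException(item)
    else item.length * 10
  }

  // Both futures are started first so they run concurrently;
  // the for comprehension then combines them via flatMap/map.
  def total(a: String, b: String): Future[Int] = {
    val fa = price(a)
    val fb = price(b)
    for { x <- fa; y <- fb } yield x + y
  }

  // recover supplies a default value when the future has failed.
  def priceOrZero(item: String): Future[Int] =
    price(item).recover { case _: NoSuchElementException => 0 }

  // sequence: List[Future[Int]] => Future[List[Int]]
  def all(items: List[String]): Future[List[Int]] =
    Future.sequence(items.map(priceOrZero))

  // A promise is the write side and can be completed only once.
  def completeTwice(): (Int, Boolean) = {
    val p = Promise[Int]()
    p.success(1)
    val secondFailed = Try(p.success(2)).isFailure // IllegalStateException
    (Await.result(p.future, 1.second), secondFailed)
  }
}
```

`Await.result` appears here only so the results can be inspected; production code would normally stay asynchronous.
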
- Node - a running application that communicates across the network.
- A node has a specific role in the distributed system.
- A node uses a specific network transport protocol (TCP, UDP) to communicate with other nodes.
- Messages between nodes are encoded into network-specific protocol data units.
- Messages need to be translated to and from bytes, respectively known as serialization and deserialization.
- When nodes are part of the same distributed system, they share a group membership.
- Membership can be static or dynamic or a mix of both.
- Some kind of discovery mechanism is needed to support nodes in a dynamic membership.
- Common network topologies:
  - centralized/local;
  - client-server;
  - star;
  - ring;
  - peer-to-peer/mesh;
  - tree.
- Local programming differs from distributed one in four important areas:
  - latency;
  - partial failure;
  - memory access;
  - concurrency.
- Akka provides distributed model both for local and distributed programming.
- Two ways to get a reference to a remote actor: lookup and deploy.
- Akka-remote module should be configured in `src/main/resources/application.conf` (transports, host, port).
- `ActorSystem.actorSelection` is used to look up remote actors.
- Java serialization should not be used. Akka will log a warning.
- `ActorSystem.actorFor` is deprecated.
- `RemoteLookupProxy` is used to look up remote actor and keep correct `ActorRef` to it.
- Built-in `Identify` message and `ActorIdentity` reply are used to get `ActorRef` to remote actor by path.
- Two ways to deploy an actor remotely:
  - through configuration: no code changes needed;
  - programmatically: `Props.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString(uri))))`.
- `RemoteBoxOfficeForwarder` is used to deploy remote actor (through configuration), watch it, keep actual `ActorRef` and forward (proxy) messages to it.
- `RemoteLookupProxy` and `RemoteBoxOfficeForwarder` lose messages while reconnecting to a crashed remote actor.
- To use multi-JVM testing:
  - `sbt-multi-jvm` plugin should be registered in the `project/plugins.sbt` file;
  - multi-JVM configuration file should be added into the `project/` dir;
  - node roles should be defined in object that extends `MultiNodeConfig` class;
  - test class should extend `MultiNodeSpec`, `MultiNodeSpecCallbacks` and `ImplicitSender`.
- `MultiNodeSpec.enterBarrier` is used to synchronize nodes in test.
- The `ConfigFactory` is used to get the configuration.
- Following configuration formats are supported:
  - `application.properties`: the Java property file format;
  - `application.json`: the JSON style;
  - `application.conf`: the HOCON format.
- `Config.getConfig(path)` is used to get subtree of the config.
- Variable substitution is allowed in the config; in HOCON the substitution has to stay outside the quotes: `connectstr = "jdbc:mysql://"${hostname}"/mydata"`.
- System properties or environment variables are allowed in the config: `hostname = ${?HOST_NAME}`.
- A later value for the same key overrides the previous value (if the new value is not empty).
- Exception is thrown when trying to get a value that hasn't been set.
- The `reference.conf` file contains default values for the configuration.
- The configuration library will find all the `reference.conf` files in all application components and integrate them into the configuration fallback structure.
- Following configurations are used by default (upper configurations override lower ones):
  - system properties;
  - `application.conf`;
  - `application.json`;
  - `application.properties`;
  - `reference.conf`.
- `ConfigFactory.load("myapp")` will load `myapp.{conf,json,properties}` instead of `application.{...}`.
- Config file name can be overridden with Java system properties (name should include the extension):
  - `config.resource`: resource name;
  - `config.file`: file path;
  - `config.url`: file URL.
- `Config` object can be passed to `ActorSystem` as a second parameter: `ActorSystem("front", config)`.
- `ActorSystem` uses `ConfigFactory.load()` by default (loads `application.{conf,json,properties}` or the config specified in Java system properties).
- Loaded config can be accessed through `ActorSystem`: `actorSystem.settings.config`.
- Configuration file can include other configs with `include "baseConfig"`.
- Lifting a configuration can be implemented using `withFallback` method: `child.withFallback(parent)`.
- Akka logger is an actor that receives log messages and forwards them to the preferred logging framework.
- Two built-in loggers available by default:
  - `akka.event.Logging$DefaultLogger`: sends messages to `STDOUT`;
  - `akka.event.slf4j.Slf4jLogger`: uses SLF4J as logging framework.
- Custom logger can be used (just an actor with specialized messages interface).
- `ActorLogging` trait should be mixed into the actor to use logging.
- Placeholders available in logs: `log.debug("two args: {}, {}", "one", "two")`.
- Lots of config options are available for Akka system logs, check docs.
- `LoggingReceive` should be used (it wraps the `receive` function) to track messages received by the actor.
- Register `sbt-native-packager` plugin in `project/plugins.sbt` to build application bundle with `sbt`.
- Distribution is created with `sbt stage`.
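
The substitution, override and logger notes above can be illustrated with a small HOCON fragment. This is a sketch of a hypothetical `application.conf`; the `hostname` and `my-app.database.connect` keys are made up for illustration, while `akka.loggers` and `akka.loglevel` are standard Akka settings.

```hocon
# application.conf (hypothetical application keys, for illustration only)
hostname = "localhost"
hostname = ${?HOST_NAME}   # overrides the default only if HOST_NAME is set

my-app {
  database {
    # The substitution stays outside the quotes; HOCON concatenates the parts.
    connect = "jdbc:mysql://"${hostname}"/mydata"
  }
}

akka {
  loggers  = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
}
```
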
- There are three enterprise integration patterns (EIPs) discussed:
  - pipes and filters;
  - scatter-gather;
  - routing slip.
- Pipes and filters introduce independent processing units with the same interface (filters), so the pipeline can be constructed from them. Units can be reordered or replaced in any way and this won't change the logic of the remaining units. In most cases units are filters, but it's not mandatory.
- Scatter-gather can be applied in two different scenarios:
  - competing tasks: the processing tasks are all focused on one thing (but they may be doing it in different ways), the gather selects one result following specified criteria;
  - parallel cooperative processing: each task performs a subtask, the gather combines results into a single message.
- Scatter-gather introduces three components:
  - scatter: receives one message and scatters the job to several processing tasks;
  - processing tasks: doing the same jobs in parallel or doing some parts of the main job concurrently;
  - gather: filters or combines task results.
- Recipient list pattern can be used as simple implementation of scatter component.
- Gather timeout event can be implemented with Akka scheduler: `context.system.scheduler.scheduleOnce`, that will send a message to the actor after specified time.
- To keep buffer of already received messages on restart, gather can use `preRestart` hook to resend messages to itself.
- Aggregator trait provided by Akka implements Aggregator pattern and can be used as gather component.
- Pipes and filters can be combined with scatter-gather components in two ways:
  - scatter-gather implementation presents one filter in the pipeline;
  - the pipeline is used in the processing tasks of scatter-gather pattern.
- Routing slip is a dynamic version of pipes and filters patterns.
- In routing slip units have the same interface and can be combined in many ways.
- The messages sent between units have a `routeSlip` field that contains the list of following recipients. Each actor takes head from the list and sends result to it with tail of `routeSlip` list.
- It's better to implement route slip functions in a separate trait that will send the message to the next task. In this case task actors won't contain copy-pasted code responsible for message transferring.
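
The head/tail mechanics of the routing slip can be shown without actors. The sketch below uses plain functions in place of task actors, so it only illustrates how the slip is consumed; `RoutingSlipSketch`, `RouteSlipMessage`, `sendToNextTask` and the sample tasks are hypothetical names.

```scala
import scala.annotation.tailrec

// Plain-Scala sketch of the routing slip idea (no Akka involved):
// each task does its work, then the result goes to the head of the slip
// together with the tail of the slip.
object RoutingSlipSketch {
  // A task transforms the payload (stands in for a task actor).
  type Task = String => String

  final case class RouteSlipMessage(routeSlip: List[Task], payload: String)

  // Shared helper: take the head of the slip, pass on the tail.
  @tailrec
  def sendToNextTask(msg: RouteSlipMessage): String = msg.routeSlip match {
    case Nil          => msg.payload // slip exhausted: final result
    case next :: rest => sendToNextTask(RouteSlipMessage(rest, next(msg.payload)))
  }

  val paintBlack: Task        = car => s"$car+black"
  val addNavigation: Task     = car => s"$car+navigation"
  val addParkingSensors: Task = car => s"$car+sensors"
}
```

Because every task has the same interface, a different order or subset of tasks is just a different list in `routeSlip`.
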
- Three reasons for using routing to control message flow:
  - performance;
  - message content;
  - state.
- In Akka a separation is made between the routing logic and the actor that represents the router.
- The built-in routers come in two varieties:
  - pool: manages the routees;
  - group: doesn't manage the routees (creation, adding, watching, removing and termination have to be done by the client code).
- Available routers within Akka:
  - `RoundRobinRoutingLogic` / `RoundRobinPool` / `RoundRobinGroup`;
  - `RandomRoutingLogic` / `RandomPool` / `RandomGroup`;
  - `SmallestMailboxRoutingLogic` / `SmallestMailboxPool`: group is not available because it can't check mailboxes of the routees;
  - `BalancingPool`: one mailbox for all the routees, distributes the messages to the idle routees;
  - `BroadcastRoutingLogic` / `BroadcastPool` / `BroadcastGroup`: recipient list;
  - `ScatterGatherFirstCompletedRoutingLogic` / `ScatterGatherFirstCompletedPool` / `ScatterGatherFirstCompletedGroup`: scatter-gather pattern implementation;
  - `ConsistentHashingRoutingLogic` / `ConsistentHashingPool` / `ConsistentHashingGroup`.
- Two different ways to configure the router:
  - through configuration file;
  - in source code.
- Some messages are processed by the router itself (instead of being redirected to routees):
  - `Kill`: terminates the router (pool routees will be terminated as well);
  - `PoisonPill`: terminates the router (pool routees will be terminated as well);
  - `Broadcast`: sends the content of the message to all the routees (DO NOT use with `BalancingPool`).
- `RemoteRouterConfig` configures router to use the routees on remote servers.
- Resizer can be configured to dynamically change size of the pool:
  - should be enabled with `enabled = on`;
  - `lower-bound`, `upper-bound`: min and max number of the routees;
  - `pressure-threshold`: mailbox size of a routee that is considered to be under pressure (special value `0` means that when the routee is processing a message, it's under pressure);
  - `rampup-rate`: how fast routees should be added (`0.25` means +25% of current routees number, value rounded up);
  - `backoff-threshold`: when to decrease number of routees (`0.3` means decrease number when there are less than 30% of non-idle routees);
  - `backoff-rate`: how fast routees should be removed (`0.1` means -10% of current routees number);
  - `messages-per-resize`: number of messages received before another resize action is allowed.
- Default supervisor strategy of the pool always escalates failures to its own supervisor. That can lead to unexpected restart of the pool with all the routees.
- Supervisor strategy can be passed as a parameter to the pool constructor (default strategy: `SupervisorStrategy.defaultStrategy`).
- If the resizer of the pool is not configured the pool won't spawn a new routee to replace a terminated one (will just remove it from the list). Use resizer to keep the specified minimum number of routees.
- Groups are configured with the routees paths instead of routees count.
- Paths to remote routees can be set in the configuration to make group work with remote routees. No additional changes needed.
- Group doesn't watch the routees - it will send messages to the terminated routee (it hopes routee becomes available on this path sometime).
- Special messages to the groups:
  - `GetRoutees`: returns routees sequence (Java collection);
  - `AddRoutee`: adds specified routee to the group;
  - `RemoveRoutee`: removes specified routee from the group.
- Three implementations of the `Routee` trait:
  - `ActorRefRoutee`: DO NOT use it to add the routee to the group, because in this case the group will watch the routee and will terminate itself if the routee terminates;
  - `ActorSelectionRoutee`: uses `ActorSelection`;
  - `SeveralRoutees`: a list of `Routee`s.
- To remove a routee the same `Routee` instance as sent in `AddRoutee` should be sent in `RemoveRoutee` message. So use `ActorSelectionRoutee`.
- The consistent hashing routers use virtual nodes before the routees to get a bigger chance to equally spread all the messages over the routees.
- The consistent hashing routers support three ways to translate the message into a message key (it's possible to use the three solutions in one router):
  - router-specific: a partial function is passed to the router;
  - message-specific: message should implement `ConsistentHashable`;
  - sender-specific: message should be wrapped into `ConsistentHashableEnvelope` by the sender.
- Content-based routing can be implemented with regular actors, no Akka routers required.
- State-based routers can use `become` and `unbecome` methods:
  - `become` replaces `receive` function with specified one;
  - `unbecome` restores original `receive` function of the router.
- If state-based router fails and restarts, original `receive` function is restored.
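
The role of virtual nodes in consistent hashing can be sketched in plain Scala. This is not Akka's implementation: the `HashRing` class, the use of `hashCode` as the hash function and the `virtualNodesFactor` parameter are assumptions made for illustration.

```scala
import scala.collection.immutable.TreeMap

// Plain-Scala sketch of a consistent-hash ring with virtual nodes.
final class HashRing(routees: Seq[String], virtualNodesFactor: Int) {
  // Each routee is placed on the ring several times ("virtual nodes"),
  // which spreads keys more evenly over the routees.
  private val ring: TreeMap[Int, String] = TreeMap(
    (for {
      routee <- routees
      i      <- 0 until virtualNodesFactor
    } yield s"$routee#$i".hashCode -> routee): _*
  )

  // A key is served by the first virtual node at or after its hash,
  // wrapping around to the start of the ring.
  def routeeFor(key: Any): String = {
    require(ring.nonEmpty, "no routees")
    val atOrAfter = ring.iteratorFrom(key.hashCode)
    if (atOrAfter.hasNext) atOrAfter.next()._2 else ring.head._2
  }
}
```

The same key always maps to the same routee, and removing one routee only moves the keys that routee was serving.
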
- Two channel types:
  - point-to-point;
  - publish-subscribe.
- The point-to-point channel sends the message to one receiver.
- The point-to-point channel can have multiple receivers, but every message is delivered to just one receiver (example: round-robin router).
- When multiple messages are sent through point-to-point channel, the order of these messages isn't changed.
- The publish-subscribe channel has dynamic nature and decouples the receivers and the sender.
- The publisher actor shouldn't know anything about the subscribers of publish-subscribe channel.
- Every `ActorSystem` has `eventStream` that can manage multiple publish-subscribe channels (classified by message type).
- The actor can subscribe to a specific message type in event stream.
- `EventBus` can be implemented to create custom publish-subscribe channel.
- `EventBus` is generalized, there are three entities:
  - `Event`: type of all events in the channel;
  - `Subscriber`: type of subscriber allowed to register on that event bus;
  - `Classifier`: the classifier to be used in selecting subscribers for dispatching events.
- Three auxiliary traits to keep track of the subscribers:
  - `LookupClassification`: keeps a set of subscribers for each possible classifier, using `classify` method, which should be implemented;
  - `SubchannelClassification`: used when classifiers form a hierarchy and it's possible to subscribe at the parent nodes as well (example: message types in `EventStream` implementation);
  - `ScanningClassification`: can be used when classifiers have an overlap (one `Event` can be part of more classifiers).
- `ActorEventBus` trait defines `Subscriber` entity as `ActorRef`, also implements `compareSubscribers` method needed by `LookupClassification`.
- Dead-letter channel (or dead-letter queue or dead-message queue) is a special channel that contains all the messages that can't be processed or delivered.
- `EventStream` is used to implement dead-letter channel.
- Messages in dead-letter channel are wrapped into a `DeadLetter` object.
- The actor can subscribe to dead-letter channel with `system.eventStream.subscribe(actor, classOf[DeadLetter])`.
- A message can be sent to `system.deadLetters` actor to be published in dead-letter channel. In this case initial receiver becomes the `deadLetters` actor. To avoid this the original message can be wrapped into a `DeadLetter` object manually before sending to the `deadLetters` actor.
- Messages from dead-letter channel can be reinserted to the mailbox to keep them from dropping.
- Akka can't guarantee message delivery in all cases (no system can).
- For local actors, the delivery is guaranteed as long as there are no critical VM errors.
- For remote actors Akka guarantees that messages are delivered at most once (a message is delivered once or it's not delivered and it's lost).
- Three reasons why Akka doesn't implement fully guaranteed delivery:
  - fully guaranteed delivery results in a performance penalty even when the system doesn't need that level of guarantees;
  - systems need guaranteed processing, not delivery only, but this is system dependent and Akka can't deduce this;
  - it's always possible to add stricter guarantees on top of basic ones, but inverse is not possible.
- `ReliableProxy` can be used to increase reliability of sending messages using remote actors.
- `ReliableProxy` establishes a tunnel between sender and receiver, tracks delivered messages and resends failed ones until they are delivered.
- `ReliableProxy` tunnel is only one-way and for one receiver. When the receiver replies to the sender, the tunnel is not used (another tunnel has to be made in this case).
- `FSM` trait is used to build finite-state machines.
- `FSM` trait takes two type parameters:
  - `State`: the super type of all states;
  - `StateData`: the super type of all possible state data types tracked by the FSM.
- `FSM` useful methods:
  - `startWith` defines the initial state and the initial state data;
  - `when(State) { PartialFunction }` declares transitions for the state;
  - `whenUnhandled { PartialFunction }` declares default behavior for unhandled events;
  - `goto(State)` declares next state during event processing;
  - `goto(State) using StateData` declares next state and updates data;
  - `stay` keeps the current state during event processing;
  - `stay using StateData` keeps the current state and updates data;
  - `onTransition { PartialFunction }` declares entry and exit actions for states (`stateData` and `nextStateData` variables available);
  - `initialize` starts the FSM.
- An actor can subscribe to FSM transition events by sending `SubscribeTransitionCallBack(actor)` message to the FSM. FSM will reply with `CurrentState` message and will send `Transition` message on each transition event.
- The state timeout can be set in two ways:
  - as `when` second parameter: `when(State, stateTimeout: FiniteDuration)`;
  - by calling `forMax` method: `goto(State) using (StateData) forMax (FiniteDuration)`.
- State receives `StateTimeout` message on timeout. The timer is cancelled upon receipt of any other message while in the current state.
- Akka has support for sending messages using timers within FSM. API calls:
  - `setTimer(name: String, msg: Any, timeout: FiniteDuration, repeat: Boolean)`;
  - `cancelTimer(name: String)`: cancels the timer immediately, even if the timer already fired and enqueued the message, the message won't be processed after this call;
  - `isTimerActive(name: String): Boolean`.
- Handler `onTermination { PartialFunction }` is used to handle FSM termination. It receives `StopEvent(Reason, State, Data)` object. There are three possible reasons:
  - `Normal`: FSM `stop` method has been called;
  - `Shutdown`: the actor has been terminated;
  - `Failure(cause: Any)`: an error occurred.
- Akka `Agent` allows multiple actors to work with shared state:
  - `Agent.apply` or `Agent.get` returns the current state;
  - `Agent.send(State)` or `Agent.send(State => State)` updates the state (use second version to combine new state with current);
  - `Agent.alter(State => State)` updates the state and returns `Future` with new state;
  - `Agent.future` returns `Future` that finishes when the pending state changes are all processed.
- New agents can be created with `map` or `flatMap` methods.
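
The FSM methods listed above fit together as in the following sketch. It assumes the classic Akka `FSM` trait; the turnstile example and all names in it (`Turnstile`, `Locked`, `Unlocked`, `Coins`, `Coin`, `Push`) are hypothetical.

```scala
import akka.actor.{Actor, FSM}
import scala.concurrent.duration._

// Hypothetical states of a turnstile.
sealed trait TurnstileState
case object Locked extends TurnstileState
case object Unlocked extends TurnstileState

// Hypothetical state data: coins collected so far.
final case class Coins(count: Int)

// Hypothetical events.
case object Coin
case object Push

class Turnstile extends Actor with FSM[TurnstileState, Coins] {
  startWith(Locked, Coins(0))

  when(Locked) {
    case Event(Coin, Coins(n)) => goto(Unlocked) using Coins(n + 1)
  }

  // State timeout passed as the second parameter of `when`.
  when(Unlocked, stateTimeout = 5.seconds) {
    case Event(Push, _)         => goto(Locked)
    case Event(StateTimeout, _) => goto(Locked)
  }

  // Default behavior: ignore anything else and keep the current state.
  whenUnhandled {
    case Event(_, _) => stay()
  }

  // Entry/exit actions; nextStateData holds the data of the new state.
  onTransition {
    case Locked -> Unlocked => log.info("unlocked, coins so far: {}", nextStateData.count)
  }

  initialize()
}
```
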
- Endpoints are the glue between the external service and the rest of the system. The endpoint encapsulates the interface between two services.
- Consumer endpoint receives messages from the external service and translates them to internal format of the system.
- Producer endpoint converts internal format of the message and produce it to the external service.
- Normalizer pattern translates different messages (from different services and endpoints) to common internal format to allow general processing.
- In complex cases normalizer can be split into three sub-components:
  - a set of transport implementations (for example, EMail, REST and MQueue);
  - a set of format translators (from plain text, JSON and XML);
  - a router that selects translator for each received message (it should know how to distinguish between all the message types).
- There is a trade-off between flexibility and complexity: some transport implementations can be connected directly to translators if just one message type is transferred through this transport - this decreases complexity but also decreases flexibility.
- Canonical data model pattern can be used when a lot of systems have to be connected together.
- In canonical data model each system provides endpoint that converts messages between internal system format and common canonical format that is used in communication bus.
- Apache Camel and Akka Camel module provide support for a great variety of transport layers and make it possible to implement the standard EIPs in a few lines of code.
- Apache Camel allows selecting transport layer implementation at runtime. This can be used in tests.
- Apache Camel uses URI to define the transport protocol and its properties.
- `akka.camel.Consumer` should be extended to implement a consumer:
  - `endpointUri` should be implemented to specify URI of the desired component;
  - `receive` should be implemented to process messages of `CamelMessage` type.
- To send a response in the consumer just reply to the sender actor as usual.
- `CamelExtension.activationFutureFor(Consumer)` is used to "wait" for a consumer to become ready.
- `CamelContext.addComponent` is used to add parametrized component (for example, `ActiveMQComponent`).
- `BrokerRegistry` is used to stop ActiveMQ message brokers: `BrokerRegistry.getInstance().getBrokers.foreach { case (_, b) => b.stop() }`.
- `akka.camel.Producer` should be extended to implement a producer:
  - `transformOutgoingMessage` can be implemented to convert message before sending;
  - `transformResponse` can be implemented to convert received response (from `CamelMessage`);
  - `routeResponse` can be implemented to route received response to other receiver (default implementation routes response to the original sender); note: `transformResponse` should be called manually from overridden `routeResponse`.
- Akka HTTP module allows defining routes with directives.
- Generic directive form is this: `name(arguments) { extractions => ... // inner route }`.
- Directive examples:
  - `get { ... }` matches GET requests;
  - `post { ... }` matches POST requests;
  - `path(PathMatcher)` matches request path;
  - `pathPrefix(PathMatcher)` matches prefix of the path;
  - `complete(value)` completes the request with the value.
- The values can be extracted from the request path with path matchers: `pathPrefix("orders" / IntNumber) { id => ... }`.
- Akka HTTP module provides a test kit to test routes. `ScalatestRouteTest` should be extended in test class to use the kit.
- `RequestTimeout` trait can be extended to automatically read `akka.http.server.request-timeout` value from the configuration.
- Entity marshallers should be provided in implicit scope to enable custom type marshallers in akka-http (for example, to enable XML marshalling `akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._` should be imported).
- A stream of data is a sequence of elements that could have no end.
- Streams provided by akka-stream library.
- Using akka-stream involves two steps:
  - define a blueprint: how streams need to be processed;
  - execute the blueprint: the graph is turned into actors that actually stream the data.
- `Source` and `Sink` are stream endpoints.
- The `Source` and `Sink` form a `RunnableGraph` when connected together.
- All inputs and outputs in graph should be connected to form `RunnableGraph`.
- `RunnableGraph.run` materializes the graph - it creates needed resources and does the actual work. It requires `Materializer` in implicit scope.
- `ActorMaterializer` converts `RunnableGraph` into actors.
- A stream can be cancelled using `take`, `takeWhile` and `takeWithin`.
- Streams use special protocol between data publisher and subscriber.
- Data subscriber uses nonblocking back pressure to signal the publisher how much data it can process.
- Back pressure is traversed from the end of stream to the beginning to ensure no publisher sends more messages than slowest consumer can process.
- Reactive Streams Initiative is a standard for asynchronous stream processing with nonblocking back pressure.
- Akka-stream uses buffers internally, so it requests batches of elements instead of requesting every single one.
- Akka-stream performs operator fusion to remove unnecessary asynchronous boundaries in the graph. As many stages in a graph as possible are run on a single actor. This behavior is configurable.
- Sources and sinks can provide an auxiliary value in a `Future`. It's possible to configure which value should be kept in each transition (left, right or both).
- The `Flow` component is used to perform processing on stream data. A `Flow` has one input port and one output port.
- Akka-stream has a couple of predefined `Flow`s for framing that can be used to identify frames of data in a stream.
- The `Flow` has many collection-like operations, such as `map` and `filter`.
- A stream is not a collection. The big difference: the size of a stream is not known.
- By default, stream processing is stopped when an exception occurs. A supervisor strategy can be set for every graph component or for the complete graph to avoid this.
- Another option: catch exceptions and pass them through the stream as special messages.
- Bidirectional flow has two open inputs and two open outputs.
- `BidiFlow` can be stacked on top of a flow as an adapter.
- In other words: `BidiFlow` provides two flows that can be connected before and after the existing flow to adapt the input and output of the flow.
- Akka-http internally uses akka-stream.
- The HTTP request entity has a `dataBytes` field, which is a `Source` of data in the HTTP stream.
- A `Source` can be sent to the client as `HttpEntity(ContentType, Source)`.
- Akka-http makes it possible to create custom marshallers and unmarshallers for entity data of different content types.
- Akka-stream provides graph DSL.
- Graph DSL provides `GraphDSL.Builder` to create the nodes and a `~>` method to connect nodes.
- `Flow`s can be merged with a `Merge` or `MergePreferred` graph stage.
- `Source.combine` can be used to merge `Source`s.
- Buffers can be used in streams.
- The `Flow.buffer` method requires two arguments: buffer size and overflow strategy.
- Buffer overflow strategies:
- `dropHead`;
- `dropTail`;
- `dropBuffer`;
- `dropNew`;
- `backpressure`;
- `fail`.
- `Flow.groupedWithin` groups stream elements that arrived during a specified time. It can be used to decrease the load on the consumer.
- `Flow.expand` registers an iterator that will be used when there are no elements available from the flow but a fast consumer is ready to process. It will pull elements from the iterator in this case.
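
The four dropping overflow strategies differ only in which element is discarded when the buffer is full. A minimal plain-Scala sketch of that behaviour, as a model rather than akka-stream code: `OverflowSketch`, `offer` and the `Strategy` objects are hypothetical names, and `backpressure` (slow the producer down) and `fail` (fail the stream) are left out because they discard nothing.

```scala
// Plain-Scala model of the dropping overflow strategies; NOT akka-stream code.
object OverflowSketch {
  sealed trait Strategy
  case object DropHead extends Strategy   // drop the oldest buffered element
  case object DropTail extends Strategy   // drop the youngest buffered element
  case object DropBuffer extends Strategy // drop everything buffered so far
  case object DropNew extends Strategy    // drop the incoming element

  /** Offers `elem` to a buffer (oldest element first) holding at most `size` elements. */
  def offer[A](buffer: Vector[A], size: Int, elem: A, strategy: Strategy): Vector[A] = {
    require(size > 0, "the model assumes a buffer of at least one element")
    if (buffer.size < size) buffer :+ elem // room left: nothing is dropped
    else strategy match {
      case DropHead   => buffer.tail :+ elem
      case DropTail   => buffer.init :+ elem
      case DropBuffer => Vector(elem)
      case DropNew    => buffer
    }
  }
}
```

For example, offering `4` to the full buffer `Vector(1, 2, 3)` yields `Vector(2, 3, 4)` with `DropHead` and `Vector(1, 2, 4)` with `DropTail`.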
- A cluster is a dynamic group of nodes; it makes it possible to dynamically grow and shrink the number of nodes.
- Each node is a JVM with an actor system started in it.
- All actor systems in the cluster should have the same name (this is the name of the cluster as well).
- Each cluster should contain one or more seed nodes. Seed nodes are the founders of the cluster.
- A new node must know the list of seed nodes to join the cluster. The list can be set in the configuration file.
- A new node sends a join message to each seed node; the first seed node to respond handles the join command.
- One of the nodes in the cluster is the leader. The leader decides if a member node is up or down.
- The first node, in sort order, that is `Up` or `Leaving` automatically becomes the leader.
- If a node in the cluster is down, it's flagged as `UNREACHABLE`.
- The leader can't execute any leader actions as long as any of the nodes are unreachable. No node can leave or join the cluster in this case.
- The unreachable node has to be taken down (use the `down` method or the `akka.cluster.auto-down-unreachable-after` setting).
- An actor can use the `Cluster.subscribe` method to subscribe to cluster events: `MemberUp`, `MemberExited`, `MemberRemoved`, `UnreachableMember`, `ReachableMember` and `CurrentClusterState`.
- `ActorContext.watch` can be used as usual to watch actors in the cluster.
- Routers can be used with the `ClusterRouterPool` and `ClusterRouterGroup` wrappers.
- A cluster can be tested on a local machine or in a multi-JVM environment with the `sbt-multi-jvm` plugin in the usual way.
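
The leader rule (first node in sort order that is `Up` or `Leaving`) can be sketched in plain Scala. This is only a model: `LeaderSketch`, `Node` and the `Status` objects are hypothetical, and sorting by host and then port is an assumption made for illustration, not a statement about how Akka orders members.

```scala
// Plain-Scala model of leader selection; NOT the Akka cluster implementation.
object LeaderSketch {
  sealed trait Status
  case object Joining extends Status
  case object Up extends Status
  case object Leaving extends Status
  case object Down extends Status

  final case class Node(host: String, port: Int, status: Status)

  /** First node, by (host, port), whose status is Up or Leaving. */
  def leader(nodes: Seq[Node]): Option[Node] =
    nodes
      .filter(n => n.status == Up || n.status == Leaving)
      .sortBy(n => (n.host, n.port)) // assumed sort order
      .headOption
}
```

Every node that sees the same member list picks the same leader, so no election round is needed in this model.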
- Event sourcing captures a sequence of immutable events in a journal.
- A persistent actor (`PersistentActor` trait) works in two modes: it recovers from events (`receiveRecover` method) or it processes commands (`receiveCommand` method).
- Every persistent actor requires a `persistenceId` to identify the events in the journal for that actor.
- The `persist` or `persistAsync` methods are used in `receiveCommand` to store events in the journal: `persist(Event)(Event => Unit)`.
- Commands are messages that are sent to the actor to execute some logic.
- Events provide evidence that the actor has executed the logic correctly.
- Snapshots can reduce the required storage space and speed up recovery of the state.
- The `saveSnapshot` method is used to save snapshots.
- The recovery process can be customized by overriding the `recovery` method.
- Persistence query is a module for querying a journal.
- `PersistenceQuery.readJournalFor` returns a specific read journal which is used to query the data.
- Two types of queries:
- methods starting with `current`: return a `Source` with all currently stored events;
- methods that don't start with `current`: return a `Source` with current events and continuously provide "live" events as they arrive.
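
The two modes of a persistent actor (process commands into events, recover state from events) can be modelled without Akka. A minimal sketch in plain Scala: `EventSourcingSketch`, `Add`, `Added` and `State` are hypothetical names, and the journal is just a `Seq`.

```scala
// Plain-Scala model of event sourcing; NOT akka-persistence code.
object EventSourcingSketch {
  // Commands ask for work; events record what actually happened.
  sealed trait Command
  final case class Add(amount: Int) extends Command

  sealed trait Event
  final case class Added(amount: Int) extends Event

  final case class State(total: Int) {
    def updated(event: Event): State = event match {
      case Added(amount) => State(total + amount)
    }
  }

  /** Command mode: validate the command and return the events to store. */
  def handle(command: Command): Seq[Event] = command match {
    case Add(amount) if amount > 0 => Seq(Added(amount))
    case Add(_)                    => Seq.empty // rejected commands emit no events
  }

  /** Recovery mode: replay the journal on top of a snapshot (or the empty state). */
  def recover(snapshot: State, journal: Seq[Event]): State =
    journal.foldLeft(snapshot)(_ updated _)
}
```

Recovering from a snapshot gives the same state as replaying the whole journal, but only the events stored after the snapshot have to be read.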
- Custom serializer is the best choice in most cases, but it's possible to use third-party libraries: akka-kryo-serialization and Stamina.
- A custom serializer has to extend the `Serializer` trait.
- Serializers can be bound to classes in the configuration file using the `akka.actor.serializers` and `akka.actor.serialization-bindings` sections.
- Akka-persistence doesn't just serialize the events and snapshots directly into the `Journal` or `SnapshotStore`. The serialized objects are wrapped into an internal format. `EventAdapter` can help when it's needed to query the backend database of a journal plugin.
- Sharding is the distribution of the actor state across servers.
- Cluster singleton extension guarantees that there's only one specified actor at any point in time in the cluster.
- The `ClusterSingletonManager` actor ensures the singleton guarantee, and the `ClusterSingletonProxy` actor always points to the current singleton in the cluster.
- The default LevelDB journal is not safe for production use; akka-persistence-cassandra should be used instead.
- The `ClusterSharding` extension distributes actors across nodes in shards.
- A `Shard` is basically a group of sharded actors.
- A `ShardRegion` manages a number of `Shard`s and forwards messages to the sharded actors.
- A `ShardingCoordinator` (which is a cluster singleton) determines behind the scenes which `ShardRegion` will own the `Shard`.
- Special functions (`extractEntityId` and `extractShardId`) should be implemented by the sharded actor's companion object to extract entity IDs and shard IDs from commands. The implementation should ensure that there will be no duplicate sharded actors running in the cluster.
- The `ClusterSharding` module will automatically start a sharded actor once it tries to forward a command. User code shouldn't start sharded actors.
- A sharded actor can ask its `Shard` to passivate it when it's not used. This can help to control memory usage.
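
A sketch of what the two extraction functions might look like, written in plain Scala so it runs without Akka. `ShardingSketch`, `AddItem` and `numberOfShards` are hypothetical; the function shapes (a partial function to an ID and message pair, and a function to a shard ID) are modelled on the sharding API as I understand it, so treat them as an assumption.

```scala
// Plain-Scala model of entity and shard ID extraction; NOT wired into ClusterSharding.
object ShardingSketch {
  // Hypothetical command: every command carries the ID of its target entity.
  final case class AddItem(basketId: Long, item: String)

  // Assumed value; it should not change while the cluster is running.
  val numberOfShards = 10

  /** Command => (entity ID, message to deliver to that entity). */
  val extractEntityId: PartialFunction[Any, (String, Any)] = {
    case cmd: AddItem => (cmd.basketId.toString, cmd)
  }

  /** Command => shard ID; the same entity ID always maps to the same shard. */
  val extractShardId: Any => String = {
    case cmd: AddItem =>
      java.lang.Math.floorMod(cmd.basketId, numberOfShards.toLong).toString
  }
}
```

Because the shard ID is a pure function of the entity ID, all commands for one basket land in the same shard, which is what prevents duplicate sharded actors.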
- 80% of performance improvements can be made by addressing only 20% of the system (Pareto principle):
- it's possible to make minor changes to improve performance;
- only changes to 20% of the system will have any effect on the performance;
- These 20% are called bottlenecks.
- Solving the first bottleneck gives the biggest improvement. Solving the next bottleneck will result in a lesser improvement (the concept of diminishing returns).
- Two types of performance problems:
- the throughput is too low: the number of requests that can be served is too low, usually solved by scaling;
- the latency is too long: each request takes too long to be processed, generally requires design changes.
- Performance terms:
- arrival rate: number of messages arriving during a period;
- throughput: number of completions during a period;
- service time: the time needed to process a single job (or service rate - average number of jobs serviced during a period);
- the latency: the time between the entry and the exit;
- the utilization: the percentage of the time the node is busy processing messages.
- The message queue size is an important metric for detecting that there's a problem.
- Optimal performance: each time a task is completed, there's another one to do, but the wait time is vanishingly small.
- The queue size can be retrieved from the mailbox, and the utilization needs the statistics of the processing unit.
- The following data is needed from the Akka actor:
- when a message is received and added to the mailbox;
- when it was sent to be processed, removed from the mailbox and handed over to the processing unit;
- when the message was done processing and left the processing unit.
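
Given those three timestamps per message, the performance terms can be computed directly. A minimal sketch in plain Scala, assuming timestamps in milliseconds; `MetricsSketch` and `Sample` are hypothetical names.

```scala
// Plain-Scala sketch of the performance terms, computed from per-message timestamps.
object MetricsSketch {
  /** Timestamps in milliseconds for one message. */
  final case class Sample(received: Long, started: Long, finished: Long) {
    def waitTime: Long = started - received    // time spent in the mailbox
    def serviceTime: Long = finished - started // time spent in the processing unit
    def latency: Long = finished - received    // time between entry and exit
  }

  /** Completed messages per second over the observed period. */
  def throughput(samples: Seq[Sample], periodMillis: Long): Double =
    samples.size * 1000.0 / periodMillis

  /** Fraction of the period (0.0 to 1.0) the processing unit was busy. */
  def utilization(samples: Seq[Sample], periodMillis: Long): Double =
    samples.map(_.serviceTime).sum.toDouble / periodMillis
}
```

For two messages with a service time of 50 ms each, observed over 200 ms, this gives a throughput of 10 messages per second and a utilization of 0.5.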
- These metrics can be retrieved by using a custom mailbox and an actor trait with an overridden `receive` method (both should send statistics).
- To create a custom mailbox, the `MailboxType` and `MessageQueue` traits should be implemented.
- The `MailboxType` that is used by a dispatcher should be set in the configuration file (`akka.actor.default-mailbox` for the default dispatcher).
- To improve the performance, three parameters can be changed:
- number of services: actually a scaling-up, increases the possible throughput of the node;
- arrival rate: reducing number of messages to be processed;
- service time: making processing faster, also improves the throughput.
- Available built-in dispatchers:
- `Dispatcher`: default dispatcher, binds its actors to a thread pool, has fixed thread pool size;
- `PinnedDispatcher`: binds an actor to a single and unique thread, thread isn't shared between actors;
- `BalancingDispatcher`: redistributes the messages from busy actors to idle actors;
- `CallingThreadDispatcher`: uses the current thread to process the messages of an actor, only used for testing.
- `Props.withDispatcher` is used to select the dispatcher defined in the configuration file.
- When the CPU utilization is 80% or higher, increasing the number of threads will probably not increase the performance.
- When the CPU utilization is low, message processing is mostly waiting; freezes and blocking calls should be removed. If this is impossible, the number of threads can be increased.
- Three parameters to configure the number of threads:
- `parallelism-factor`: used to calculate number of threads from available processors;
- `parallelism-min`: min number of threads;
- `parallelism-max`: max number of threads.
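
How the three parameters combine can be sketched as a small function. This assumes the factor multiplies the number of available processors, the result is rounded up, and it is then clamped between the minimum and the maximum; `PoolSizeSketch` and `poolSize` are hypothetical names.

```scala
// Plain-Scala sketch of the thread count calculation, under the stated assumption.
object PoolSizeSketch {
  /** ceil(processors * factor), clamped to the range [min, max]. */
  def poolSize(processors: Int, factor: Double, min: Int, max: Int): Int =
    math.min(math.max(math.ceil(processors * factor).toInt, min), max)
}
```

With a factor of 3.0, a minimum of 8 and a maximum of 64, a 4-core machine gets 12 threads, a single-core machine is raised to 8 and a 32-core machine is capped at 64.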
- The executor used by the dispatcher can be changed to use a dynamic thread pool.
- Three possible values of the executor configuration item:
- `fork-join-executor`: default executor;
- `thread-pool-executor`: used when a dynamic thread pool is needed;
- fully qualified class name (FQCN): custom executor (should extend Java `ExecutorService`).
- Two important parameters of `thread-pool-executor`:
- `task-queue-size`: number of waiting thread requests before the pool is increased, -1 disables pool increase (how quickly the pool size will grow);
- `keep-alive-time`: time a thread can be idle before it's cleaned up (how quickly the pool size will decrease).
- The `throughput` dispatcher parameter defines the maximum number of messages an actor may process before it has to release the thread back to the pool (default value: 5).
- Increasing the `throughput` will improve performance when there are a lot of messages but message processing is fast.
- A high `throughput` value can negatively impact performance if message processing takes long. The `throughput-deadline-time` parameter bounds the time that an actor can keep a thread.
- Whether the `throughput` configuration needs to be increased or decreased depends entirely on the arrival rate and the function of the system.
- Akka-typed module provides typed Actor API.
- There is no `sender()` method in akka-typed.
- Actors are defined in terms of typed behaviors.
- Every message is passed to an immutable behavior.
- The behavior of an actor can be switched over time or stay the same.
- `preStart`, `preRestart` and other methods are replaced by special signal messages.
- The akka-typed API is highly likely to change and shouldn't be used in production.
- Akka Distributed Data provides conflict-free replicated data types (CRDTs) in an Akka cluster.
- CRDTs always have a `merge` function that merges data entries into one consistent view without any coordination between the nodes.
- The types of data structures that can be used as CRDT are limited.
- It's possible to build a custom CRDT data structure (it should implement the `merge` function according to the rules of CRDT).
- Akka Distributed Data provides a `Replicator` actor to replicate a data structure throughout the Akka cluster.
- It's possible to subscribe to updates of the data structure.
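
To make the `merge` rules concrete, here is a minimal sketch of a grow-only counter, one of the simplest CRDTs, in plain Scala. It does not use Akka Distributed Data: `GCounterSketch` and `GCounter` are hypothetical names, and node names are plain strings.

```scala
// Plain-Scala model of a grow-only counter CRDT; NOT the Akka Distributed Data type.
object GCounterSketch {
  /** One non-decreasing count per node; the counter value is their sum. */
  final case class GCounter(counts: Map[String, Long] = Map.empty) {
    def increment(node: String, by: Long = 1): GCounter = {
      require(by >= 0, "a grow-only counter can't decrease")
      GCounter(counts.updated(node, counts.getOrElse(node, 0L) + by))
    }

    def value: Long = counts.values.sum

    /** Per-node maximum: commutative, associative and idempotent. */
    def merge(that: GCounter): GCounter =
      GCounter((counts.keySet ++ that.counts.keySet).map { node =>
        node -> math.max(counts.getOrElse(node, 0L), that.counts.getOrElse(node, 0L))
      }.toMap)
  }
}
```

Because `merge` takes the per-node maximum, replicas can exchange state in any order, any number of times, and still converge on the same value.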