layout: true --- class: center, middle # Akka Typed Christopher Batey Lightbend `@chbatey` ??? --- # About me * Christopher Batey * Work @ Lightbend on the Akka team * Akka * Akka Streams * Akka HTTP * Akka Persistence plugins --- background-image: url(AkkaLogoAndSummary.png) --- background-image: url(AkkaToolkit.png) # Akka Toolkit --- # Why * Type safety -- * Refactoring -- * Making protocols explicit --- # Lock example ```scala sealed trait LockProtocol final case object Lock extends LockProtocol final case object Unlock extends LockProtocol sealed trait LockStatus final case object Granted extends LockStatus final case class Taken(who: ActorRef) extends LockStatus ``` --- # Untyped Actor (mutable) ```scala class MutableActor extends Actor with ActorLogging { import LockProtocol._ private var owner: Option[ActorRef] = None def receive: Receive = { case Lock if owner.isEmpty => owner = Some(sender()) sender() ! Granted case Lock => sender() ! Taken(owner.get) case Unlock => require(owner.contains(sender())) owner = None } } ``` --- # Untyped Actor (become) ```scala class BecomeActor extends Actor { import LockProtocol._ private val unlocked: Receive = { case Lock => sender() ! Granted context.become(locked(sender())) } private def locked(who: ActorRef): Receive = { case Lock => sender() ! Taken(who) case Unlock => require(sender() == who) context.become(unlocked) } override def receive = unlocked } ``` --- # Akka Typed * `ActorRef` becomes `ActorRef[T]` -- * No more Actor trait -- * No more `sender()` -- * No more `actorSelection` --- # Akka Typed * Send 0+ messages -- * Spawn 0+ children -- * Change its behavior --- # Protocol ```scala sealed trait LockProtocol final case class Lock(self: ActorRef[LockStatus]) extends LockProtocol final case class Unlock(self: ActorRef[LockStatus]) extends LockProtocol ``` -- ```scala sealed trait LockStatus final case object Granted extends LockStatus final case class Taken(who: ActorRef[LockStatus]) extends LockStatus ``` --- # Behaviors ```scala val unlocked: Behavior[LockProtocol] = Behaviors.receive[LockProtocol] { case (ctx, Lock(who)) => who ! Granted locked(who) } ``` -- ```scala def locked(by: ActorRef[LockStatus]): Behavior[LockProtocol] = Behaviors.receive[LockProtocol] { case (ctx, Lock(who)) => who ! Taken(by) Behaviors.same case (ctx, Unlock(who)) => require(who == by) unlocked } ``` ??? * Run behaviors without starting an actor (still working on the testkit) --- # Top level behavior ```scala val topLevel = Behaviors.setup[LockStatus] { initialCtx => val lockActor = initialCtx.spawn(unlocked, "lock-a") lockActor ! Lock(initialCtx.self) ??? ``` --- # Top level behavior ```scala val topLevel = Behaviors.setup[LockStatus] { initialCtx => val lockActor = initialCtx.spawn(unlocked, "lock-a") lockActor ! Lock(initialCtx.self) Behaviors.receive { case (ctx, Granted) => ctx.log.info("Yay I have the Lock") ctx.ask(lockActor)(Unlock) { case Success(m) => m case Failure(t) => UnlockFailed } Behaviors.same ``` --- # Top level behavior ```scala val topLevel = Behaviors.setup[LockStatus] { initialCtx => val lockActor = initialCtx.spawn(unlocked, "lock-a") lockActor ! Lock(initialCtx.self) Behaviors.receive { case (ctx, Granted) => ctx.log.info("Yay I have the Lock") ctx.ask(lockActor)(Unlock) { case Success(m) => m case Failure(t) => UnlockFailed } Behaviors.same case (ctx, Taken(who)) => ctx.log.info("Who dare take my lock???") Behaviors.same case (ctx, Released) => ctx.log.info("No more lock for me :(") Behaviors.same } } ``` -- ```scala val system = ActorSystem(topLevel, "TopLevel") ``` --- # Ask example ```scala ctx.ask(lock)(Unlock) { case Success(m) => m case Failure(t) => UnlockFailed } ``` -- ```scala def ask[Req, Res]( otherActor: ActorRef[Req]) (createRequest: ActorRef[Res] ⇒ Req) (mapResponse: Try[Res] ⇒ T) ``` --- background-image: url(AkkaCluster.png) --- background-image: url(AkkaTypedReceptionist.png) --- # Service discovery ```scala sealed trait NeedsLock case class LockActorAvailable(lock: ActorRef[LockProtocol]) extends NeedsLock case object LockNotAvailable extends NeedsLock case object LockGranted extends NeedsLock ``` -- ```scala val needsLockGrant = Behaviors.receive[NeedsLock] { case (ctx, LockGranted) => ctx.log.info("Yay lock, but what do I do with it?? Mutate some global state? Mu haha") Behaviors.same case (ctx, LockNotAvailable) => ctx.log.info("Sad panda") Behaviors.stopped } ``` --- # Registering with the receptionist ```scala val topLevel = Behaviors.setup[NeedsLock] { initialCtx => val lock = initialCtx.spawn(unlocked, "lock-a") initialCtx.system.receptionist ! Register(ServiceKey[LockProtocol]("lock-a"), lock) needsLockInstance } ``` -- ```scala val lockKey = ServiceKey[LockProtocol]("lock-a") initialCtx.ask(initialCtx.system.receptionist)(Find(lockKey)) { case Success(listing) if listing.serviceInstances(lockKey).size == 1 => LockActorAvailable(listing.serviceInstances(lockKey).head) case _ => LockNotAvailable } ``` --- # Using the Lock Actor ``` scala Behaviors.receive[NeedsLock] { case (ctx, LockActorAvailable(lockActor)) => ctx.log.info("Lock actor is available, time to get it") ctx.ask(lockActor)(Lock) { case Success(l) => LockGranted case Failure(t) => LockNotAvailable } needsLockGrant case (ctx, LockNotAvailable) => ctx.log.info("Oh noes, no lock actor") Behaviors.stopped } } ``` --- # Service discovery ```scala sealed trait NeedsLock case class LockActorAvailable(lock: ActorRef[LockProtocol]) extends NeedsLock case object LockNotAvailable extends NeedsLock case object LockGranted extends NeedsLock ``` ```scala val needsLockGrant = Behaviors.receive[NeedsLock] { case (ctx, LockGranted) => ctx.log.info("Yay lock, but what do I do with it?? Mutate some global state? Mu haha") Behaviors.same case (ctx, LockNotAvailable) => ctx.log.info("Sad panda") Behaviors.stopped } ``` --- # Persistence -- ```scala case class Command(data: String) case class Event(data: String) case class State(events: List[String] = Nil) ``` -- ```scala val behavior: Behavior[Command] = PersistentBehaviors.receive[Command, Event, State]( persistenceId = "abc", initialState = State(), commandHandler = (ctx, state, cmd) ⇒ ???, eventHandler = (state, evt) ⇒ ???) ``` --- # Command handler ```scala val commandHandler: CommandHandler[Command, Event, State] = CommandHandler.command { case Cmd(data) ⇒ Effect.persist(Evt(data)) } ``` -- * Persist * PersistAll * Stop * Unhandled --- # Event Handler ```scala val eventHandler: (State, Event) ⇒ (State) = { case (state, Evt(data)) ⇒ state.copy(data :: state.events) } ``` --- # That's all folks There's more: * Cluster Sharding * Receptionist subscribe --- # Thanks! Code and slides at `chbatey/akka-typed-talk` on GitHub --- # Questions?