Actor model
Clustering
Persistence (Event Sourcing)
HTTP
Streams
Typed refinements
Multi DC
gRPC
Artery TCP
Stream Refs
// Commands accepted by the untyped lock actors.
sealed trait LockProtocol
// Request to acquire the lock.
final case object Lock extends LockProtocol
// Request to release the lock.
final case object Unlock extends LockProtocol
// Replies sent back to whoever asked for the lock.
sealed trait LockStatus
// The lock was free and now belongs to the requester.
final case object Granted extends LockStatus
// The lock is already held; `who` is the current holder.
final case class Taken(who: ActorRef) extends LockStatus
/** Lock actor that tracks the current holder in a mutable field.
  *
  * `Lock` is answered with `Granted` when nobody holds the lock and with
  * `Taken(holder)` otherwise. `Unlock` is only accepted from the holder.
  */
class MutableActor extends Actor with ActorLogging {

  // Current holder of the lock, if any.
  private var owner: Option[ActorRef] = None

  def receive: Receive = {
    case Lock =>
      owner match {
        case None =>
          owner = Some(sender())
          sender() ! Granted
        case Some(holder) =>
          sender() ! Taken(holder)
      }

    case Unlock =>
      // `require` throws IllegalArgumentException when the sender is not the holder.
      require(owner.contains(sender()))
      owner = None
  }
}
/** Lock actor that encodes "who holds the lock" in the installed behaviour
  * (via `context.become`) instead of a mutable field.
  */
class BecomeActor extends Actor {

  // Behaviour while the lock is free: the first requester gets it.
  private val unlocked: Receive = {
    case Lock =>
      val requester = sender()
      requester ! Granted
      context.become(locked(requester))
  }

  // Behaviour while `who` holds the lock.
  private def locked(who: ActorRef): Receive = {
    case Unlock =>
      // `require` throws IllegalArgumentException when the sender is not the holder.
      require(sender() == who)
      context.become(unlocked)
    case Lock =>
      sender() ! Taken(who)
  }

  override def receive: Receive = unlocked
}
ActorRef
becomes ActorRef[T]
sender()
actorSelection
/** Commands accepted by the typed lock behaviour. Each command carries the
  * reference that the reply should be sent to.
  */
sealed trait LockProtocol
/** Request to acquire the lock on behalf of `ref`. */
final case class Lock(ref: ActorRef[LockStatus]) extends LockProtocol
/** Request to release the lock on behalf of `ref`. */
final case class Unlock(ref: ActorRef[LockStatus]) extends LockProtocol

/** Replies produced by the typed lock behaviour. */
sealed trait LockStatus
/** The lock was free and now belongs to the requester. */
case object Granted extends LockStatus
/** The lock is already held; `who` is the current holder. */
final case class Taken(who: ActorRef[LockStatus]) extends LockStatus
/** The lock was released. */
case object Released extends LockStatus
/** Reply for a rejected unlock request (presumably; confirm against the behaviours using it). */
case object UnlockFailed extends LockStatus
/** Behaviour while the lock is held.
  *
  * @param by the actor currently holding the lock
  * @return a behaviour that answers `Lock` with `Taken(by)`, releases the lock
  *         when `by` sends `Unlock`, and answers `UnlockFailed` to anyone else
  *         who tries to unlock
  */
def locked(by: ActorRef[LockStatus]): Behavior[LockProtocol] =
  Behaviors.receiveMessage[LockProtocol] {
    case Lock(who) =>
      who ! Taken(by)
      Behaviors.same

    case Unlock(who) if who == by =>
      who ! Released
      unlocked

    case Unlock(who) =>
      // Reply with the protocol's failure message instead of throwing: an
      // exception here would fail the lock actor itself because of a
      // misbehaving client.
      who ! UnlockFailed
      Behaviors.same
  }
/** Behaviour while the lock is free: the first `Lock` request is granted and
  * the behaviour switches to `locked` for that requester. Only `Lock` is
  * matched by this partial handler.
  */
val unlocked: Behavior[LockProtocol] =
  Behaviors.receiveMessagePartial[LockProtocol] {
    case Lock(requester) =>
      requester ! Granted
      locked(requester)
  }
ActorRef[Receptionist.Command]
case class Find[T] (...) extends Command
case class Subscribe[T] (...) extends Command
case class Listing[T](...) extends Command
/** Root behaviour: spawns the lock actor, registers it with the receptionist
  * under the "lock-a" service key, then continues as `needsLockInstance`.
  */
val topLevel = Behaviors.setup[NeedsLock] { ctx =>
  val lock = ctx.spawn(unlocked, "lock-a")
  val lockKey = ServiceKey[LockProtocol]("lock-a")
  ctx.system.receptionist ! Register(lockKey, lock)
  needsLockInstance
}
// Ask the receptionist for the actors registered under `key` and translate the
// outcome into one of this actor's own messages.
initialCtx.ask(initialCtx.system.receptionist)(Find(key)) {
// Successful reply that passes the `hasListing` check: use the first listed instance.
case Success(listing) if hasListing(key, listing) =>
LockActorAvailable(listing.serviceInstances(key).head)
// Anything else: a failed ask, or a reply that does not pass `hasListing`.
case _ =>
LockNotAvailable
}
// Shape of the context `ask` used in the surrounding snippets.
// NOTE(review): `T` is not declared here and no result type is given —
// presumably `T` is the asking actor's own message type; confirm against the API docs.
def ask[Req, Res](
// The actor being asked.
otherActor: ActorRef[Req])
// Builds the request from the reference the reply should be sent to.
(createRequest: ActorRef[Res] ⇒ Req)
// Maps the reply, or the failure of the ask, to a `T`.
(mapResponse: Try[Res] ⇒ T)
/** Behaviour of an actor that needs the lock.
  *
  * On start it asks the receptionist for the actor registered under "lock-a".
  * When one is available it asks that actor for the lock and continues as
  * `needsLockGrant`; when none is available it logs and stops.
  */
val needsLockInstance = Behaviors.setup[NeedsLock] { initialCtx =>
  val key = ServiceKey[LockProtocol]("lock-a")

  // Look up the lock actor; the reply is adapted into this actor's own protocol.
  initialCtx.ask(initialCtx.system.receptionist)(Find(key)) {
    case Success(listing) if hasListing(key, listing) =>
      LockActorAvailable(listing.serviceInstances(key).head)
    case _ =>
      LockNotAvailable
  }

  Behaviors.receive[NeedsLock] {
    case (ctx, LockActorAvailable(lockActor)) =>
      ctx.log.info("Lock actor is available, time to get it")
      // NOTE(review): every successful reply is mapped to LockGranted, including
      // a `Taken` reply — confirm whether that is intended.
      ctx.ask(lockActor)(Lock) {
        case Success(_) => LockGranted
        case Failure(_) => LockNotAvailable
      }
      needsLockGrant

    case (ctx, LockNotAvailable) =>
      ctx.log.info("Oh noes, no lock actor")
      Behaviors.stopped
  }
}
// Create the actor system named "TopLevel" from the `topLevel` behaviour.
val system = ActorSystem(topLevel, "TopLevel")
/** Command sent to the persistent behaviour. */
final case class Command(data: String)
/** Event recorded for a command. */
final case class Event(data: String)
/** State of the persistent behaviour: the data of the events held so far; empty by default. */
final case class State(events: List[String] = Nil)
/** Skeleton of a persistent behaviour with persistence id "abc" and an empty
  * initial state; both handlers are left unimplemented (`???`).
  */
val behavior: Behavior[Command] =
  PersistentBehaviors.receive[Command, Event, State](
    persistenceId = "abc",
    initialState = State(),
    commandHandler = (_, _, _) => ???,
    eventHandler = (_, _) => ???
  )
/** Command handler: every `Command` is persisted as an `Event` carrying the same data. */
val commandHandler: CommandHandler[Command, Event, State] =
  CommandHandler.command {
    // Match and build the types named in the handler's signature.
    case Command(data) => Effect.persist(Event(data))
  }
/** Event handler: prepends the event's data to the state's event list. */
val eventHandler: (State, Event) => State = {
  case (state, Event(data)) => state.copy(events = data :: state.events)
}
Running Akka cluster across DCs
Membership management during partitions
Singletons and sharding
Distributed data
cluster.multi-data-center.self-data-center = "a"
// Obtain the cluster-singleton facility for this actor system.
val clusterSingleton = ClusterSingleton(system)
// Spawn the counter under the name "cat-counter", starting from 0, with default
// props and settings. `GoodByeCounter` is passed as the last argument
// (presumably the termination message — confirm against the ClusterSingleton API).
val singleton: ActorRef[CounterCommand] =
clusterSingleton.spawn[CounterCommand](TypedCounter.counter(0),
"cat-counter",
Props.empty,
ClusterSingletonSettings(system),
GoodByeCounter)
// Send two increments, then ask for the current value.
singleton ! Increment
singleton ! Increment
val count: Future[Int] = singleton ? GetValue
cluster.multi-data-center.self-data-center = "b"
// Same spawn call as for the singleton, but the settings name data centre "a"
// (presumably this yields a proxy to the singleton running there — confirm
// against the ClusterSingleton documentation).
val proxy = clusterSingleton.spawn[CounterCommand](
TypedCounter.counter(0),
"cat-counter",
Props.empty,
ClusterSingletonSettings(system)
.withDataCenter("a")
,
GoodByeCounter
)
// Send two increments through the proxy, then ask for the current value.
proxy ! Increment
proxy ! Increment
val count: Future[Int] = proxy ? GetValue
Reactive streams over the network
// A local stream of the integers 1 to 100.
val source = Source(1 to 100)
// Running the source into `StreamRefs.sourceRef()` materialises a future
// `SourceRef[Int]` for it.
val sourceRef: Future[SourceRef[Int]] =
source.runWith(StreamRefs.sourceRef())
// Simplified view of a source reference: it exposes a `Source` of `T`.
trait SourceRef[T] {
// The stream behind this reference.
def source: Source[T, NotUsed]
}
// Run the referenced stream, printing every element.
streamRef.runWith(Sink.foreach(println))
// Greeter service covering the four gRPC call shapes.
service GreeterService {
// Unary: one request, one reply.
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Client streaming: a stream of requests, one reply.
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
// Server streaming: one request, a stream of replies.
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
// Bidirectional streaming: a stream of requests, a stream of replies.
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}
// Callback-style observer with its three callbacks; bodies are elided (`...`).
new StreamObserver[T]() {
  // Called for each element of the stream.
  override def onNext(summary: T): Unit = { ... }
  // Called when the stream fails; the argument is the failure, not an element.
  override def onError(t: Throwable): Unit = { ... }
  // Called when the stream completes.
  override def onCompleted(): Unit = { ... }
}
rpc SayHello (HelloRequest) returns (HelloReply) {}
def sayHello(in: HelloRequest): Future[HelloReply] =
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
/** Client-streaming call: prints every incoming request and replies once the
  * stream has been consumed.
  *
  * @param in the stream of requests sent by the client
  * @return the single reply, available after the whole stream was read
  */
def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
  val consumed = in.runWith(Sink.foreach(println))
  consumed.map(_ => HelloReply("I have read your stream"))
}
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
/** Server-streaming call: answers one request with two replies.
  *
  * @param in the request; its `name` is used in the second reply
  * @return a stream of exactly two replies
  */
override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
  val replies = List(HelloReply("Hello"), HelloReply(s"Hello ${in.name}"))
  Source(replies)
}
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
/** Bidirectional-streaming call: each incoming request is answered with a
  * greeting for its `name`.
  *
  * @param in the stream of requests sent by the client
  * @return one reply per request
  */
override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] =
  in.map { request =>
    HelloReply(s"Hello ${request.name}")
  }
// Wrap the service implementation in a handler: a function from HTTP request
// to a future HTTP response.
val service: HttpRequest => Future[HttpResponse] =
GreeterServiceHandler(new GreeterServiceImpl(mat))
// Bind the handler on 127.0.0.1:8080 with HTTP/2 set to `Always`, and print the
// bound address once binding completes.
Http2().bindAndHandleAsync(service, "127.0.0.1", 8080, HttpConnectionContext(http2 = Always))
.foreach { binding =>
println(s"gRPC server bound to: ${binding.localAddress}")
}
// Client pointed at 127.0.0.1:8080.
val client = new GreeterServiceClient(new GrpcClientSettings(
"127.0.0.1",
8080
))
// Unary call: one request, a future reply.
val response: Future[HelloReply] =
client.sayHello(HelloRequest("chbatey"))
// Client-streaming call: a source of two requests, a single future reply.
val clientStreaming: Future[HelloReply] =
client.itKeepsTalking(Source(List(
HelloRequest("chbatey"),
HelloRequest("trevor"))))
// Streaming call: the request source is left unimplemented (`???`); the
// replies arrive as a source.
val streamedResponse: Source[HelloReply, NotUsed] =
client.streamHellos(???)
// Run the reply stream, printing every reply.
streamedResponse.runWith(Sink.foreach(println))
Scala