class: center, middle

# Responsive, back-pressured services with Akka

Christopher Batey

Lightbend

`@chbatey`

???
- Useful for non Akka projects

---

# About me

* Christopher Batey
* Work @ Lightbend on the Akka team
  * Akka
  * Akka-streams
  * Akka-http
  * Akka persistence for Apache Cassandra
  * Alpakka

???
- Building services rather than libraries/frameworks

---

class: center, middle

# Responsive
# Scalable

???
- High level goals
- Everyone: Responsive even for low throughput
- Scalable for those who need it
  - Scalable in process: Execution model appropriate
  - Scalable across machines
- Resource efficiency

---

class: center, middle

# Asynchronous
# Back pressured

???
- Techniques to achieve the goals
- Different/better techniques?
- Async
  - Remaining responsive
- Back pressure/FlowControl
  - Dealing with components that run at different speeds
  - Much more important when moving away from thread per request

---

# Goals

* High level
  * What does async give us?
  * What does flow control give us?
* Concrete tech
  * Flow control with Akka streams
* Demos
  * TCP Client -> TCP Server (slow server)
  * Http Client -> TCP -> Http Server -> TCP -> Apache Cassandra (slow client)

???
- Takeaways for this presentation
- Flow control in your application and between them
- TCP windowing, receive and send buffers, akka streams
- Big point: If the client slows down we stop fetching results from the database. No wasted effort. Constant memory footprint

---

# Use case

* HTTP Service, endpoints for:
  * User information from database
  * Getting user activity over a large time span

--

* Requirements
  * Respond in a timely manner, even if it is a failure
  * Don't do any unnecessary work
  * Constant memory footprint

---

class: center, middle

# Responsive

???
- Single request

---

# Responsiveness

* Control over when your application responds
* Service time vs response time
* Queues and buffers
* Dependencies
  * Don't make their problem your problem
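One way to keep response time bounded even when service time blows up is to race the work against a deadline. A minimal sketch using `akka.pattern.after` (the 500ms budget is made up; `DataAccess.lookupUser` is the lookup used later in these slides):

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import scala.concurrent.{Future, TimeoutException}
import scala.concurrent.duration._

implicit val system = ActorSystem()
import system.dispatcher

// Whichever future completes first becomes the response, so the caller
// hears back within ~500ms even if the database is having a bad day
def lookupWithDeadline(userId: String): Future[Option[User]] =
  Future.firstCompletedOf(Seq(
    DataAccess.lookupUser(userId),
    after(500.millis, system.scheduler)(
      Future.failed(new TimeoutException("response deadline exceeded")))))
```

The slow lookup still completes in the background; the point is only that the caller gets an answer, even if it is a failure, within its budget.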
--

.center[![queue](response-time.png)]

???
- Response time vs service time
- Yellow is service time
- Red is response time

---

class: center, middle

# Execution models

???
- How does a byte get from the wire to your code?
- What threads are used?
- Single threaded?
- Thread per request?
- Thread per core without blocking IO?
- How to get this wrong even with the right tools?

---

# Thread per request

```scala
def post(request: HttpRequest): HttpResponse = {
  // do something very important
  ???
}
```

???
- Options for this API?
- Hystrix

---

# Thread per request

```scala
def post(request: HttpRequest): HttpResponse = {
  val userId = request.getQueryParam("userId")
  val user: User = getUserFromDatabase(userId)
  ExternalService.sendPresentToUser(user)
  HttpResponse(200)
}
```

???
- Giving up the thread of control

---

# Thread per request

```scala
def post(request: HttpRequest): HttpResponse = {
  // nice and quick
  val userId = request.getQueryParam("userId")
  // 5 millis to 10 seconds?
  val user: User = getUserFromDatabase(userId)
  // 5 millis to 10 seconds?
  ExternalService.sendPresentToUser(user)
  HttpResponse(200)
}
```

???
- Pros: very simple programming model
- Socket read timeouts?
- "back pressure" via max number of threads

---

# Demonstrating this

```
http://skillsmatter.com/skillscasts/6520-think-your-software-is-fault-tolerant-prove-it
```

???
- Demonstrates network level timeouts don't work for SLAs

---

# Async

```scala
def request(request: HttpRequest): Future[HttpResponse] = ???
```

* Timeout independent of dependency times
* Option not to block on IO
* Works well for small responses that are ready to go

---

# Async

```scala
def request(request: HttpRequest): Future[HttpResponse] =
  for {
    user <- lookupUser(request.getQueryParam("userId"))
    _    <- AsyncExternalService.sendPresentToUser(user)
  } yield HttpResponse(200)
```

???
- Dependencies need to be async as well
- Benefits even if all deps aren't async

---

# Akka HTTP

```scala
path("user" / Segment) { name =>
  get {
    val user: Future[Option[User]] = DataAccess.lookupUser(name)
    onComplete(user) {
      case Success(Some(u)) => complete(u)
      case Success(None)    => complete(NotFound)
      case Failure(t)       => complete(InternalServerError, t.getMessage)
    }
  }
}
```

???
- Routing DSL
- Lower level API
- Can complete with: value, future, stream
- Need a way to serialise User

---

# Database

```scala
val session = ... // From the Cassandra driver
```

```scala
def lookupUser(userId: UserId): Future[Option[User]] = {
  session.executeAsync("select * from users where user_id = ?", userId).asScala
    .map((rs: ResultSet) => Option(rs.one())
      .map(row => User(
        row.getString("user_id"),
        row.getString("user_name"),
        row.getInt("age"))))
}
```

???
- Async now all the way through
- How do we do a request timeout now?

---

# Akka HTTP

```scala
path("user" / Segment) { name =>
  get {
    withRequestTimeout(500.millis) {
      val user: Future[Option[User]] = DataAccess.lookupUser(name)
      onComplete(user) {
        case Success(Some(u)) => complete(u)
        case Success(None)    => complete(NotFound)
        case Failure(t)       => complete(InternalServerError, t.getMessage)
      }
    }
  }
}
```

???
- Surrounding with a directive

---

class: center, middle

# Responsiveness under load

???
- Now assume a large request or a large number of requests
- How to handle?

---

# Playing fair

```
                               +-----------+
                               |           |
                         +---->| Database  |
                         |     |           |
                         |     +-----------+
+---------+     +--------+-+
|         +---->|          |
| Client  |     |  Server  |
|         |<----+          |
+---------+     +--------+-+
                         |     +------------+
                         |     |            |
                         +---->|  Service   |
                               |            |
                               +------------+
```

???
- How do we stop the server overloading the service?
- How do we handle the client consuming our response slowly?
- We'll demo this with HTTP / Apache Cassandra

---

# The default

.center[![queue](toomuchmail.jpg)]

---

# Queueing and/or buffering

* How would this work if Kafka was between the services?
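A sketch of what the consuming side can look like with the Alpakka Kafka connector (topic, group id and bootstrap server are made up): the stream only polls the broker as fast as its downstream write completes, so any backlog accumulates in Kafka rather than in the service's memory.

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materialiser = ActorMaterializer()

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("event-writer")

// Stand-in for a real (slow) downstream write
def writeEvent(value: String): Future[Unit] = Future.successful(())

// mapAsync(1) only demands the next record once the previous write is done
Consumer.plainSource(consumerSettings, Subscriptions.topics("user-events"))
  .mapAsync(parallelism = 1)(record => writeEvent(record.value))
  .runWith(Sink.ignore)
```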
--

.center[![queue](pubsub-half.png)]

???
- Evenly matched, the queue would go up and down

---

# Fast publisher

* Without the queue:
  * Downstream gets overwhelmed
  * Publisher has wasted resources

--

.center[![queue](pubsub-full.png)]

--

* Would this work in memory?
* Why produce data no one is ready to consume?

???
- How many of your applications deal with the queue filling up and send back-pressure upstream?

---

# Reacting to failure

* Is the consumer slow or down?
* Circuit breakers
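One reactive tool is Akka's `CircuitBreaker`. A minimal sketch, with made-up thresholds, wrapping the `DataAccess.lookupUser` call from earlier:

```scala
import akka.actor.ActorSystem
import akka.pattern.CircuitBreaker
import scala.concurrent.Future
import scala.concurrent.duration._

implicit val system = ActorSystem()
import system.dispatcher

// Open after 5 consecutive failures (a call over 500ms counts as one);
// after 10 seconds in the open state, let a single trial call through
val breaker = new CircuitBreaker(
  system.scheduler,
  maxFailures = 5,
  callTimeout = 500.millis,
  resetTimeout = 10.seconds)

def guardedLookup(name: String): Future[Option[User]] =
  breaker.withCircuitBreaker(DataAccess.lookupUser(name))
```

A breaker reacts once the dependency is already struggling; the rest of the talk is about preventing the overload in the first place.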
.center[![Circuit breaker](circuit-breaker.png)]

???
- Circuit breakers
- Static throttling?
- Hystrix
- Not acceptable to react, we should prevent!

---

# Flow control

* Dynamically adjust the rate based on demand
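In stream terms the rate adjustment comes for free from demand signalling. A minimal sketch; the 10ms sleep is a stand-in for a real slow consumer such as a database write:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materialiser = ActorMaterializer()
import system.dispatcher

// A source that could emit as fast as the CPU allows
val fastSource = Source(1 to Int.MaxValue)

// Pretend consumer that takes ~10ms per element
def slowWrite(i: Int): Future[Int] = Future { Thread.sleep(10); i }

// mapAsync(1) only signals demand for the next element once the
// in-flight write completes, so the publisher runs at the consumer's pace
fastSource
  .mapAsync(parallelism = 1)(slowWrite)
  .runWith(Sink.ignore)
```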
.center[![queue](demand.png)]

???
- Scenarios
  - HTTP to database
  - Kafka to Cassandra
- Reactive streams
  - Subscriber dictates flow, keeping publisher aware of its free buffer space

---

# TCP flow control

* Only send data that the other side is prepared to handle

???
- How it works
- How to build this into your application

---

# TCP windowing

.center[![queue](tcp-window-initial.png)]

--

.center[![queue](tcp-window-1.png)]

--

.center[![queue](tcp-window-2.png)]

--

.center[![queue](tcp-window-3.png)]

???
- Notes

---

# TCP windowing
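The same windowing is what surfaces as back-pressure in an Akka Streams TCP server. A minimal sketch (address, port and the one-chunk-per-second rate are made up): when the handler flow is slow, the server stops reading from the socket, the receive window fills, and a fast sender's writes eventually stall.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Flow, Tcp}
import akka.util.ByteString
import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val materialiser = ActorMaterializer()

// Echo flow that only lets one chunk per second through
val slowEcho: Flow[ByteString, ByteString, NotUsed] =
  Flow[ByteString].throttle(1, 1.second, 1, ThrottleMode.shaping)

// Once the stream's internal buffers are full the stage stops pulling,
// Akka stops reading from the socket, and TCP flow control does the rest
Tcp().bind("127.0.0.1", 8888).runForeach { connection =>
  connection.handleWith(slowEcho)
}
```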
???
- Window fills up, client stops sending
- Heart beats to know when it can start sending again

---

# Akka Streams 101

* Sources
  * Emit elements
  * Iterables
  * Files
  * TCP connections
  * Kafka

--

* Sinks
  * Sends data somewhere or side effects
  * Side effects
  * Databases
  * TCP connections

--

* Flows
  * Reusable components that transform/filter

???
- Doesn't need to be linear

---

# Akka Streams

```scala
implicit val system = ActorSystem()
implicit val materialiser = ActorMaterializer()
```

--

```scala
val source: Source[Int, NotUsed] = Source(1 to 100)
```

--

```scala
val foldingSink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _)
```

--

```scala
val ran: Future[Int] = source.runWith(foldingSink)
```

--

```scala
5050
```

---

# Akka Streams

```scala
val source: Source[Int, NotUsed] = Source(1 to 100)
```

--

```scala
val printingSink: Sink[Any, Future[Done]] = Sink.foreach(println)
```

--

```scala
val runPrintingSink: Future[Done] = source.runWith(printingSink)
```

---

# Akka Streams

```scala
val runAgain: Future[Int] = source
  .map(_ * 100)
  .runWith(foldingSink)
```

--

```scala
val flow: Flow[Int, Int, NotUsed] = Flow[Int]
  .filter(_ % 2 == 0)
  .map(_ * 10)
```

--

```scala
val viaAFlow: Source[Int, NotUsed] = source.via(flow)
```

---

class: center, middle

# Putting it all together

HTTP Client -> TCP -> HTTP Server -> TCP -> Apache Cassandra

???
- Demo time or code on slides

---

# Putting it all together

* Akka HTTP client
* Akka HTTP server
* Custom Cassandra Source

---

# Akka Client

```scala
def eventsForUser(userId: String): Future[Source[Event, NotUsed]] = {
  val response: Future[HttpResponse] = Http().singleRequest(
    HttpRequest(uri = s"http://localhost:8080/user/tracking/$userId")
  )
  response.map {
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      val responseBytes: Source[ByteString, Any] = entity.dataBytes
      responseBytes.via(Framing.delimiter(
          ByteString("\n"), maximumFrameLength = 100, allowTruncation = true))
        .map(_.utf8String.parseJson.convertTo[Event])
        .mapMaterializedValue(_ => NotUsed)
  }
}
```

???
- Break down Future[Source[Event, NotUsed]]
- responseBytes is a Source!
- Describe framing

---

# Akka Client

```scala
def eventsForUser(userId: String): Future[Source[Event, NotUsed]] = {
  val response: Future[HttpResponse] = Http().singleRequest(
    HttpRequest(uri = s"http://localhost:8080/user/tracking/$userId")
  )
  ...
```

--

```scala
case HttpResponse(StatusCodes.OK, headers, entity, _) =>
  val responseBytes: Source[ByteString, Any] = entity.dataBytes
```

--

```scala
responseBytes.via(Framing.delimiter(
    ByteString("\n"), maximumFrameLength = 100, allowTruncation = true))
  .map(_.utf8String.parseJson.convertTo[Event])
```

---

# Akka Client

```scala
def eventsForUser(userId: String): Future[Source[Event, NotUsed]] = {
  val response: Future[HttpResponse] = Http().singleRequest(
    HttpRequest(uri = s"http://localhost:8080/user/tracking/$userId")
  )
  response.map {
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      val responseBytes: Source[ByteString, Any] = entity.dataBytes
      responseBytes.via(Framing.delimiter(
          ByteString("\n"), maximumFrameLength = 100, allowTruncation = true))
        .map(_.utf8String.parseJson.convertTo[Event])
        .mapMaterializedValue(_ => NotUsed)
  }
}
```

---

# Akka Server

```scala
val bound: Future[Http.ServerBinding] =
  Http().bindAndHandle(route, "localhost", 8080)
```

--

```scala
def bindAndHandle(
  handler: Flow[HttpRequest, HttpResponse, Any],
  interface: String, port: Int)
```

--

```scala
val streamingRoute =
  path("user" / "tracking" / Segment) { name: String =>
    val source: Source[Domain.Event, NotUsed] = DataAccess.lookupEvents(name)
    val asJson: Source[ByteString, NotUsed] = source.map(e =>
      ByteString(s"${e.toJson.toString()}\n", StandardCharsets.UTF_8))
    complete(HttpEntity(ContentTypes.`application/json`, asJson))
  }
```

---

# Cassandra Source

```scala
// userByIdCQL = "select * from user_tracking where name = ?"

@deprecated("Sorry we did this", "always")
def lookupEvents1(userId: UserId): Future[Seq[Event]] = {
  session.executeAsync(userByIdCQL, userId).asScala
    .map(rs => rs.all().asScala.map(row => {
      Event(
        row.getString("user_id"),
        UUIDs.unixTimestamp(row.getUUID("time")),
        row.getString("event"))
    }))
}
```

---

# Cassandra Source

```scala
def lookupEvents(userId: UserId): Source[Event, NotUsed] = {
  CassandraSource(userByIdCQL, userId)(session)
    .map(row => Event(
      row.getString("user_id"),
      UUIDs.unixTimestamp(row.getUUID("time")),
      row.getString("event")))
}
```

---

# Cassandra Source

```scala
class CassandraSource(statement: Statement, session: Session)
  extends GraphStage[SourceShape[Row]] {

  val out: Outlet[Row] = Outlet("CassandraSource.out")
  override val shape: SourceShape[Row] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {

      override def preStart(): Unit = {
        // do initial query
      }

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            // Demand from the sink
          }
        }
      )
    }
}
```

---

# Cassandra Source: Starting

```scala
override def preStart(): Unit = {
  implicit val ec = materializer.executionContext
  fetchCB = getAsyncCallback[Try[ResultSet]](tryPush)
  // set fetch size to something small
  session.executeAsync(statement).asScala.onComplete(fetchCB.invoke)
}
```

---

# Cassandra Source: When query completes

```scala
def tryPush(resultSet: Try[ResultSet]): Unit = resultSet match {
  case Success(rs) =>
    lastResult = Some(rs)
    if (rs.getAvailableWithoutFetching > 0) {
      if (isAvailable(out)) {
        push(out, rs.one())
      }
    } else {
      completeStage()
    }
  case Failure(failure) => failStage(failure)
}
```

---

# Cassandra Source: Responding to demand

```scala
// lastResult: Option[ResultSet]
setHandler(
  out,
  new OutHandler {
    override def onPull(): Unit = {
      lastResult match {
        case Some(rs) if rs.getAvailableWithoutFetching > 0 =>
          push(out, rs.one())
        case Some(rs) if rs.isExhausted =>
          completeStage()
        case Some(rs) =>
          log.info("Fetching more results from Cassandra")
          rs.fetchMoreResults().asScala.onComplete(fetchCB.invoke)
        case None =>
      }
    }
  }
)
```

???
- Demo time or code on slides

---

class: center, middle

# Demo time

---

# Demo summary

* TCP slow server
  * TCP buffers fill up
  * Client stops doing work

--

* HTTP slow client
  * Client makes request for a large payload
  * TCP buffers fill up
  * Server *stops* getting data from database
  * Client then demands more
  * Everything starts flowing

---

# Summary

* Flow control can flow through the network into your app
* Prevent your applications from being overloaded
* Save CPU cycles

--

* Akka streams is one way to do it in your application

--

* Take advantage of TCP's flow control

---

class: center, middle

# Thanks!

Code and slides at `chbatey/akka-streams-examples` on GitHub

---

# Questions?