Learning to tame akka-http (WebSocket)

Please feel free to skip down the introductory section if you would like to directly jump into creating a WS Server route (or) be a client of WS using akka.

Please note, akka-http uses akka-stream as its underlying machinery. Hence, knowledge of akka-streams is useful. But, I shall try to keep the details as simple as I could.

Introduction

WebSocket is one of the most flexible protocols in the world of HTTP. Reason being,

a. Every request-response does not have to be in the overhead of a new connection

b. The mechanism of communication does not have to be a request-response. Meaning, it could be a single request and many responses or vice-verse.

c. The request to response can be based on long batch, where the response may happen after a long time.

d. Every end-point of service in WebSocket is not bound by many routes (which is generally a case with HTTP REST is used), i.e, a double edged sword, where the same end-point can be used to service many request-response schema.
This poses a challenge to support heterogeneous schema during the parse of a request. This is achieved by using “message-type” attribute abstraction (in the case of JSON) for every incoming message.

For ProtoBuf, its built onto the message itself so no need of additional “message-type”.

e. If a middleware (like NodeJS) is being used, the connections to the WebSocket can be multiplexed (to reduce the number of connections opened between these applications)

This can be achieved by adding an attribute to the message called “request-id”. This can provide a unique identifier for messages from middleware.

💡In fact, gRPC follows some of these benefits mentioned in WebSocket.

Using Akka WS as a Server

As for a complete working application, I have provided a link at the end of the post.

akka-http provides routes as an abstraction for handling incoming web-socket requests.

//place holder... create an actor system here
implicit val mat: ActorMaterializer = ActorMaterializer()

Http
().bindAndHandleAsync(
route,
"0.0.0.0",
8080,
settings = ServerSettings(system)
)
def route = ???

Now the route definition itself…

def route: Route = {
import akka.http.scaladsl.server.Directives._
(get & path("ws")) {
val incoming: Sink[Message, NotUsed] = ???
val outgoing: Source[Message, Unit] = ???
val flow: Flow[Message, Message, Any] = Flow.fromSinkAndSource(incoming, outgoing)
handleWebSocketMessages(flow)
}
}

The first part of writing an WS Server is to understand the use of akka-streams.

Visualize that incoming messages are Sink and outgoing messages are Source. This does provides couple of subtle machineries.

i) When there are no more Source Message, we are instructing the stream to implicitly end. Meaning, the WS would close.

ii) The messages for Source and Sink needs to be of Message type. The explicitly supported concrete types are

  • TextMessage
  • BinaryMessage

For the remainder part of the discussion, let’s consider using TextMessage.

iii) Due to the way back-pressure can be managed by akka-streams, incoming messages can be streamed entries.
Meaning, they may not be TextMessage.Strict. In the sense, the data may be arriving in multiple frames. This is pretty useful when the an incoming message is large and needs to be processed as a stream.
TextMessage.toStrict provides a way to make Sink message as one large String.

iv) Since, we have the tooling of akka-streams, it would be a simple exercise to make the Incoming and Outgoing handles as Actors (in order to provide simple abstractions to the caller code).

In order to better understand the implementation of incoming and outgoing message(s), let’s try to write a simple Echo WS Server that shall echo back any incoming message to the caller.

We shall start by filling the blank of incoming and then the outgoing values.

Incoming

val incoming: Sink[Message, NotUsed] = Flow[Message] //take the incoming message and make it into a string.
.mapAsync(1) { m ⇒
m.flatten(2.minutes)
}
.collect { case TextMessage.Strict(m) ⇒ m }
.to(Sink.foreach { s ⇒
log.info(s"Incoming message: ${s}")
})

In the incoming Sink,we build it using Flow (another akka-streams machinery to help with data transformation). In terms of the code, it does the following
(1) mapAsync & collect — asynchronously convert the incoming message into a single large String. For now, please ignore the (.flattenoperation) to keep the explanation simpler.
(2) to — takes the String message and echos it to the console.

Please hold off on thought of how to send this message to outgoing. We shall come to this after we understand the outgoing value.

Outgoing

val outgoing = Source
.actorRef[String](5, OverflowStrategy.dropHead)
.mapMaterializedValue(r ⇒ outgoingActor = r)
.map(m ⇒ TextMessage.Strict(m))

The outgoing is much more simpler than incoming. Since, we would like to control the connection logicality of the WS, we use an actor based Source. But, the actor itself accepts String messages and converts them into TextMessage.

The mapMaterializedValue in-between provides us with the magical handle to the actorRef of the Source.
The missing bit of the above incoming can use this actorRef to echo the message back to the caller.

Here is the complete route method that can help clear up its working.

def route(implicit log: LoggingAdapter): Route = {
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl._
(get & path("ws")) {
var outgoingActor: ActorRef = null //placeholder hack.
val incoming: Sink[Message, NotUsed] = Flow[Message] //take the incoming message and make it into a string.
.mapAsync(1) { m ⇒
m.flatten(2.minutes)
}
.collect { case TextMessage.Strict(m) ⇒ m }
.to(Sink.foreach { s ⇒
log.info(s"Incoming message: ${s}")
outgoingActor ! s
})

val outgoing = Source
.actorRef[String](5, OverflowStrategy.dropHead)
.mapMaterializedValue(r ⇒ outgoingActor = r)
.map(m ⇒ TextMessage.Strict(m))

val flow: Flow[Message, Message, Any] = Flow.fromSinkAndSource(incoming, outgoing)
handleWebSocketMessages(flow)
}
}

In case if you would like the complete working code (including the creation of Actor et al which also includes the flatten operation omitted above) here is one…

import akka.NotUsed
import akka.actor._
import akka.event.LoggingAdapter
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.settings.ServerSettings
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import com.typesafe.config.ConfigFactory

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object WSEchoServer extends App {

implicit class RichMessage(private val value: Message) extends AnyVal {
def flatten(duration: FiniteDuration)(implicit mat: Materializer): Future[Any] = {
value match {
case v: TextMessage => v.toStrict(duration)
case v: BinaryMessage => v.toStrict(duration)
case _ => Future.successful(NotUsed)
}
}
}

val config = ConfigFactory.load()
implicit val system = ActorSystem("ws-server-test", config)
implicit val log = system.log
implicit val mat: ActorMaterializer = ActorMaterializer()
sys.addShutdownHook {
system.log.info("Shutting down...")
system.terminate()
Try(Await.ready(system.whenTerminated, 5 minutes))
println("Done.")
}
Http().bindAndHandleAsync(
Route.asyncHandler(route),
"0.0.0.0",
8888,
settings = ServerSettings(system)
)
log.info("Ready!")

def route(implicit log: LoggingAdapter): Route = {
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl._
(get & path("ws")) {
var outgoingActor: ActorRef = null //placeholder hack.
val incoming: Sink[Message, NotUsed] = Flow[Message] //take the incoming message and make it into a string.
.mapAsync(1) { m ⇒
m.flatten(2.minutes)
}
.collect { case TextMessage.Strict(m) ⇒ m }
.to(Sink.foreach { s ⇒
log.info(s"Incoming message: ${s}")
outgoingActor ! s
})

val outgoing = Source
.actorRef[String](5, OverflowStrategy.dropHead)
.mapMaterializedValue(r ⇒ outgoingActor = r)
.map(m ⇒ TextMessage.Strict(m))

val flow: Flow[Message, Message, Any] = Flow.fromSinkAndSource(incoming, outgoing)
handleWebSocketMessages(flow)
}
}

}

I have also written another example of using Akka-WS server using Akka Typedthat provides a bit more flexibility with managing incoming and outgoing messages using Actors at https://github.com/babloo80/akka-http-websocket

Using Akka WS as a Client

The machinery to use Akka WS as a client is almost identical to the code above on writing as a server.

val req: WebSocketRequest = WebSocketRequest("ws://localhost:8888/ws1")
val flow = ???
Http
().singleWebSocketRequest(req, flow)

The flow follows the same code as described in server above.

⛑ Suggestions / Feedback ! 😃

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store