Learning to tame akka-http (WebSocket)
Please feel free to skip down the introductory section if you would like to directly jump into creating a WS Server route (or) be a client of WS using akka.
Please note, akka-http
uses akka-stream
as its underlying machinery. Hence, knowledge of akka-streams
is useful. But, I shall try to keep the details as simple as I could.
Introduction
WebSocket is one of the most flexible protocols in the world of HTTP. Reason being,
a. Every request-response does not have to be in the overhead of a new connection
b. The mechanism of communication does not have to be a request-response. Meaning, it could be a single request and many responses or vice-verse.
c. The request to response can be based on long batch, where the response may happen after a long time.
d. Every end-point of service in WebSocket is not bound by many routes (which is generally a case with HTTP REST is used), i.e, a double edged sword, where the same end-point can be used to service many request-response schema.
This poses a challenge to support heterogeneous schema during the parse of a request. This is achieved by using “message-type” attribute abstraction (in the case of JSON) for every incoming message.
For ProtoBuf, its built onto the message itself so no need of additional “message-type”.
e. If a middleware (like NodeJS) is being used, the connections to the WebSocket can be multiplexed (to reduce the number of connections opened between these applications)
This can be achieved by adding an attribute to the message called “request-id”. This can provide a unique identifier for messages from middleware.
💡In fact, gRPC follows some of these benefits mentioned in WebSocket.
Using Akka WS as a Server
As for a complete working application, I have provided a link at the end of the post.
akka-http
provides routes
as an abstraction for handling incoming web-socket requests.
//place holder... create an actor system here
implicit val mat: ActorMaterializer = ActorMaterializer()
Http().bindAndHandleAsync(
route,
"0.0.0.0",
8080,
settings = ServerSettings(system)
)
def route = ???
Now the route
definition itself…
def route: Route = {
import akka.http.scaladsl.server.Directives._
(get & path("ws")) {
val incoming: Sink[Message, NotUsed] = ???
val outgoing: Source[Message, Unit] = ???
val flow: Flow[Message, Message, Any] = Flow.fromSinkAndSource(incoming, outgoing)
handleWebSocketMessages(flow)
}
}
The first part of writing an WS Server is to understand the use of akka-streams
.
Visualize that incoming messages are Sink
and outgoing messages are Source
. This does provides couple of subtle machineries.
i) When there are no more Source Message, we are instructing the stream to implicitly end. Meaning, the WS would close.
ii) The messages for Source and Sink needs to be of Message
type. The explicitly supported concrete types are
- TextMessage
- BinaryMessage
For the remainder part of the discussion, let’s consider using TextMessage.
iii) Due to the way back-pressure can be managed by akka-streams
, incoming messages can be streamed entries.
Meaning, they may not be TextMessage.Strict
. In the sense, the data may be arriving in multiple frames. This is pretty useful when the an incoming message is large and needs to be processed as a stream. TextMessage.toStrict
provides a way to make Sink message as one large String
.
iv) Since, we have the tooling of akka-streams
, it would be a simple exercise to make the Incoming and Outgoing handles as Actors (in order to provide simple abstractions to the caller code).
In order to better understand the implementation of incoming
and outgoing
message(s), let’s try to write a simple Echo WS Server that shall echo back any incoming message to the caller.
We shall start by filling the blank of incoming
and then the outgoing
values.
Incoming
val incoming: Sink[Message, NotUsed] = Flow[Message] //take the incoming message and make it into a string.
.mapAsync(1) { m ⇒
m.flatten(2.minutes)
}
.collect { case TextMessage.Strict(m) ⇒ m }
.to(Sink.foreach { s ⇒
log.info(s"Incoming message: ${s}")
})
In the incoming Sink
,we build it using Flow
(another akka-streams
machinery to help with data transformation). In terms of the code, it does the following
(1) mapAsync
& collect
— asynchronously convert the incoming message into a single large String. For now, please ignore the (.flatten
operation) to keep the explanation simpler.
(2) to
— takes the String message and echos it to the console.
Please hold off on thought of how to send this message to outgoing
. We shall come to this after we understand the outgoing
value.
Outgoing
val outgoing = Source
.actorRef[String](5, OverflowStrategy.dropHead)
.mapMaterializedValue(r ⇒ outgoingActor = r)
.map(m ⇒ TextMessage.Strict(m))
The outgoing is much more simpler than incoming. Since, we would like to control the connection logicality of the WS, we use an actor
based Source. But, the actor itself accepts String
messages and converts them into TextMessage
.
The mapMaterializedValue
in-between provides us with the magical handle to the actorRef
of the Source.
The missing bit of the above incoming can use this actorRef to echo the message back to the caller.
Here is the complete route
method that can help clear up its working.
def route(implicit log: LoggingAdapter): Route = {
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl._
(get & path("ws")) {
var outgoingActor: ActorRef = null //placeholder hack.
val incoming: Sink[Message, NotUsed] = Flow[Message] //take the incoming message and make it into a string.
.mapAsync(1) { m ⇒
m.flatten(2.minutes)
}
.collect { case TextMessage.Strict(m) ⇒ m }
.to(Sink.foreach { s ⇒
log.info(s"Incoming message: ${s}")
outgoingActor ! s
})
val outgoing = Source
.actorRef[String](5, OverflowStrategy.dropHead)
.mapMaterializedValue(r ⇒ outgoingActor = r)
.map(m ⇒ TextMessage.Strict(m))
val flow: Flow[Message, Message, Any] = Flow.fromSinkAndSource(incoming, outgoing)
handleWebSocketMessages(flow)
}
}
In case if you would like the complete working code (including the creation of Actor et al which also includes the flatten
operation omitted above) here is one…
import akka.NotUsed
import akka.actor._
import akka.event.LoggingAdapter
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.settings.ServerSettings
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import com.typesafe.config.ConfigFactory
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try
object WSEchoServer extends App {
implicit class RichMessage(private val value: Message) extends AnyVal {
def flatten(duration: FiniteDuration)(implicit mat: Materializer): Future[Any] = {
value match {
case v: TextMessage => v.toStrict(duration)
case v: BinaryMessage => v.toStrict(duration)
case _ => Future.successful(NotUsed)
}
}
}
val config = ConfigFactory.load()
implicit val system = ActorSystem("ws-server-test", config)
implicit val log = system.log
implicit val mat: ActorMaterializer = ActorMaterializer()
sys.addShutdownHook {
system.log.info("Shutting down...")
system.terminate()
Try(Await.ready(system.whenTerminated, 5 minutes))
println("Done.")
}
Http().bindAndHandleAsync(
Route.asyncHandler(route),
"0.0.0.0",
8888,
settings = ServerSettings(system)
)
log.info("Ready!")
def route(implicit log: LoggingAdapter): Route = {
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl._
(get & path("ws")) {
var outgoingActor: ActorRef = null //placeholder hack.
val incoming: Sink[Message, NotUsed] = Flow[Message] //take the incoming message and make it into a string.
.mapAsync(1) { m ⇒
m.flatten(2.minutes)
}
.collect { case TextMessage.Strict(m) ⇒ m }
.to(Sink.foreach { s ⇒
log.info(s"Incoming message: ${s}")
outgoingActor ! s
})
val outgoing = Source
.actorRef[String](5, OverflowStrategy.dropHead)
.mapMaterializedValue(r ⇒ outgoingActor = r)
.map(m ⇒ TextMessage.Strict(m))
val flow: Flow[Message, Message, Any] = Flow.fromSinkAndSource(incoming, outgoing)
handleWebSocketMessages(flow)
}
}
}
I have also written another example of using Akka-WS server using Akka Typed
that provides a bit more flexibility with managing incoming and outgoing messages using Actors at https://github.com/babloo80/akka-http-websocket
Using Akka WS as a Client
The machinery to use Akka WS as a client is almost identical to the code above on writing as a server.
val req: WebSocketRequest = WebSocketRequest("ws://localhost:8888/ws1")
val flow = ???
Http().singleWebSocketRequest(req, flow)
The flow
follows the same code as described in server above.
⛑ Suggestions / Feedback ! 😃