Say, you’ve subscribed to a popular blogger who publishes articles about web development on Medium, and whenever this blogger shares a new post, Medium sends you a notification. How can you develop this functionality for your Scala application with GraphQL?
The answer is simple: you need to use GraphQL subscriptions to create a message stream and to continually push slices of the application state with messages to a subscriber through that stream whenever the state changes.
We review how you can implement GraphQL subscriptions in your Scala application using Play 2, Sangria, and Akka Streams.
An overview of a Scala application with GraphQL subscriptions
For a start, we need to clarify several aspects of how we can build our demo Scala application with GraphQL subscriptions.
To create a subscription channel, we can use either Server-Sent Events (SSE) or the WebSocket protocol. In our implementation, we decided to use WebSockets to ensure stable communication between the client and our server: unlike SSE, this approach avoids creating multiple HTTP connections per client.
The following diagram gives an overview of the application structure.
scala-graphql-api
    app                          # The Scala app source code with subscriptions
        controllers              # Contains app controllers
            AppController        # Main controller that executes subscriptions
            GraphQLHandler       # Parses GraphQL queries
            WebSocketFlowActor   # Creates a stream via WebSockets and executes subscriptions
        errors                   # Error classes
        graphql                  # GraphQL schemas and resolvers
            resolvers            # A folder with resolver methods to execute queries
            schemas              # GraphQL schemas. In particular, PostSchema
            GraphQL.scala        # Defines global GraphQL-related objects
            GraphQLSubscriptions # A class to store GraphQL subscriptions
            UserContext          # A GraphQL context class
        models                   # Application models
        modules                  # Application modules: PostModule, DBModule, and PubSubModule
        repositories             # Contains the trait PostRepository
        services                 # Contains app services
            ActorRefObserver     # The ActorRef Observer class
            PubSubService        # A class that implements the Observer pattern
            PubSubServiceImpl    # PubSub service implementation
        views                    # HTML layouts (a graphiql layout)
    conf
    project
    public
    test
    .gitignore
    .travis.yml
    build.sbt
    README.md
We used the following technology stack to build this application:
- Scala 2.12.8
- Sangria
- Play Framework 2.7
- Akka
- Monix
- WebSockets
You can view the code in this repository on GitHub.
Since our Scala API is built on top of Play Framework, you may want to read about working with WebSockets in Play 2 Scala API.
We also took advantage of a popular software design pattern called Observer to implement our solution for GraphQL subscriptions. You may want to refresh your memory on this pattern, too. Start here, if it is new for you.
Before we move on, a word about the data we work with. Since we’re talking about subscribing to new posts, we use the following data model to represent a Post entity:
case class Post(id: Option[Long] = None, title: String, content: String)
To whet your appetite, check out what a GraphQL subscription query looks like (you can actually run the query below for our demo Scala app right now):
subscription PostsUpdated {
  postsUpdated {
    name
    post {
      id
      title
      content
    }
  }
}
Once the subscription query above is sent and the backend has handled it, the client application is subscribed to the changes in posts. From then on, the frontend receives a notification whenever a new post is added:
{
  "data": {
    "postsUpdated": {
      "name": "addPost",
      "post": {
        "id": 1,
        "title": "Test",
        "content": "some content"
      }
    }
  }
}
Now we can focus on implementing subscriptions with Scala, Play, and WebSockets.
A general overview of GraphQL subscriptions with Scala
What’s necessary to build a Scala GraphQL API with subscriptions?
First, there are the AppController, WebSocketFlowActor, and GraphQLHandler classes. They’re all stored under app/controllers and process GraphQL queries sent over HTTP and WebSockets. We review these first.
Next, we define a GraphQL schema for the Post entity with queries, mutations, and, most importantly, subscriptions.
Also, we create a publish-subscribe service. We need to subscribe to events, so we use the Observer pattern; look for the PubSubService trait and its implementation in app/services. (We know that Observer and PubSub are same-same but different. But still same. It’s just more common for us to name the service PubSubService.)
A Scala server with GraphQL subscriptions
In this section, we explain how GraphQL subscriptions are handled over WebSockets in a Scala app.
It all starts with a route, a WebSocket connection URI '/graphql/subscribe/websockets', which you can look up in the conf/routes file. This is the entry point for all subscription queries. The “usual” queries and mutations are still sent to '/graphql', though.
As you can also see in conf/routes, the method that handles subscriptions is graphqlSubscriptionOverWebSocket() from AppController. Here it is:
def graphqlSubscriptionOverWebSocket: WebSocket = WebSocket.accept[String, String] {
  _: RequestHeader =>
    ActorFlow.actorRef {
      actorRef =>
        WebSocketFlowActor.props(actorRef, graphQL, controllerComponents)
    }
}
How does it work? We use WebSocket.accept (provided by Play Framework) to handle the data coming through a WebSocket connection. The return type must be WebSocket, which you can think of as a wrapper around a Flow. All messages coming through the WebSocket connection are fed into this Flow, and the messages that the Flow produces are sent back to the client.
Notice that our mechanism for handling client requests over WebSockets is based on Akka Streams. We use Play’s utility ActorFlow to convert an ActorRef into a flow in order to handle all the messages from WebSockets with an actor. ActorFlow.actorRef accepts a function ActorRef => Props.
What’s props()? It’s a method that returns an instance of the class Props, which describes how to create the actor that receives the messages coming from the client. We explain more about how it works in the next section.
The class WebSocketFlowActor and its companion object
Inside WebSocketFlowActor.scala, you’ll find a class WebSocketFlowActor, which is an actor, and a companion object with the same name. It is the companion object’s method props() that gets called in graphqlSubscriptionOverWebSocket().
As you can see below, props() returns a new Props object:
def props(outActor: ActorRef, graphQL: GraphQL, controllerComponents: ControllerComponents)
         (implicit ec: ExecutionContext, mat: Materializer): Props = {
  Props(new WebSocketFlowActor(outActor, graphQL, GraphQLSubscriptions(), controllerComponents))
}
The class WebSocketFlowActor, instantiated above in Props, is designed to process all the messages that the client application sends to our Scala server via WebSockets. To the WebSocketFlowActor constructor, we pass:
- outActor, an ActorRef instance that accepts and redirects messages back to the WebSocket client;
- graphQL, a GraphQL instance that contains all schema descriptions in our Scala app;
- graphQLSubscriptions, an object to store subscriptions created during a WebSocket connection; and
- controllerComponents, an object that contains the components used by controllers.
This class also overrides the method receive() provided by Actor and uses it to accept GraphQL queries of the String type sent by clients over WebSockets.
In WebSocketFlowActor.receive(), we parse the incoming message and convert it into GraphQL components (query, operationName, and variables) before we execute the query. You can see an example of this below (we show only a part of the receive() method implementation here):
override def receive: Receive = {
  case message: String =>
    val maybeQuery = Try(Json.parse(message)) match {
      case Success(json) => parseToGraphQLQuery(json)
      case Failure(error) => throw new Error(s"Fail to parse the request body. Reason [$error]")
    }
    // ...
}
You can find the parseToGraphQLQuery() method in app/controllers/GraphQLHandler.scala. Once parseToGraphQLQuery() returns and its result is stored in maybeQuery, the key part of receive() shown below gets executed:
val source: AkkaSource[JsValue] = maybeQuery match {
  case Success((query, operationName, variables)) => executeQuery(query, graphQL, variables, operationName)
  case Failure(error) => Source.single(JsString(error.getMessage))
}
source.map(_.toString).runWith(Sink.actorRef[String](outActor, PoisonPill))
Here, we run executeQuery(), the method implemented last in the WebSocketFlowActor class. This method returns a new source (an executed GraphQL subscription) whose elements we map to strings in the last line. Finally, we run the source with Sink.actorRef to pass each string to outActor and so respond to the subscription query that came through the WebSocket connection.
What happens in the background when executeQuery() runs?
executeQuery() returns a source (read: a stream) of type AkkaSource[JsValue], and this source is created once the GraphQL subscription query gets executed by Executor.execute() from Sangria.
import sangria.execution.ExecutionScheme.Stream
import sangria.streaming.akkaStreams._

val source: AkkaSource[JsValue] = Executor.execute(
  schema = graphQL.Schema,
  queryAst = queryAst,
  variables = variables.getOrElse(Json.obj()),
  userContext = UserContext(Some(graphQLSubscriptions))
)
The result of the code above is an object of type AkkaSource[JsValue], which is just a stream of JsValue elements.
The source is run here (recall this code from receive()):
source.map(_.toString).runWith(Sink.actorRef[String](outActor, PoisonPill))
Now each stream element is converted to a string and passed to outActor, which forwards it to the client via WebSockets. To run the stream and materialize its values, we connect the source to a Sink.
When the stream completes, outActor receives the PoisonPill message that we passed to Sink.actorRef, which stops the actor and closes the WebSocket connection. The source is closed as well.
Let’s get back to the executeQuery() method. You might have noticed the UserContext class passed to Executor.execute(). Basically, UserContext stores subscriptions. That’s why it goes hand in hand with GraphQLSubscriptions, which we need to cancel when a WebSocket connection is closed in order to prevent resource leaks.
How does it work? When a WebSocket connection is closed or interrupted, the actor WebSocketFlowActor stops. But before it does, the actor’s postStop() callback is called, and this callback contains the logic that stops the streams. postStop() is defined first in WebSocketFlowActor and is shown below:
override def postStop(): Unit = {
  graphQLSubscriptions.cancelAll()
}
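To make the role of the subscription store more concrete, here is a minimal, dependency-free sketch of what a class like GraphQLSubscriptions might look like. It is not the code from the repository: the Cancelable trait is a local stand-in for the Monix type, and the names subscriptions, closed, and size are assumptions made for illustration. Only add() and cancelAll() appear in the article.

```scala
import scala.collection.mutable.ArrayBuffer

object SubscriptionsSketch {
  // Local stand-in for monix.execution.Cancelable, so the sketch has no dependencies.
  trait Cancelable { def cancel(): Unit }

  // Hypothetical store for the subscriptions opened over one WebSocket connection.
  final class GraphQLSubscriptions {
    private val subscriptions = ArrayBuffer.empty[Cancelable]
    private var closed = false

    // Remember a subscription; if the connection is already closed, cancel it right away.
    def add(cancelable: Cancelable): Unit = synchronized {
      if (closed) {
        cancelable.cancel()
      } else {
        subscriptions += cancelable
      }
    }

    // Cancel everything that was registered and refuse to keep later additions alive.
    def cancelAll(): Unit = synchronized {
      subscriptions.foreach(_.cancel())
      subscriptions.clear()
      closed = true
    }

    // Number of subscriptions currently held (added for the sketch only).
    def size: Int = synchronized(subscriptions.size)
  }
}
```

Cancelling late additions immediately is a design choice of this sketch: it guards against a subscription that is registered after postStop() has already run.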
GraphQL schema definition
In this section, we talk about the “second” part of our GraphQL subscription implementation with WebSockets. We define a Post schema and create a publish-subscribe service to publish elements with the possibility to subscribe to updates. First, we look at the schema in app/graphql/schemas/PostSchema.
A Post schema with GraphQL subscription
As you can see in the code below, GraphQL subscriptions are described similarly to mutations and queries:
val Subscriptions: List[Field[UserContext, Unit]] = List(
  Field.subs(
    name = postsUpdated,
    fieldType = PostEventType,
    resolve = sangriaContext => {
      pubSubService.subscribe(Seq(addPost, deletePost, editPost))(sangriaContext.ctx)
    }
  )
)
Notice that the resolve method uses a PubSubService instance to subscribe to three types of events: addPost, editPost, and deletePost.
It’s worth noting that in this example we handle updates for all posts. There’s no code that handles updates of a specific post.
So, we have code that subscribes to three events. But how do we publish those events? In the mutation that corresponds to a specific event, we publish the event using the pubSubService.publish() method:
Field(
  name = addPost,
  fieldType = PostType,
  arguments = List(
    Argument("title", StringType),
    Argument("content", StringType)
  ),
  resolve = sangriaContext =>
    postResolver.addPost(
      sangriaContext.args.arg[String]("title"),
      sangriaContext.args.arg[String]("content")
    ).map {
      createdPost =>
        pubSubService.publish(PostEvent(addPost, createdPost))
        createdPost
    }
)
Whenever the addPost mutation is sent from the client, the resolve function runs pubSubService.publish(PostEvent(addPost, createdPost)) and returns the created post. Every client that subscribed to this event beforehand receives the published event.
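The "publish as a side effect of the mutation" step can be tried in isolation with plain Scala futures. The sketch below is not the repository code: InMemoryPostResolver, RecordingPublisher, and addPostAndPublish are hypothetical stand-ins for postResolver, pubSubService, and the resolve function, and the PostEvent shape is assumed from the notification payload shown earlier.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ExecutionContext, Future}

object PublishOnMutationSketch {
  final case class Post(id: Option[Long] = None, title: String, content: String)
  // Assumed shape: an event name plus the post it refers to.
  final case class PostEvent(name: String, post: Post)

  // Hypothetical in-memory stand-in for the post resolver.
  final class InMemoryPostResolver(implicit ec: ExecutionContext) {
    private val nextId = new AtomicLong(0)
    def addPost(title: String, content: String): Future[Post] =
      Future(Post(Some(nextId.incrementAndGet()), title, content))
  }

  // Hypothetical stand-in for the publishing side of the pubsub service:
  // it only records what was published.
  final class RecordingPublisher {
    private var events = Vector.empty[PostEvent]
    def publish(event: PostEvent): Unit = synchronized { events :+= event }
    def published: List[PostEvent] = synchronized(events.toList)
  }

  // Mirrors the resolve function: create the post, publish the event, return the post.
  def addPostAndPublish(resolver: InMemoryPostResolver,
                        publisher: RecordingPublisher,
                        title: String,
                        content: String)
                       (implicit ec: ExecutionContext): Future[Post] =
    resolver.addPost(title, content).map { createdPost =>
      publisher.publish(PostEvent("addPost", createdPost))
      createdPost
    }
}
```

Because publish() is called inside map, the event is published only after the post has been created successfully; a failed addPost future publishes nothing.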
As you can see, the published element is wrapped in an instance of PostEvent, which contains the event name and the element itself. In this case, we use the event name addPost (which is a variable created in WebSocketFlowActor).
We can now turn our attention to the publish-subscribe service that we created under app/services. Let’s review what’s going on there.
A Scala publish-subscribe service for GraphQL subscriptions
Our pubsub service consists of a trait, its implementation, and ActorRefObserver, all stored in the app/services folder. The key component of the subscription handling logic is PubSubService, which implements the Observer pattern. Let’s have a closer look at the implementation in app/services/PubSubServiceImpl.
Inside this service, we use the tools from the Monix library, in particular the class PublishSubject. This is the subject we can subscribe to. Instead of Monix, you can opt for libraries such as RxScala or Akka Streams; we stick to Monix as it’s quite simple to use.
Here’s the subject created in PubSubServiceImpl:
private val subject: PublishSubject[T] = PublishSubject[T]
The instance of the class PublishSubject[T] contains the list of subscribers (observers) to which published events will be sent. This is what the subscribe method looks like:
override def subscribe(eventNames: Seq[String])
                      (implicit userContext: UserContext): Source[Action[Nothing, T], NotUsed] = {
  require(eventNames.nonEmpty)
  Source
    .actorRef[T](bufferSize, OverflowStrategy.dropHead)
    .mapMaterializedValue {
      actorRef =>
        userContext.graphQlSubs.foreach {
          subs =>
            val cancelable = subject.subscribe(new ActorRefObserver[T](actorRef))
            subs.add(cancelable)
        }
        NotUsed
    }
    .filter {
      event =>
        eventNames.contains(event.name)
    }
    .map {
      event =>
        log.info(s"Sending event [ $event ] to client ...")
        Action(event)
    }
}
The statement Source.actorRef[T] creates a stream that’s materialized as an ActorRef. Notice that the method mapMaterializedValue() accepts a function, here of type ActorRef => NotUsed. Inside this function, we take the subscription store from UserContext (if there is one), subscribe a new ActorRefObserver that wraps actorRef to the subject, and save the result in the store.
Once the observer is subscribed, the subject’s subscribe() method returns an instance of Cancelable that we add to our list of subscriptions, subs. The list is reachable through UserContext, and each Cancelable in it has the method cancel() to let us cancel the subscription.
All the messages sent to actorRef will be passed to the stream, and all the elements in the stream will be directed to the client through a WebSocket connection. Using actorRef allows us to redirect elements from subject to Source. For this, we can use the method def subscribe(observer: Observer[A]) from PublishSubject provided by Monix.
As we can see from that method signature, subscribe() accepts an instance of Observer. Here’s our class ActorRefObserver, which extends Observer[T]:
case class ActorRefObserver[T](actorRef: ActorRef) extends Observer[T] {
  override def onNext(elem: T): Future[Ack] = {
    actorRef.tell(elem, noSender)
    Continue
  }
  override def onError(ex: Throwable): Unit = {
    log.info(s"Error has occurred. Reason: ${ex.getCause}")
  }
  override def onComplete(): Unit = {
    log.info(s"Event stream has closed.")
  }
}
The method onNext(elem: T) is called on each subscriber (the ActorRefObserver instances) by the subject when new events are published. The important aspect of this is that we must pass actorRef, the actor we send events to, into the ActorRefObserver constructor.
Let’s sum up. We implemented the interface Observer. We pass an instance of this implementation to the method subscribe() of the class PublishSubject. The subject keeps a list of subscribers, and when we publish an event, it calls the onNext() method of each subscriber, passing it the published event.
Next, using actorRef, we send this event to the actor and thereby back into the stream (a WebSocket connection). Once the element is published, it enters the Source and is filtered by the event name.
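The flow summarized above can be modelled in a few lines of plain Scala. This is a simplified illustration of the Observer pattern, not Monix or Akka: SimpleSubject plays the role of PublishSubject, CollectingObserver plays the role of ActorRefObserver (it appends to a buffer instead of sending to an actor), and the filtering by event name that the article does in the stream is done inside the observer. All of these names are assumptions made for the sketch.

```scala
import scala.collection.mutable.ListBuffer

object PubSubSketch {
  final case class Post(id: Option[Long] = None, title: String, content: String)
  final case class PostEvent(name: String, post: Post)

  // Minimal local versions of the two abstractions used by the pubsub service.
  trait Observer[T] { def onNext(elem: T): Unit }
  trait Cancelable { def cancel(): Unit }

  // Simplified stand-in for a publish subject: it keeps a list of observers
  // and pushes every published element to each of them.
  final class SimpleSubject[T] {
    private var observers = Vector.empty[Observer[T]]

    // Register an observer and hand back a handle that removes it again.
    def subscribe(observer: Observer[T]): Cancelable = synchronized {
      observers = observers :+ observer
      new Cancelable {
        def cancel(): Unit = SimpleSubject.this.synchronized {
          observers = observers.filterNot(_ eq observer)
        }
      }
    }

    // Publish: call onNext() on every observer registered at this moment.
    def onNext(elem: T): Unit = {
      val snapshot = synchronized(observers)
      snapshot.foreach(_.onNext(elem))
    }
  }

  // Stand-in for ActorRefObserver plus the filter step: keeps only events
  // whose name the client subscribed to.
  final class CollectingObserver(eventNames: Set[String]) extends Observer[PostEvent] {
    private val buffer = ListBuffer.empty[PostEvent]
    def onNext(elem: PostEvent): Unit =
      if (eventNames.contains(elem.name)) buffer += elem
    def received: List[PostEvent] = buffer.toList
  }
}
```

A subscriber created with Set("addPost") sees addPost events only, and stops seeing anything once the Cancelable returned by subscribe() is cancelled, which is what cancelAll() relies on when a connection closes.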
Each element of the stream must be wrapped into an instance of Action[Ctx, Val] as specified in the Sangria and GraphQL subscriptions documentation. To publish events, we use the method PubSubService.publish():
override def publish(event: T): Unit = {
  // subject is the PublishSubject defined above
  subject.onNext(event).map {
    _ => Logger.debug(s"Event published: [$event]")
  }
}
Inside publish(), we call onNext() on the instance of PublishSubject and pass the event into it.
Here’s a diagram that demonstrates the flow of the published event and an element:
Testing WebSockets with GraphQL subscriptions in a Scala application
Now you can test out our Scala and GraphQL API by running subscription queries via WebSockets. To send queries, you may install a dedicated WebSocket client for Chrome.
Run the application with sbt run and then connect to the Scala server using the link ws://localhost:9000/graphql/subscribe/websockets in the WebSocket client.
A subscription query looks like this:
subscription PostsUpdated {
  postsUpdated {
    name
    post {
      id
      title
      content
    }
  }
}
Because the server parses every WebSocket message as JSON (see receive() above), the query travels over the socket in serialized form:
{"query":"subscription PostsUpdated {\n postsUpdated{\n name\n post{\n id\n title\n content\n }\n }\n}","variables":"null","operationName":"PostsUpdated"}
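Since receive() runs Json.parse on every incoming message, a client has to wrap the GraphQL query in a JSON envelope of the shape shown above. The helper below is a hedged, dependency-free sketch of how such a message could be assembled by hand; the names SubscriptionMessageSketch, escape, and toMessage are hypothetical, and the "variables":"null" field simply mirrors the message printed above rather than a documented requirement of the server.

```scala
object SubscriptionMessageSketch {
  // Escape the characters that may not appear raw inside a JSON string.
  // Only the cases needed for a typical GraphQL query are handled here.
  private def escape(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\t' => sb.append("\\t")
      case c    => sb.append(c)
    }
    sb.toString
  }

  // Build the JSON text that is sent over the WebSocket connection.
  def toMessage(query: String, operationName: String): String =
    s"""{"query":"${escape(query)}","variables":"null","operationName":"${escape(operationName)}"}"""
}
```

In a real client you would more likely build this object with a JSON library (for example Play JSON, which the server already uses) instead of escaping strings manually.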
Now that you’ve subscribed to the events that can happen to posts, add a post using a mutation:
mutation NewPost {
  addPost(title: "Juno Reactor", content: "Navras") {
    id
    title
    content
  }
}
And since you’ve already subscribed to the event addPost, you should see the following message:
{
  "data": {
    "postsUpdated": {
      "name": "addPost",
      "post": {
        "id": 1,
        "title": "Juno Reactor",
        "content": "Navras"
      }
    }
  }
}
Now try to update the existing post (the first post has the ID 1):
mutation EditPost {
  editPost(id: 1, title: "New Title", content: "new content") {
    id
    title
    content
  }
}
Yet again, a JSON object is returned because we subscribed to the editPost event:
{
  "data": {
    "postsUpdated": {
      "name": "editPost",
      "post": {
        "id": 1,
        "title": "New Title",
        "content": "new content"
      }
    }
  }
}
Finally, remove the post:
mutation DeletePost {
  deletePost(id: 1) {
    id
    title
    content
  }
}
And the result is this object:
{
  "data": {
    "postsUpdated": {
      "name": "deletePost",
      "post": {
        "id": 1,
        "title": "New Title",
        "content": "new content"
      }
    }
  }
}
That’s how you can use GraphQL subscriptions in your Scala application using Sangria, Akka Streams, and Play Framework. Ask questions in the comments below if you want to know more.