Say, you've subscribed to a popular blogger who publishes articles about web development on Medium, and whenever this blogger shares a new post, Medium sends you a notification. How can you develop this functionality for your Scala application with GraphQL?
The answer is simple: you need to use GraphQL subscriptions to create a message stream and to continually push slices of the application state with messages to a subscriber through that stream whenever the state changes.
We review how you can implement GraphQL subscriptions in your Scala application using Play 2, Sangria, and Akka Streams.
An overview of a Scala application with GraphQL subscriptions
For a start, we need to clarify several aspects of how we can build our demo Scala application with GraphQL subscriptions.
To create a subscription channel, we can use either Server-Sent Events (SSE) or the WebSocket protocol. In our implementation, we decided to use WebSockets to ensure stable communication between the client and our server. Unlike SSE, this approach avoids creating multiple HTTP connections per client.
The following diagram gives an overview of the application structure.
We used the following technology stack to build this application:
- Scala 2.12.8
- Sangria
- Play Framework 2.7
- Akka
- Monix
- WebSockets
You can view the code in this repository on GitHub.
Since our Scala API is built on top of Play Framework, you may want to read about working with WebSockets in Play 2 Scala API.
We also took advantage of a popular software design pattern called Observer to implement our solution for GraphQL subscriptions. You may want to refresh your memory on this pattern, too. Start here, if it is new for you.
Speaking of our demo application, we should also mention the data we work with. Since we're talking about subscribing to new posts, we use the following data model to represent a Post entity:
case class Post(id: Option[Long] = None, title: String, content: String)
To whet your appetite, check out what a GraphQL subscription query looks like (you can actually run the query below for our demo Scala app right now):
Once the subscription query above is sent, the client application subscribes to changes in posts. After the backend handles this query, the frontend receives a notification whenever a new post is added:
Now we can focus on implementing subscriptions with Scala, Play, and WebSockets.
A general overview of GraphQL subscriptions with Scala
What's necessary to build a Scala GraphQL API with subscriptions?
First, there are the ApplicationController, WebSocketFlowActor, and GraphQLHandler classes. They're all stored under app/controllers and process GraphQL queries sent over HTTP and WebSockets. We review these first.
Next, we define a GraphQL schema for the Post entity with queries, mutations, and, most importantly, subscriptions.
Also, we create a publish-subscribe service. We need to subscribe to events, so we use the Observer pattern; look for the PubSubService trait and its implementation in app/services. (We know that Observer and PubSub are same-same but different. But still same. It's just more common for us to name the service PubSubService.)
A Scala server with GraphQL subscriptions
In this section, we explain how GraphQL subscriptions are handled over WebSockets in a Scala app.
It all starts with a route, the WebSocket connection URI '/graphql/subscribe/websockets', which you can look up in the conf/routes file. This is the entry point for all subscription queries. The "usual" queries and mutations are still sent to '/graphql', though.
As you can also see in conf/routes, the method that handles subscriptions is graphqlSubscriptionOverWebSocket() from AppController. Here it is:
How does it work? We use WebSocket.accept (provided by Play Framework) to collect data coming through a WebSocket connection. The return type must also be WebSocket, which you can understand as just a Flow class. All messages coming through a WebSocket connection will be dropped into this Flow, and the messages created by Flow will be sent back to the client.
Notice that our mechanism for handling client requests over WebSockets is based on Akka Streams. We use Play's utility ActorFlow to convert ActorRef into a flow in order to handle all the messages from WebSockets with an actor. ActorFlow.actorRef accepts a function ActorRef => Props.
What's props()? It's a method that returns an instance of the class Props for an actor that is given outActor, the reference that receives the messages to be sent to the client. We explain more about how it works in the next section.
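To make the idea of a function from the outbound reference to a handler more concrete, here is a dependency-free sketch in plain Scala (no Play or Akka). All names in it are hypothetical and invented for this illustration: Out stands in for the outbound ActorRef, Handler for the actor, and run for what ActorFlow.actorRef does conceptually. It is an analogy under those assumptions, not Play's implementation.

```scala
object FlowSketch {
  // Stand-in for the outbound ActorRef: anything we can push a message into.
  type Out = String => Unit

  // Stand-in for the actor that handles inbound WebSocket messages.
  trait Handler { def receive(msg: String): Unit }

  // Hypothetical analogue of ActorFlow.actorRef: builds the handler from the
  // outbound channel, feeds it the inbound messages, and returns everything
  // the handler pushed back towards the client.
  def run(factory: Out => Handler, inbound: Seq[String]): Vector[String] = {
    val outbound = Vector.newBuilder[String]
    val handler  = factory(msg => outbound += msg)
    inbound.foreach(handler.receive)
    outbound.result()
  }

  // A trivial handler factory that acknowledges every inbound message.
  def echo(out: Out): Handler = new Handler {
    def receive(msg: String): Unit = out(s"received: $msg")
  }
}
```

The point of the shape Out => Handler is that the handler cannot be built until the framework has created the outbound channel for this particular connection, which is why a factory function is passed instead of a ready-made handler.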
The class WebSocketFlowActor and companion object WebSocketFlowActor
Inside WebSocketFlowActor.scala, you'll find a class WebSocketFlowActor, which is an actor, and a companion object with the same name. It is the companion object's method props() that gets called in graphqlSubscriptionOverWebSocket().
As you can see below, props() returns a new Props object:
The class WebSocketFlowActor, instantiated above in Props, is designed to process all the messages that the client application sends to our Scala server via WebSockets. In the parameters to the WebSocketFlowActor constructor, we pass:
- outActor, an ActorRef instance that accepts and redirects messages back to the WebSocket client;
- graphQL, a GraphQL instance that contains all schema descriptions in our Scala app;
- graphQLSubscriptions, an object to store subscriptions created during a WebSocket connection; and
- controllerComponents, an object that contains the components used by controllers.
This class also overrides the method receive() provided by Actor and uses it to accept GraphQL queries of the String type sent by clients over WebSockets.
In WebSocketFlowActor.receive(), we parse and convert the incoming message into GraphQL components (query, operationName, and variables) before we execute the query. You can see the example of this below (we provide only a part of the receive() method implementation here):
You can find the parseToGraphQLQuery() method in app/controllers/GraphQLHandler.scala. Once message is returned from parseToGraphQLQuery(), the key part of receive() located below gets executed:
Here, we run executeQuery(), the method implemented last in the WebSocketFlowActor class. This method returns a new source (an executed GraphQL subscription) that we map to a string in the last line. Finally, we run Sink.actorRef to flush the string to outActor in order to respond to the subscription query that came through a WebSocket connection.
What happens in the background when executeQuery() runs?
executeQuery() returns a source (read: a stream) of type AkkaSource[JsValue], and this source is created once the GraphQL subscription query gets executed by Executor.execute() from Sangria.
The result of the code above is an object of type AkkaSource[JsValue], which is just a stream of JsValue elements. The source gets run here (recall this code from receive()):
Now each stream element is sent to outActor as a string, and outActor forwards it to the client via WebSockets. In order to run the stream and materialize its values, we also connected the source to Sink.
In case a stream is closed because of an error, the actor will receive the object PoisonPill, which stops the actor and closes the WebSocket connection. The source will also be closed.
Let's get back to the executeQuery() method. You might have noticed the UserContext class passed to Executor.execute(). Basically, UserContext stores subscriptions. That's why it goes hand in hand with GraphQLSubscriptions, which we need to cancel when a WebSocket connection is closed to prevent resource leaks.
How does it work? When a WebSocket connection is closed or interrupted, the actor WebSocketFlowActor stops. But before it does, the callback postStop(), which contains the logic to stop the streams, is called inside the actor.
An example of postStop() is shown below; it is defined first in WebSocketFlowActor.
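The cleanup idea itself can be sketched without Akka or Monix. In the hypothetical example below, Cancelable mimics the handle that a subscription returns, and Subscriptions plays the role of the container that an actor would empty from postStop(). The names, and the choice to cancel late arrivals immediately once the container is closed, are assumptions made for illustration rather than the project's code.

```scala
object CleanupSketch {
  // Minimal stand-in for a handle that stops one running subscription.
  trait Cancelable { def cancel(): Unit }

  // Hypothetical per-connection container of subscription handles.
  final class Subscriptions {
    private var subs: List[Cancelable] = Nil
    private var closed = false

    // Registers a handle; if the connection is already closed, the handle is
    // cancelled right away so that nothing can leak (an assumed design choice).
    def add(c: Cancelable): Unit = synchronized {
      if (closed) c.cancel() else subs = c :: subs
    }

    // Cancels every registered handle exactly once; safe to call repeatedly.
    def cancelAll(): Unit = synchronized {
      subs.foreach(_.cancel())
      subs = Nil
      closed = true
    }

    def active: Int = synchronized(subs.size)
  }

  // Demo handle that counts how many times it was cancelled.
  final class Counting extends Cancelable {
    var cancelled = 0
    def cancel(): Unit = cancelled += 1
  }
}
```

An actor's postStop() would then only need to call cancelAll() on the container that belongs to its connection.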
GraphQL schema definition
In this section, we talk about the "second" part of our GraphQL subscription implementation with WebSockets. We define
a Post schema and create a publish-subscribe service to publish elements with the possibility to subscribe to updates.
First, we look at the schema in app/graphql/schemas/PostSchema.
A Post schema with GraphQL subscription
As you can see in the code below, GraphQL subscriptions are described similarly to mutations and queries:
Notice that the resolve method uses a PubSubService instance to subscribe to three types of events: addPost, editPost, and deletePost.
It's worth noting that in this example we handle updates for all posts. There's no code that handles updates of a specific post.
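Subscribing to every post event boils down to keeping only the events whose name is in a set of interest. The sketch below assumes an event shape of (name, post) and uses a plain Seq in place of a stream; EventFilterSketch, forNames, and forPost are hypothetical names. The forPost helper shows one way per-post updates could be narrowed down if you needed them, which the demo application does not do.

```scala
object EventFilterSketch {
  final case class Post(id: Option[Long] = None, title: String, content: String)

  // Assumed event shape: the event name plus the affected post.
  final case class PostEvent(name: String, post: Post)

  val AllPostEvents: Set[String] = Set("addPost", "editPost", "deletePost")

  // Keeps only the events whose name is one of the requested names.
  def forNames(events: Seq[PostEvent], names: Set[String]): Seq[PostEvent] =
    events.filter(e => names.contains(e.name))

  // Hypothetical extension: keeps only the events that concern one post.
  def forPost(events: Seq[PostEvent], postId: Long): Seq[PostEvent] =
    events.filter(_.post.id.contains(postId))
}
```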
So, we have code that subscribes to three events. But how do we publish those events? In a mutation that corresponds to a specific event, we publish an event using the pubSubService.publish() method:
Whenever the addPost mutation is sent from the client and some user has subscribed to this event before, the resolve function will run pubSubService.publish(PostEvent(addPost, createdPost)) and return the created post.
As you can see, the published element is wrapped in an instance of PostEvent, which contains the event name and the element itself. In this case, we use the event name addPost (which is a variable created in WebSocketFlowActor).
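The publish step inside a mutation can be sketched in plain Scala. Here PostService, its in-memory storage, and the publish callback are hypothetical stand-ins for the resolver, the repository, and pubSubService.publish(); only the Post and PostEvent shapes follow the description in this article.

```scala
object MutationSketch {
  final case class Post(id: Option[Long] = None, title: String, content: String)
  final case class PostEvent(name: String, post: Post)

  // Hypothetical service: stores posts in memory and reports every change
  // through the supplied publish callback.
  final class PostService(publish: PostEvent => Unit) {
    private var posts  = Vector.empty[Post]
    private var nextId = 1L

    // Saves the post first, then publishes the event, then returns the post,
    // mirroring the order described for the addPost mutation.
    def addPost(title: String, content: String): Post = synchronized {
      val created = Post(Some(nextId), title, content)
      nextId += 1
      posts = posts :+ created
      publish(PostEvent("addPost", created))
      created
    }

    def all: Vector[Post] = synchronized(posts)
  }
}
```

Publishing only after the post has been stored means subscribers never receive an event for a post that failed to save.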
We can now turn our attention to the publish-subscribe service that we created under app/services. Let's review what's going on there.
A Scala publish-subscribe service for GraphQL subscriptions
Our pubsub service consists of a trait, its implementation, and ActorRefObserver, all stored in the services folder. The key component of the subscription handling logic is PubSubService, which implements the Observer pattern.
Let's have a closer look at the implementation in app/services/PubSubServiceImpl.
Inside this service, we use the tools from the Monix library, in particular, the class PublishSubject. This is the subject we can subscribe to. Instead of Monix, you can opt for such libraries as RxScala or Akka Streams; we stick to Monix as it's quite simple to use.
Here's the created subject in PubSubServiceImpl:
The instance of the class PublishSubject[T] contains the list of subscribers (observers) to which published events will be sent. This is what the subscribe method looks like:
The statement Source.actorRef[T] creates a stream that's materialized as ActorRef. Notice that the method mapMaterializedValue() is parametrized and accepts a function ActorRef => NotUsed. Inside this function, we just get all subscriptions ever created and subscribe to them with ActorRefObserver.
Once the observer has subscribed, the method subscribe() returns an instance of Cancelable that we add to our list of subscriptions, subs. The list is located in UserContext, which has the method cancel() to let us cancel subscriptions.
All the messages sent to actorRef will be passed to the stream, and all the elements in the stream will be directed to the client by a WebSocket connection. Using actorRef allows us to redirect elements from subject to Source. For this, we can use the method def subscribe(observer: Observer[A]) from PublishSubject provided by Monix.
Here's our class ActorRefObserver, which extends Observer[T]. As we can see from the signature of subscribe(), it accepts an instance of Observer:
The method onNext(elem: T) is called in each subscriber (ActorRefObserver instances) inside the object publisher when new events are published. The important aspect of this is that we must pass actorRef, which we send events to, into the constructor of ActorRefObserver.
Let's sum up. We implemented the interface Observer. We put an instance of this interface into the method subscribe() in the class PublishSubject. The class contains a list of subscribers, and when we publish an event, the subject calls each subscriber's onNext() method, passing it the published event.
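The same mechanics, stripped of Monix and Akka, fit in a few lines. This is a hand-rolled sketch and not Monix's PublishSubject: Subject, Observer, Cancelable, and ForwardingObserver are simplified, hypothetical stand-ins (a real Monix observer returns a Future[Ack] from onNext and also handles onError and onComplete). ForwardingObserver plays the part of ActorRefObserver, with a plain function in place of the actor reference.

```scala
object ObserverSketch {
  trait Observer[A]  { def onNext(elem: A): Unit }
  trait Cancelable   { def cancel(): Unit }

  // Simplified subject: remembers its observers and fans every event out.
  final class Subject[A] {
    private var observers = Vector.empty[Observer[A]]

    // Registers the observer and returns a handle that removes it again.
    def subscribe(observer: Observer[A]): Cancelable = synchronized {
      observers = observers :+ observer
      new Cancelable {
        def cancel(): Unit = unsubscribe(observer)
      }
    }

    private def unsubscribe(observer: Observer[A]): Unit = synchronized {
      observers = observers.filterNot(_ eq observer)
    }

    // Publishes an event by calling onNext on every current observer.
    def onNext(elem: A): Unit = {
      val snapshot = synchronized(observers)
      snapshot.foreach(_.onNext(elem))
    }
  }

  // Forwards each event to a delivery function (the "actor" in this analogy).
  final class ForwardingObserver[A](deliver: A => Unit) extends Observer[A] {
    def onNext(elem: A): Unit = deliver(elem)
  }
}
```

Cancelling the handle returned by subscribe() is what stops further deliveries, which is why those handles are worth keeping for the lifetime of the connection.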
Next, using actorRef, we send this event to the actor and thus back into the stream (a WebSocket connection). Once the element is published, it is pushed into Source and filtered by the event name.
Each element of the stream must be wrapped into an instance of Action[Ctx, Val] as specified in the Sangria and GraphQL subscriptions documentation.
To publish events, we use the method PubSubService.publish():
Inside publish(), we call onNext() on the instance of PublishSubject and pass the event into it.
Here's a diagram that demonstrates the flow of the published event and an element:
Testing WebSockets with GraphQL subscriptions in a Scala application
Now you can test out our Scala and GraphQL API by running subscription queries via WebSockets. To send queries, you may install a dedicated WebSocket client for Chrome.
Run the application with sbt run and then connect to the Scala server using the link ws://localhost:9000/graphql/subscribe/websockets in the WebSocket client.
A subscription query looks like this:
In return, we get a serialized query from the server:
Now that you've subscribed to the events that can happen to posts, add a post using a mutation:
And since you've already subscribed to the event addPost, you should see the following message:
Now try to update the existing post (the first post has the ID 1):
Yet again, a JSON object is returned because we subscribed to the editPost event:
Finally, remove the post:
And the result is this object:
That's how you can use GraphQL subscriptions in your Scala application using Sangria, Akka Streams, and Play Framework. Ask questions in the comments below if you want to know more.