'GraphQL Subscriptions on WebSockets for Scala applications' post illustration

GraphQL Subscriptions on WebSockets for Scala applications

avatar

15 May,

2019

Say, you've subscribed to a popular blogger who publishes articles about web development on Medium, and whenever this blogger shares a new post, Medium sends you a notification. How can you develop this functionality for your Scala application with GraphQL?

The answer is simple: you need to use GraphQL subscriptions to create a message stream and to continually push slices of the application state with messages to a subscriber through that stream whenever the state changes.

We review how you can implement GraphQL subscriptions in your Scala application using Play 2, Sangria, and Akka Streams.

An overview of a Scala application with GraphQL subscriptions

For a start, we need to clarify several aspects of how we can build our demo Scala application with GraphQL subscriptions.

To create a subscription channel, we can use either Server Sent Events (SSE) or the WebSocket protocol. In our implementation, we decided to use WebSockets to ensure stable communication between the client and our server as this approach. Unlike the use of SSE, it avoids creating multiple HTTP connections per client.

The following diagram gives an overview of the application structure.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
scala-graphql-api
  app                       # The Scala app source code with subscriptions
    controllers             # Contains app controllers
      AppController         # Main controller that executes subscriptions
      GraphQLHandler        # Parses GraphQL queries
      WebSocketFlowActor    # Creates a stream via WebSockets and executes subscriptions
    errors                  # Error classes
    graphql                 # GraphQL schemas and resolvers
      resolvers             # A folder with resolver methods to execute queries
      schemas               # Graphql schemas. In particular, PostSchema
      GraphQL.scala         # Defines global GraphQL-related objects
      GraphQLSubscriptions  # A class to store GraphQL subscriptions
      UserContext           # A GraphQL context class
    models                  # Application models
    modules                 # Application modules: PostModule, DBModule, and PubSubModule
    repositories            # Contains the trait PostRepository
    services                # Contains app services
      ActorRefObserver      # The ActorRef Observer class
      PubSubService         # A class that implements the Observer pattern
      PubSubServiceImpl     # PubSub service implementation 
    views                   # HTML layouts (a graphiql layout)
  conf
  project
  public
  test
  .gitignore
  .travis.yml
  build.sbt
  README.md

We used the following technology stack to build this application:

  • Scala 2.12.8
  • Sangria
  • Play Framework 2.7
  • Akka
  • Monix
  • WebSockets

You can view the code in this repository on GitHub.

Since our Scala API is built on top of Play Framework, you may want to read about working with WebSockets in Play 2 Scala API.

We also took advantage of a popular software design pattern called Observer to implement our solution for GraphQL subscriptions. You may want to refresh your memory on this pattern, too. Start here, if it is new for you.

Talking about our demo application, we should also say about the data we work with. Since we're talking about subscribing to new posts, we use the following data model to represent a Post entity:

1
case class Post(id: Option[Long] = None, title: String, content: String)

To whet your appetite, check out what a GraphQL subscription query looks like (you can actually run the query below for our demo Scala app right now):

1
2
3
4
5
6
7
subscription PostsUpdated {
  postsUpdated {
    id
    title
    content
  }
}

Once a subscription query above is sent, the client application will subscribe to the changes in posts. Once the backend handles this query, the frontend will receive a notification whenever a new post is added:

1
2
3
4
5
6
7
8
9
10
11
12
{
  "data": {
    "postsUpdated": {
      "name": "addPost",
      "post": {
        "id": 1,
        "title": "Test",
        "content": "some content"
      }
    }
  }
}

Now we can focus on implementing subscriptions with Scala, Play, and WebSockets.

A general overview of GraphQL subscriptions with Scala

What's necessary to build a Scala GraphQL API with subscriptions?

First, there are ApplicationController, WebSocketFlowActor, and GraphQLHandler classes. They're all stored under app/controllers and process GraphQL queries sent over HTTP and WebSockets. These we review first.

Next, we define a GraphQL schema for the Post entity with queries, mutations, and, most importantly, subscriptions.

Also, we create a publish-subscribe service. We need to subscribe to events, and so we use the Observer pattern, hence you can look for the PubSubService trait and its implementation in app/services. (We know that Observer and PubSub are same-same but different. But still same. It's just more common for us to name the service PubSubService.)

A Scala server with GraphQL subscriptions

In this section, we explain how GraphQL subscriptions are handled over WebSockets in a Scala app.

It all starts with a route, a WebSocket connection URI '/graphql/subscribe/websockets', which you can look up in the conf/routes file. This is the entry point for all subscription queries. The "usual" queries and mutations are still sent to '/graphql', though.

As you can also see in conf/routes, the method that handles subscriptions is graphqlSubscriptionOverWebSocket() from AppController. Here's it:

1
2
3
4
5
6
7
def graphqlSubscriptionOverWebSocket: WebSocket = WebSocket.accept[String, String] {
  _: RequestHeader =>
    ActorFlow.actorRef {
      actorRef =>
        WebSocketFlowActor.props(actorRef, graphQL, controllerComponents)
    }
}

How does it work? We use WebSocket.accept (provided by Play Framework) to collect data coming through a WebSocket connection. The return type must also be WebSocket, which you can understand as just a Flow class. All messages coming through a WebSocket connection will be dropped into this Flow and the messages created by Flow will be sent back to the client.

Notice that our mechanism for handling client requests over WebSockets is based on Akka Streams. We use Play's utility ActorFlow to convert ActorRef into a flow in order to handle all the messages from WebSockets with an actor. ActorFlow.actorRef accepts a function ActorRef => Properties.

What's props()? It's a method that returns an instance of the class Properties with an actor instance that receives messages sent to the client. We explain more about how it works in the next section.

The class WebSocketFlowActor and companion object WebSocketActorFlow

Inside WebSocketFlowActor.scala, you'll find a class WebSocketFlowActor, which is an actor, and a companion object with the same name. It is the companion object's method props() that gets called in graphqlSubscriptionOverWebSocket().

As you can see below, props() returns a new Props object:

1
2
3
4
def props(outActor: ActorRef, graphQL: GraphQL, controllerComponents: ControllerComponents)
         (implicit ec: ExecutionContext, mat: Materializer): Props = {
  Props(new WebSocketFlowActor(outActor, graphQL, GraphQLSubscriptions(), controllerComponents))
}

The class WebSocketFlowActor, instantiated above in Props, is designed to process all the messages that the client application sends to our Scala server via WebSockets. In the parameters to WebSocketFlowActor constructor, we pass:

  • outActor, an ActorRef instance that accepts and redirects messages back to the WebSocket client;
  • graphQL, a GraphQL instance that contains all schema descriptions in our Scala app;
  • graphQLSubscriptions, an object to store subscriptions created during a WebSocket connection; and
  • controllerComponents, an object that contains the components used by controllers.

This class also overrides the method receive() provided by Actor and uses it to accept GraphQL queries of the String type sent by clients over WebSockets.

In WebSocketFlowActor.receive(), we parse and convert the incoming message into GraphQL components — query, operationName, and variables — before we execute the query. You can see the example of this below (we provide only a part of the receive() method implementation here):

1
2
3
4
5
6
7
8
override def receive: Receive = {
 case message: String =>
   val maybeQuery = Try(Json.parse(message)) match {
     case Success(json) => parseToGraphQLQuery(json)
     case Failure(error) => throw new Error(s"Fail to parse the request body. Reason [$error]")
   }
 // ...
}

The parseToGraphQLQuery() method you can find in app/controllers/GraphQLHandler.scala. Once message is returned from parseToGraphQLQuery(), the key part of receive() located below gets executed:

1
2
3
4
5
val source: AkkaSource[JsValue] = maybeQuery match {
  case Success((query, operationName, variables)) => executeQuery(query, graphQL, variables, operationName)
  case Failure(error) => Source.single(JsString(error.getMessage))
}
source.map(_.toString).runWith(Sink.actorRef[String](outActor, PoisonPill))

Here, we run executeQuery(), the method implemented last in the WebSocketFlorActor class. This method returns a new source — an executed GraphQL subscription — that we map to a string in the last line. Finally, we run Sink.actorRef to flush the string to outActor in order to respond to the subscription query that came through a WebSocket connection.

What happens in the background, when executeQuery() runs?

executeQuery() returns a source (read: a stream) of type AkkaSource[JsValue], and this source is created once the GraphQL subscription query gets executed by Executor.execute() from Sangria.

1
2
3
4
5
6
7
8
9
import sangria.execution.ExecutionScheme.Stream
import sangria.streaming.akkaStreams._

val source: AkkaSource[JsValue] = Executor.execute(
  schema = graphQL.Schema,
  queryAst = queryAst,
  variables = variables.getOrElse(Json.obj()),
  userContext = UserContext(Some(graphQLSubscriptions))
)

The result of the code above is an object of type AkkaSource[JsValue], which is just a stream of JsValue elements. The source gets called here (recall this code from parseToGraphQLQuery():

1
source.map(_.toString).runWith(Sink.actorRef[String](outActor, PoisonPill))

Now each stream element will be redirected to outActor as a string, and each element of the stream is redirected to the client via WebSockets. In order to run the stream and materialize its values, we also connected the source to Sink.

In case a stream is closed because of an error, the actor will receive the object PoisonPill, which stops the actor and closes the WebSocket connection. The source will also be closed.

Let's get back to the executeQuery() method. You might have noticed the UserContext class passed to the Executor.execute(). Basically, UserContext stores subscriptions. That's why it goes hand in hand with GraphQLSubscriptions that we need to cancel if a WebSocket connection is closed, preventing the leak of resources.

How does it work? When a WebSocket connection is closed or interrupted, then the actor WebSocketFlowActor stops. But before it does, the callback postStop() is called inside the actor in which the logic to stop streams is described. An example of postStop() is shown below and defined first in WebSocketFlowActor.

1
2
3
override def postStop(): Unit {
  graphQLSubscriptions.cancelAll()
}

GraphQL schema definition

In this section, we talk about the "second" part of our GraphQL subscription implementation with WebSockets. We define a Post schema and create a publish-subscribe service to publish elements with the possibility to subscribe to updates. First, we look at the schema in app/graphql/schemas/PostSchema.

A Post schema with GraphQL subscription

As you can see in the code below, GraphQL subscriptions are described similarly to mutations and queries:

1
2
3
4
5
6
7
8
9
val Subscriptions: List[Field[UserContext, Unit]] = List(
  Field.subs(
    name = postsUpdated,
    fieldType = PostEventType,
    resolve = sangriaContext => {
      pubSubService.subscribe(Seq(addPost, deletePost, editPost))(sangriaContext.ctx)
    }
  )
)

Notice that the resolve method uses a PubSubService instance to subscribe to three type of events — addPost, editPost, and deletePost.

It's worth noting that in this example we handle updates for all posts. There's no code that handles updates of a specific post.

So, we have code that subscribes to three events. But how do we publish those events? In a mutation that corresponds to a specific event, we publish an event using pubSubService.publish() method:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Field(
  name = addPost,
  fieldType = PostType,
  arguments = List(
    Argument("title", StringType),
    Argument("content", StringType)
  ),
  resolve = sangriaContext =>
    postResolver.addPost(
      sangriaContext.args.arg[String]("title"),
      sangriaContext.args.arg[String]("content")
    ).map {
      createdPost =>
        pubSubService.publish(PostEvent(addPost, createdPost))
        createdPost

    }
)

Whenever the addPost mutation is sent from the client and some user has subscribed to this event before, the resolve function will run pubSubService.publish(PostEvent(addPost, createdPost)) and return the created post.

As you can see, the published element is wrapped in the instance of PostEvent, which contains the event name and the element itself. In this case, we use the event name addPost (which is a variable created in WebSocketFlowActor).

We can now turn our attention to the publish-subscribe service that we created under app/services. Let's review what's going on there.

A Scala publish-subscribe service for GraphQL subscriptions

Our pubsub service consists of a trait, its implementation, and ActorRefObserver all stored under in the services folder. The key component of the subscription handling logic is PubSubService, which implements the pattern Observer. Let's have a closer look at the implementation in app/services/PubSubServiceImpl.

Inside this service, we use the tools from the library Monix, in particular, the class PublishSubject. This is the subject we can subscribe to. Instead of Monix, you can opt for such libraries as RxScala or Akka Streams; we stick to Monix as it's quite simple to use.

Here's the created subject in PubSubServiceImpl:

1
private val subject: PublishSubject[T] = PublishSubject[T]

The instance of the class PublishSubject[T] contains the list of subscribers — observers — to which published events will be sent. This is what the subscribe method looks like:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
override def subscribe(eventNames: Seq[String])
                        (implicit userContext: UserContext): Source[Action[Nothing, T], NotUsed] = {
  require(eventNames.nonEmpty)
  Source
    .actorRef[T](bufferSize, OverflowStrategy.dropHead)
    .mapMaterializedValue {
      actorRef =>
        userContext.graphQlSubs.foreach {
          subs =>
            val cancelable = subject.subscribe(new ActorRefObserver[T](actorRef))
            subs.add(cancelable)
        }
        NotUsed
    }
    .filter {
      event =>
        eventNames.contains(event.name)
    }
    .map {
      event =>
        log.info(s"Sending event [ $event ] to client ...")
        Action(event)
    }
}

The statement Source.actorRef[T] creates a stream that's materialized as ActorRef. Notice that the method mapMaterializedValue() is parametrized and accepts a function ActorRef => NotUsed. Inside this function, we just get all subscriptions ever created and subscribe to them with ActorRefObserver.

Once the subject has subscribed, the method subscribe() returns an instance of Cancelable that we add into our list of subscriptions — subs. The list is located in UserContext, which has the method cancel() to let us cancel subscriptions.

All the messages sent to actorRef will be passed to the stream, and all the elements in the stream will be directed to the client by a WebSocket connection. Using actorRef allows us to redirect elements from subject to Source. For this, we can use the method def subscribe(observer: Observer[A]) from PublishSubject provided by Monix.

Here's our class ActorRefObserver, which inherits Observer[T]. As we can see from the method signature, it accepts an instance of Observer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
case class ActorRefObserver[T](actorRef: ActorRef) extends Observer[T] {
  override def onNext(elem: T): Future[Ack] = {
    actorRef.tell(elem, noSender)
    Continue
  }

  override def onError(ex: Throwable): Unit = {
    log.info(s"Error has occurred. Reason: ${ex.getCause}")
  }

  override def onComplete(): Unit = {
    log.info(s"Event stream has closed.")
  }
}

The method onNext(elem: T) is called in each subscriber (ActionRefObserver instances) inside the object publisher when new events are published. The important aspect of this is that we must pass actorRef, which we send events to, into the constructor ActorRefObserver.

Let's sum up. We implemented the interface Observer. We put an instance of this interface into the method subscribe() in the class PublishSubject. The class contains a list of subscribers, and when we publish an event, each subscriber will call its onNext() method passing it the published event.

Next, using actorRef, we send this event to the actor — back into the stream (a WebSocket connection). Once the element was published, it is thrown into Source and is filtered by the event name.

Each element of the stream must be wrapped into an instance of Action[Ctx, Val] as specified in the Sangria and GraphQL subscriptions documentation. To publish events, we use the method PubSubService.publish():

1
2
3
4
5
override def publish(event: T): Unit = {
  publisher.onNext(event).map {
    _ => Logger.debug(s"Event published: [$event]")
  }
}

Inside publish(), we call onNext() on the instance of PublishSubject and pass the event into it.

Here's a diagram that demonstrates the flow of the published event and an element:

The flow of published events in GraphQL subscriptions and Scala app built with Sangria

Testing WebSockets with GraphQL subscriptions in a Scala application

Now you can test out our Scala and GraphQL API by running subscription queries via WebSockets. To send queries, you may install a dedicated WebSocket client for Chrome.

Run the application with sbt run and then connect to the Scala server using the link ws://localhost:9000/graphql/subscribe/websockets in the WebSocket client.

A subscription query looks like this:

1
2
3
4
5
6
7
8
9
10
subscription PostsUpdated {
  postsUpdated{
    name
    post{
      id
      title
      content
    }
  }
}

In return, we get a serialized query from the server:

1
{"query":"subscription PostsUpdated {\n  postsUpdated{\n    name\n    post{\n      id\n      title\n      content\n    }\n  }\n}","variables":"null","operationName":"PostsUpdated"}

Now that you've subscribed to the events that can happen to posts, add a post using a mutation:

1
2
3
4
5
6
7
mutation NewPost {
  addPost(title: "Juno Reactor", content: "Navras"){
    id
    title
    content
  }
}

And since you've already subscribed to the event addPost, you should see the following message:

1
2
3
4
5
6
7
8
9
10
11
12
{
  "data":{
    "postsUpdated":{
      "name":"addPost",
      "post":{
        "id":1,
        "title":"Juno Reactor",
        "content":"Navras"
      }
    }
  }
}

Now try to update the existing post (the first post has the ID 1):

1
2
3
4
5
6
7
mutation EditPost {
  editPost(id: 1, title: "Juno Reactor", content: "Nitrogen") {
    id
    title
    content
  }
}

Yet again, a JSON object is returned because we subscribed to the editPost event:

1
2
3
4
5
6
7
8
9
10
11
12
{
  "data":{
    "postsUpdated":{
      "name":"editPost",
      "post":{
        "id":1,
        "title":"New Title",
        "content":"new content"
      }
    }
  }
}

Finally, remove the post:

1
2
3
4
5
6
7
mutation DeletePost{
  deletePost(id: 1){
    id
    title
    content
  }
}

And the result is this object:

1
2
3
4
5
6
7
8
9
10
11
12
{
  "data":{
    "postsUpdated":{
      "name":"deletePost",
      "post":{
        "id":1,
        "title":"New Title",
        "content":"new content"
      }
    }
  }
}

That's how you can use GraphQL subscriptions in your Scala application using Sangria, Akka Streams, and Play Framework. Ask questions in the comments below if you want to know more.

If you're looking for a developer or considering starting a new project,
we are always ready to help!

Comments