Example of Using Tapir with Akka-HTTP Interpreter, Quill and Macwire

Tapir is a library for describing HTTP endpoints that integrates with many well-known libraries such as Akka-HTTP and Play. Compared with the Akka-HTTP routing DSL, it makes endpoints much easier to define and describe, and the same description can be used to generate OpenAPI or AsyncAPI documentation. The examples I found were short and disconnected from each other, so I decided to build one that combines several Tapir features and is closer to a real project.

Features, described in this article:

We will build a simplified store where users can register and log in, view products, order them and view their own orders. There are also admins, who can create, update and delete products, view orders, update their statuses and remove them. Users can't access admin endpoints and vice versa.

Technology stack:

  • Scala 2.13
  • Tapir 1.0.3 (requires Java 11+)
  • Sttp 3.7.2 (requires Java 11+; used in tests in this example)
  • Akka-Http
  • Quill 4.2.0
  • Circe json 0.14.2
  • Macwire 2.5.7
  • Scalatest, Mockito (testing)

You can find project sources in this GitHub repository

Dependencies list:

val AkkaHttpVersion = "10.2.9"
val TapirVersion = "1.0.3"
val SttpVersion = "3.7.2"
val CirceVersion = "0.14.2"

libraryDependencies ++= Seq(
  "io.getquill" %% "quill-jdbc" % "4.2.0",
  "org.postgresql" % "postgresql" % "42.3.6",
  "com.softwaremill.macwire" %% "macros" % "2.5.7" % "provided",
  "io.jsonwebtoken" % "jjwt" % "0.9.1",
  "org.mindrot" % "jbcrypt" % "0.4",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
  "org.scalatestplus.play" %% "scalatestplus-play" % "5.1.0" % Test,
  "com.softwaremill.sttp.tapir" %% "tapir-sttp-stub-server" % "1.0.4" % Test,
  "org.scalatestplus" %% "mockito-4-6" % "3.2.13.0" % Test,
  "com.softwaremill.sttp.tapir" %% "tapir-core" % TapirVersion,
  "com.softwaremill.sttp.tapir" %% "tapir-sttp-client" % TapirVersion,
  "com.softwaremill.sttp.tapir" %% "tapir-json-circe" % TapirVersion,
  "com.softwaremill.sttp.tapir" %% "tapir-akka-http-server" % TapirVersion,
  "com.softwaremill.sttp.tapir" %% "tapir-prometheus-metrics" % TapirVersion,
  "com.softwaremill.sttp.tapir" %% "tapir-swagger-ui-bundle" % TapirVersion,
  "io.circe" %% "circe-core" % CirceVersion,
  "io.circe" %% "circe-generic" % CirceVersion,
  "io.circe" %% "circe-generic-extras" % CirceVersion,
  "io.circe" %% "circe-shapes" % CirceVersion,
  "com.softwaremill.sttp.client3" %% "akka-http-backend" % SttpVersion,
  "com.softwaremill.sttp.client3" %% "circe" % SttpVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.11",
  "org.apache.logging.log4j" % "log4j-core" % "2.18.0"
)

Describing first Tapir endpoint

Let’s describe our first test endpoint:

import sttp.model.StatusCode
import sttp.tapir._

import scala.concurrent.Future

val tapirEndpoint = endpoint.get // HTTP method
  .description("test endpoint") // endpoint's description
  .in("test".description("endpoint path")) // URI path /test, with its description
  .out(stringBody
    .description("type of response")) // the endpoint returns a string body; this describes the body
  .out(statusCode(StatusCode.Created)
    .description("Specifies response status code for success case")) // fixed status code for the success case
  .serverLogic { _ =>
    // Right for success, Left for error
    Future.successful(Right("test ok response"))
  }

We specified the endpoint description, URI path and response type, set a static status code for the response and described some simple logic. But that is not enough.

Connecting to database using Quill

Quill is a library for compile-time generated database queries, and we use it here to work with the database.

First of all, we need to set up access to the database. As an example, we'll define the User class and its DAO.

case class User(id: Long,
  name: String,
  phoneNumber: String,
  email: String,
  passwordHash: String,
  zip: String,
  city: String,
  address: String,
  role: RoleType, // enum value.
  created: LocalDateTime)

RoleType is an enum with two values: Admin and User.

Next, we need to configure the Postgres database and a Quill context that connects to it. Here is an example of a default Postgres configuration for localhost:

db.default {
  dataSourceClassName=org.postgresql.ds.PGSimpleDataSource
  dataSource {
    user=postgres
    password=postgres
    databaseName=db
    portNumber=5432
    serverName=localhost
  }
  connectionTimeout=30000
}

Then we create the Quill context. The SnakeCase parameter means that database columns are named in snake_case rather than camelCase. For example, the column for the user's passwordHash is named password_hash:

lazy val ctx = new PostgresJdbcContext(SnakeCase, "db.default")

Here is the UserDao class, which contains the generic (CRUD) functions for the users table:

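import io.getquill.NamingStrategy
import io.getquill.context.jdbc.JdbcContext
import io.getquill.context.sql.idiom.SqlIdiom

import scala.concurrent.{ExecutionContext, Future}

/**
 * DAO for users.
 *
 * @param context Quill context used to connect to the database.
 * @param ec execution context for asynchronous work.
 */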
class UserDao(context: JdbcContext[_ <: SqlIdiom, _ <: NamingStrategy])
             (implicit ec: ExecutionContext) {

  import context._

  /** Enum value mapping for the database. */
  implicit val encodeRole: io.getquill.MappedEncoding[RoleType, Int] =
    io.getquill.MappedEncoding[RoleType, Int](_.id)
  implicit val decodeRole: io.getquill.MappedEncoding[Int, RoleType] =
    io.getquill.MappedEncoding[Int, RoleType](roleId => Roles.withId(roleId))

  /** Query schema. Closest analogue - table in Slick. */
  private val users = quote {
    querySchema[User]("users")
  }

  /** Creates user and returns generated id. */
  def createUser(user: User): Future[Long] = Future {
    run(users.insertValue(lift(user)).returningGenerated(_.id))
  }

  /**
   * Updates user.
   *
   * @param user user to update
   * @return update result.
   */
  def updateUser(user: User): Future[Long] = Future {
    run(users.filter(_.id == lift(user.id)).updateValue(lift(user)))
  }

  /** Removes user. */
  def deleteUser(userId: Long): Future[Long] = Future {
    run(users.filter(_.id == lift(userId)).delete)
  }

  /** Searches user by id. */
  def find(userId: Long): Future[Option[User]] = Future {
    run(users.filter(_.id == lift(userId))).headOption
  }

  /** Searches user by email. */
  def findByEmail(email: String): Future[Option[User]] = Future {
    run(users.filter(_.email == lift(email))).headOption
  }

  /** Searches users by ids. */
  def findByIds(userIds: Seq[Long]): Future[List[User]] = Future {
    run(users.filter(user => liftQuery(userIds).contains(user.id)))
  }
}

Quill can be replaced with Slick, another database access library. I chose Quill because schemas are easier to define: in Slick you have to define the whole column mapping, whereas in Quill you only need a correct DTO (data transfer object) and the table name for it. For non-standard values, such as the user's role, you have to provide an encoder and a decoder. Also note that runtime values used in queries must be wrapped in lift() or liftQuery(); without them you will get a compiler error.
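
The UserDao above relies on a RoleType with an integer id and on a Roles.withId lookup, neither of which is shown in this article. Below is a minimal sketch of how they might look, assuming a plain sealed class hierarchy. The id values 1 and 2 and the fail-fast behaviour for unknown ids are my assumptions, not taken from the project:

```scala
// Hypothetical sketch: the real project may define roles differently.
sealed abstract class RoleType(val id: Int)

object Roles {
  case object Admin extends RoleType(1) // id values are assumed
  case object User extends RoleType(2)

  val all: List[RoleType] = List(Admin, User)

  /** Finds a role by its database id; unknown ids fail fast. */
  def withId(id: Int): RoleType =
    all.find(_.id == id).getOrElse(
      throw new NoSuchElementException(s"Unknown role id: $id"))
}
```

With definitions like these, the encoder stores role.id in the database column and the decoder turns the stored integer back into a RoleType.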

Wiring everything using Macwire

Macwire is convenient when you need to create a lot of objects: you don't have to worry about forgetting to update a call site when a class constructor changes. I prefer to extract all wiring into separate module traits and have the application entry point class extend them.

Example of wiring:

import io.getquill.{PostgresJdbcContext, SnakeCase}
import com.softwaremill.macwire._


lazy val ctx = new PostgresJdbcContext(SnakeCase, "db.default")
lazy val userDao = wire[UserDao]

As you may notice, not every dependency has to be created with wire: it only needs to be in scope (ctx here is created manually).
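
To make it clearer what wire does: it is a compile-time macro that expands to a plain constructor call using values found in the enclosing scope. The sketch below shows the hand-written equivalent with stand-in classes. FakeContext, SketchUserDao and DaoModule are hypothetical names, used only so that the snippet has no dependencies:

```scala
// Stand-in types, assumed for illustration only.
class FakeContext(val configPrefix: String)
class SketchUserDao(val context: FakeContext)

trait DaoModule {
  lazy val ctx: FakeContext = new FakeContext("db.default")

  // With Macwire this line would be `lazy val userDao = wire[SketchUserDao]`;
  // the macro generates the constructor call below from values in scope.
  lazy val userDao: SketchUserDao = new SketchUserDao(ctx)
}

// The application entry point extends the module trait.
object ManualWiring extends DaoModule
```

If the SketchUserDao constructor gained another parameter, the manual version would need to be edited by hand, while the wire version would only require the new dependency to be in scope.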

Configuring security

Now, having UserDao, we can create simple authentication and authorization.

First of all, we need to implement authentication.

For authentication, we'll use JWT tokens passed in the Authorization header in the format 'Bearer token'. The user receives the token (which contains the userId in its claims) in the response body after signing in with a login and password sent in the request body.

In the authentication step, we take the JWT token, parse it and load the user by the extracted userId:

import com.example.errors.{ErrorInfo, Unauthorized}
import com.example.models.User

import scala.concurrent.{ExecutionContext, Future}

/**
 * Contains authentication functionality.
 *
 * @param jwtService service, which works with jwt tokens.
 * @param ec for async futures.
 */
class TapirAuthentication(jwtService: JwtService)(implicit ec: ExecutionContext) {

  /** Extracts the user from the token. Returns either an error or the user. */
  def authenticate(token: String): Future[Either[ErrorInfo, User]] = {
    jwtService.extractUserFromJwt(token).map {
      case Left(_) =>
        Left(Unauthorized("Token is expired. You need to log in first"))
      case Right(Some(user)) => Right(user)
      case Right(None) => Left(Unauthorized("user from token is not found"))
    }
  }
}

Jwt service function (extracting userId from token and searching for user in database):

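import java.net.URLDecoder
import java.nio.charset.StandardCharsets

import io.jsonwebtoken.{Claims, Jwts, SignatureAlgorithm}
import com.example.dao.UserDao
import com.example.models.User

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}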

/**
 * Extracts user from jwt token
 * @param jwt token from authorization header.
 * @param ec for async futures.
 * @return either exception or optional user.
 */
def extractUserFromJwt(jwt: String)(implicit ec: ExecutionContext): 
  Future[Either[Throwable, Option[User]]] = {
  val decodedJwtStr =
    URLDecoder.decode(jwt, StandardCharsets.UTF_8.toString)
  Try {
    Jwts
      .parser()
      .setSigningKey(secret.getBytes(StandardCharsets.UTF_8.toString))
      .parseClaimsJws(decodedJwtStr)
  } match {
    case Failure(exception) => Future.successful(Left(exception))
    case Success(claims) =>
      val jwtClaims: Claims = claims.getBody
      // the claim may be missing, so guard against null before parsing it
      Option(jwtClaims.get("userId")).flatMap(_.toString.toLongOption) match {
        case Some(userId) => userDao.find(userId).map(Right(_))
        case None => Future.successful(Right(None))
      }
  }
}

The JWT service extracts the userId from the token, looks the user up in the database and, if the user exists, returns it wrapped in Right(). The value in Left() is discussed later.

Now that we have a way to extract the user from a JWT token, we can describe a secured endpoint.

/**
 * Creates a secured endpoint restricted to the roles passed as the argument.
 * If the role list is empty, authorization is disabled.
 *
 * PartialServerEndpoint type parameters, in order: security input,
 * security output (principal), input, error output, output,
 * requirements (Any if no capabilities such as websockets or streaming
 * are needed) and the effect wrapper (in most cases Future).
 * Here the security input is the bearer token, the security output is
 * the user, and the error output is ErrorInfo.
 */
def tapirSecurityEndpoint(roles: List[RoleType]): 
  PartialServerEndpoint[String, User, Unit, ErrorInfo, Unit, Any, Future] =
   endpoint // base tapir endpoint
     .securityIn(auth.bearer[String]()
        // defining security input
        .description("Bearer token from Authorization header")) 
     .errorOut(
       oneOf[ErrorInfo](
         // returns required http code for different types of ErrorInfo.
         // For secured endpoint you need to define 
         // all cases before defining security logic
         oneOfVariant(statusCode(StatusCode.Forbidden).and(jsonBody[Forbidden]
           .description("When user doesn't have role for the endpoint"))),
         oneOfVariant(statusCode(StatusCode.Unauthorized).and(jsonBody[Unauthorized]
           .description("When user doesn't authenticated or token is expired"))),
         oneOfVariant(statusCode(StatusCode.NotFound)
           .and(jsonBody[NotFound].description("When something not found"))),
         oneOfVariant(statusCode(StatusCode.BadRequest)
           .and(jsonBody[BadRequest].description("Bad request"))),
         oneOfVariant(statusCode(StatusCode.InternalServerError)
           .and(jsonBody[InternalServerError].description("For exceptional cases"))),
         // default case below.
         oneOfDefaultVariant(jsonBody[ErrorMessage]
           .description("Default result").example(ErrorMessage("Test error message")))
       )
     )
     .serverSecurityLogic(authentication.authenticate(_).flatMap {
        // define security logic here. For example, here is authentication, 
        // chained with authorization
        either => foldEitherOfFuture(either.map(isAuthorized(_, roles)))
          .map(_.flatten)
     })

/**
 * Authorization filter function - checks user for present roles.
 * @param user user to check
 * @param roles restricted roles to check. If empty - skips authorization.
 * @return either error with Forbidden status code or user.
 */
def isAuthorized(user: User, roles: List[RoleType]): Future[Either[ErrorInfo, User]] =
  Future.successful(if (roles.isEmpty || roles.contains(user.role)) 
    Right(user) 
  else 
    Left(Forbidden("user is not allowed to use this endpoint")))
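
The security logic above calls foldEitherOfFuture, which is not shown in this article. Here is a minimal sketch of what such a helper could look like, assuming it only needs to turn an Either that holds a Future on the right into a Future of Either. The object name EitherFutureSketch and the exact signature are my assumptions:

```scala
import scala.concurrent.{ExecutionContext, Future}

object EitherFutureSketch {

  /** Turns Either[A, Future[B]] inside out into Future[Either[A, B]]. */
  def foldEitherOfFuture[A, B](either: Either[A, Future[B]])
                              (implicit ec: ExecutionContext): Future[Either[A, B]] =
    either match {
      case Left(error)   => Future.successful(Left(error)) // nothing to wait for
      case Right(future) => future.map(Right(_))
    }
}
```

Applied to either.map(isAuthorized(_, roles)), it produces a Future[Either[ErrorInfo, Either[ErrorInfo, User]]], which the subsequent .map(_.flatten) collapses into Future[Either[ErrorInfo, User]].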

Now we have a secured endpoint template that we can use to protect any endpoint with authentication, but we haven't yet discussed error status code customization.

Error response customization

By default, Tapir returns HTTP 400 with an empty response when we return Left() without defining errorOut() details. That is not suitable in some cases, so we need to customize it.

At first, I defined the error response like this:

endpoint
  .get
  .in("test")
  .errorOut(statusCode
    .description("Some response for that code and another one for another status code"))
  .errorOut(jsonBody[ErrorMessage]
    .description("Contains details about error response"))
  // ... set the response, logic, etc. here

With an endpoint described this way, the error response had to be returned as a tuple of the required status code and the output. That worked, but the individual error status codes were not described in the Swagger documentation, so I switched to the oneOf mapping used in the security endpoint:

.errorOut(
  oneOf[ErrorInfo](
    // returns required http code for different types of ErrorInfo. 
    // For secured endpoint you need to define 
    // all cases before defining security logic
    oneOfVariant(statusCode(StatusCode.Forbidden)
      .and(jsonBody[Forbidden]
        .description("When user doesn't have role for the endpoint"))),
    oneOfVariant(statusCode(StatusCode.Unauthorized)
      .and(jsonBody[Unauthorized]
        .description("When user doesn't authenticated or token is expired"))),
    oneOfVariant(statusCode(StatusCode.NotFound)
      .and(jsonBody[NotFound].description("When something not found"))),
    oneOfVariant(statusCode(StatusCode.BadRequest)
      .and(jsonBody[BadRequest].description("Bad request"))),
    oneOfVariant(statusCode(StatusCode.InternalServerError)
      .and(jsonBody[InternalServerError]
      .description("For exceptional cases"))),
    // default case below.
    oneOfDefaultVariant(jsonBody[ErrorMessage].description("Default result")
      .example(ErrorMessage("Test error message")))
  )
)

You can read more about it here. This approach is more convenient because you only need to return the appropriate ErrorInfo variant, and the different error responses are also described in the Swagger documentation.
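
The ErrorInfo hierarchy itself is not shown in this article. Below is a minimal sketch of what it could look like, assuming each variant carries a single message (the field name message and the helper httpStatus are my assumptions). The helper only mirrors the oneOf mapping in plain Scala to show which status each variant is paired with; it is not part of Tapir:

```scala
// Hypothetical sketch of the error hierarchy referenced by oneOf[ErrorInfo].
sealed trait ErrorInfo { def message: String }

final case class Forbidden(message: String) extends ErrorInfo
final case class Unauthorized(message: String) extends ErrorInfo
final case class NotFound(message: String) extends ErrorInfo
final case class BadRequest(message: String) extends ErrorInfo
final case class InternalServerError(message: String) extends ErrorInfo
final case class ErrorMessage(message: String) extends ErrorInfo // default variant

object ErrorInfoSketch {

  /** Mirrors the oneOf mapping: the HTTP status each variant is paired with. */
  def httpStatus(error: ErrorInfo): Int = error match {
    case _: Forbidden           => 403
    case _: Unauthorized        => 401
    case _: NotFound            => 404
    case _: BadRequest          => 400
    case _: InternalServerError => 500
    // no explicit status in the mapping, so the default for
    // error outputs mentioned earlier (400) is assumed here
    case _: ErrorMessage        => 400
  }
}
```

Because the trait is sealed, the compiler warns when a new variant is added without a corresponding case, which is a useful reminder to extend the oneOf mapping as well.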

Protecting endpoints using secured endpoint template

Now, having a security endpoint template, we can protect any endpoint. For example, here is the endpoint that creates an order:

/**
 * Create an order endpoint.
 */
// accessible only for users with role User
val createOrderEndpoint = tapirSecurity.tapirSecurityEndpoint(List(Roles.User)) 
  .post // POST endpoint
  .in("orders") // /orders uri
  .description("Creates order for the user")
  .in(jsonBody[CreateOrderForm] // request has to have body of CreateOrderForm
  .description("Contains everything for creating order")
  .example(CreateOrderForm(List(OrderProductForm(1, 5)), "Some delivery comment")))
  .out(statusCode(StatusCode.Created)
    // set static status code for success response
    .description("Returns Created when order is created")) 
  .serverLogic { user => newOrder => 
    // security output => endpoint input => server logic
    if (newOrder.products.forall(product => 
      product.quantity > 0 && product.productId > 0)) {
      orderService.createOrder(user.id, newOrder).map(_ => Right(()))
    } else {
      Future.successful(Left(BadRequest("Some order record contains invalid value!")))
    }
  }

As you can see, we use the protected endpoint template and restrict access to users with the User role. You may also notice that we don't define an error output here. That is because the error output can't be described after the security logic, so it has to be described first. If you don't want to list error responses that can never occur in a given endpoint, you can describe the security logic separately for every protected endpoint. I made a template because the error responses are similar across endpoints.
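
The validation inside the server logic can also be pulled out into a small pure function, which keeps the endpoint definition short and is easy to unit-test. Here is a sketch with simplified stand-ins for the form classes. The field name comment, the numeric types and the String error type are my assumptions; the real code returns BadRequest:

```scala
// Simplified stand-ins for the request forms used by createOrderEndpoint.
final case class OrderProductForm(productId: Long, quantity: Int)
final case class CreateOrderForm(products: List[OrderProductForm], comment: String)

object OrderValidation {

  /** Right(form) when every record has a positive id and quantity, Left(message) otherwise. */
  def validate(form: CreateOrderForm): Either[String, CreateOrderForm] =
    if (form.products.forall(p => p.quantity > 0 && p.productId > 0)) Right(form)
    else Left("Some order record contains invalid value!")
}
```

In the endpoint, the Left branch would then be mapped to BadRequest and the Right branch passed on to orderService.createOrder.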

Exception handling customization

After completing and checking all the necessary endpoints, I noticed that when an exception occurs, the back end returns only HTTP 500 with the text 'Internal server error'. That is not informative for the front-end developer who will work with this API, so we need to modify exception handling to provide additional information. Tapir has several interceptors, and we can customize them.

import sttp.tapir.json.circe.jsonBody
import sttp.tapir.generic.auto._
import sttp.tapir._
import sttp.tapir.server.akkahttp.AkkaHttpServerOptions
import sttp.tapir.server.model.ValuedEndpointOutput
import sttp.tapir.server.interceptor.exception.ExceptionHandler

import io.circe.generic.auto._
import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}


/**
 * Configuration for AkkaHttpServer routes.
 *
 * Contains customization for decode failure handler, exception handler 
 * and applied metrics interceptor
 */
implicit val customServerOptions: AkkaHttpServerOptions = 
  AkkaHttpServerOptions.customiseInterceptors
      .exceptionHandler(ExceptionHandler[Future] { ctx =>
        // defining exception id for the exception to make search in logs easier.
        val exceptionId = UUID.randomUUID()
        logger.error(s"Intercepted exception ${ctx.e} while processing " +
          s"request, exception id: $exceptionId")
        Future.successful(Some(
          ValuedEndpointOutput[ErrorMessage](jsonBody[ErrorMessage], 
          ErrorMessage(s"Internal Server Error, exception id: $exceptionId"))))
      })
      .options

Now, when an exception occurs, we log it, mark it with an exception id and add that id to the response. When the front-end developer sends us the exception id, we can find the issue much faster. We can also customize the decode failure handler in the same options object. As a result, we now have these options:

import sttp.tapir.server.interceptor.decodefailure.DefaultDecodeFailureHandler
import sttp.tapir.server.interceptor.decodefailure.DefaultDecodeFailureHandler.FailureMessages

implicit val customServerOptions: AkkaHttpServerOptions = 
  AkkaHttpServerOptions.customiseInterceptors
      .decodeFailureHandler(ctx => {
        ctx.failingInput match {
          // when defining how a decode failure should be handled, we need 
          // to describe the output to be used, and
          // a value for this output
          case _: EndpointIO.Body[_, _] =>
            // see this function and then to failureSourceMessage function 
            // to find out which types of decode errors are present
            val failureMessage = FailureMessages.failureMessage(ctx)
            logger.info(s"$failureMessage")
            // warning - log working incorrect when there are several endpoints 
            // with different methods
            DefaultDecodeFailureHandler.default(ctx)
          case _ => DefaultDecodeFailureHandler.default(ctx)
        }
      })
      .exceptionHandler(ExceptionHandler[Future] { ctx =>
        // defining exception id for the exception to make search in logs easier.
        val exceptionId = UUID.randomUUID()
        logger.error(s"Intercepted exception ${ctx.e} while processing request, " +
          s"exception id: $exceptionId")
        Future.successful(Some(ValuedEndpointOutput[ErrorMessage](jsonBody[ErrorMessage], 
          ErrorMessage(s"Internal Server Error, exception id: $exceptionId"))))
      })
      .options

As you can see, we added logging for request bodies that fail to decode. You can add handling for other kinds of inputs as well, for example queries, paths, methods, etc.

To use these options, you can either put them in implicit scope for AkkaHttpServerInterpreter or pass them directly: AkkaHttpServerInterpreter(errorHandler.customServerOptions).toRoute(endpointList)

Adding metrics interceptors

Prometheus metrics collect information about request handling time and count the requests for each endpoint; you can also add other metrics or integrate custom ones. We'll add them to our example.

They are not hard to connect: you just need to add an interceptor to AkkaHttpServerOptions. For this we need the following dependency:

"com.softwaremill.sttp.tapir" %% "tapir-prometheus-metrics" % TapirVersion

And we need to add that interceptor:

import sttp.tapir.server.metrics.prometheus.PrometheusMetrics

/** Prometheus metrics interceptor. */
val prometheusMetrics = PrometheusMetrics.default[Future]()

/**
 * Configuration for AkkaHttpServer routes.
 *
 * Contains customization for decode failure handler, exception handler 
 * and applied metrics interceptor
 */
implicit val customServerOptions: AkkaHttpServerOptions = 
  AkkaHttpServerOptions.customiseInterceptors
      .decodeFailureHandler(ctx => {
        ctx.failingInput match {
          // when defining how a decode failure should be handled, 
          // we need to describe the output to be used, and
          // a value for this output
          case _: EndpointIO.Body[_, _] =>
            // see this function and then to failureSourceMessage function 
            // to find out which types of decode errors are present
            val failureMessage = FailureMessages.failureMessage(ctx)
            logger.info(s"$failureMessage")
            // warning - log working incorrect when there are several 
            // endpoints with different methods
            DefaultDecodeFailureHandler.default(ctx)
          case _ => DefaultDecodeFailureHandler.default(ctx)
        }
      })
      .exceptionHandler(ExceptionHandler[Future] { ctx =>
        // defining exception id for the exception
        // to make search in logs easier
        val exceptionId = UUID.randomUUID()
        logger.error(s"Intercepted exception ${ctx.e} " +
          s"while processing request, exception id: $exceptionId")
        Future.successful(Some(
          ValuedEndpointOutput[ErrorMessage](jsonBody[ErrorMessage], 
            ErrorMessage(s"Internal Server Error, exception id: $exceptionId"))))
      })
      .metricsInterceptor(prometheusMetrics.metricsInterceptor())
      .options

When we send a request to the /metrics endpoint, we'll see the Prometheus metrics:

Prometheus metrics page screenshot

You can read more about metrics interceptor here.

Wrapping requests for different purposes via custom Akka-HTTP directives

Sometimes you can't define the interceptor you need with Tapir's AkkaHttpServerOptions. In those cases you can write Akka-HTTP directives that perform the actions you can't express as interceptors. As an example, we'll make a request handling time tracker that wraps the Tapir endpoints. The solution was adapted from this article.

I will show just the final solution:

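import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.server.{Directive0, RouteResult}
import akka.http.scaladsl.server.Directives.{extractRequestContext, mapInnerRoute}
import akka.http.scaladsl.server.RouteResult.{Complete, Rejected}
import akka.stream.scaladsl.Flow
import akka.util.ByteString

import scala.util.{Failure, Success, Try}

// An implicit ExecutionContext must be in scope for the Future callbacks below.

/**
 * Starts a timer and returns a function that stops the timer and
 * logs the elapsed time with some request details.
 *
 * @param request the request whose handling time will be logged.
 * @return function that handles the result of processing the request.
 */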
def timeRequest(request: HttpRequest): Try[RouteResult] => Unit = {
  val start = System.currentTimeMillis()

  {
    case Success(Complete(resp)) =>
      val d = System.currentTimeMillis() - start
      logger.info(s"[${resp.status.intValue()}] ${request.method.name} " +
        s"${request.uri.path}, took: ${d}ms")
    case Success(Rejected(_)) => // rejected requests are not timed
    case Failure(_) => // failed requests are not timed
  }
}

/**
 * Directive-wrapper for request.
 *
 * @param onRequest action, which accepts request and returns another function, 
 *                  which accepts response
 * @return ready directive, which can be used for wrapping other directives
 */
def aroundRequest(onRequest: HttpRequest => Try[RouteResult] => Unit): Directive0 =
  extractRequestContext.flatMap { ctx =>
    // starts timer for request and returns function, which you will use 
    // to stop timer and log request time
    val onDone = onRequest(ctx.request) 
    mapInnerRoute { inner =>
      inner.andThen { resultFuture =>
        resultFuture.map {
          case c @ Complete(response) =>
            Complete(response.mapEntity { entity =>
              if (entity.isKnownEmpty()) {
                // On an empty entity, `transformDataBytes` unsets `isKnownEmpty`.
                // Call onDone right away, since there's no significant amount of
                // data to send, anyway.
                onDone(Success(c))
                entity
              } else {
                entity.transformDataBytes(Flow[ByteString].watchTermination() {
                  case (mat, future) =>
                    // stops timer after finishing sending response
                    future.map(_ => c).onComplete(onDone)
                    mat
                })
              }
            })
          case other =>
            onDone(Success(other)) // stops timer and returns other
            other
        }.andThen { 
          // skip this if you use akka.http.scaladsl.server.handleExceptions, 
          // put onDone there
          case Failure(ex) =>
          onDone(Failure(ex)) // stops timer and returns failure
        }
      }
    }
  }
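
The key idea in timeRequest is a function that captures the start time and returns a callback to be invoked once the work has finished. The same pattern can be sketched without Akka-HTTP. TimerSketch, startTimer and the label parameter are hypothetical names, and the callback returns a message instead of logging so that it is easy to inspect:

```scala
import scala.util.{Failure, Success, Try}

object TimerSketch {

  /** Captures the start time now; the returned callback reports the elapsed time later. */
  def startTimer(label: String): Try[Any] => String = {
    val start = System.nanoTime()

    val onDone: Try[Any] => String = {
      case Success(_) =>
        val elapsedMs = (System.nanoTime() - start) / 1000000
        s"$label took ${elapsedMs}ms"
      case Failure(ex) =>
        s"$label failed: ${ex.getMessage}"
    }
    onDone
  }
}
```

The directive plays the same role for a request: it calls the outer function when the request arrives and the returned callback when the response has been completely sent.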

And now routes look like this:

val resultRoute: Route =
    timeTracker.aroundRequest(timeTracker.timeRequest) {
      Directives.concat(
        AkkaHttpServerInterpreter(errorHandler.customServerOptions)
          .toRoute(endpointList),
        AkkaHttpServerInterpreter(errorHandler.customServerOptions)
          .toRoute(errorHandler.prometheusMetrics.metricsEndpoint))
    }

Adding swagger docs

Swagger UI renders OpenAPI documentation for HTTP endpoints in a convenient form. It is useful for explaining your APIs to other developers. Alternatively, you can use Redoc; you can read more about it here.

We need to add this dependency to our build.sbt to build the swagger specs:

"com.softwaremill.sttp.tapir" %% "tapir-swagger-ui-bundle" % TapirVersion

After adding the dependency, we need to create the Swagger documentation endpoints. To do this, we pass the list of all endpoints to the Swagger interpreter:

import sttp.tapir.swagger.bundle.SwaggerInterpreter

val swaggerEndpoints = SwaggerInterpreter()
  .fromEndpoints[Future](endpointList.map(_.endpoint), "My App", "1.0")

After that, we add the Swagger endpoints to the final routes:

val resultRoute: Route =
  timeTracker.aroundRequest(timeTracker.timeRequest) {
    Directives.concat(
      AkkaHttpServerInterpreter(errorHandler.customServerOptions)
        .toRoute(swaggerEndpoints),
      AkkaHttpServerInterpreter(errorHandler.customServerOptions)
        .toRoute(endpointList),
      AkkaHttpServerInterpreter(errorHandler.customServerOptions)
        .toRoute(errorHandler.prometheusMetrics.metricsEndpoint))
  }

And when we go to the /docs endpoint, we see… Swagger docs!

Swagger docs page screenshot

API testing

Endpoints are easy to test, especially if you keep the endpoint logic separate from the endpoint definitions.

For example, here is a test for one of the secured endpoints:

import java.time.LocalDateTime

import com.example.auth.{TapirAuthentication, TapirSecurity}
import com.example.errors.{Forbidden, Unauthorized}
import com.example.models.{Roles, User}
import com.example.services.OrderService
import io.circe.syntax.EncoderOps
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.mockito.MockitoSugar.mock
import sttp.client3._
import io.circe.generic.auto._
import sttp.client3.testing.SttpBackendStub
import sttp.model.StatusCode
import sttp.tapir.server.stub.TapirStubInterpreter

import scala.concurrent.Future


val testUser: User = User(1, "test name", "+777777777", "test@example.com", "hash", 
  "49050", "Dnipro", "test address", Roles.User, LocalDateTime.now())

/** Case where a user with the wrong role is trying to get an endpoint for another 
    user role. */
  it should "Reject user with wrong role" in {
    // preparations
    val authentication = mock[TapirAuthentication]
    when(authentication.authenticate(any[String]))
      .thenReturn(Future.successful(Right(testUser.copy(role = Roles.Admin))))
    val orderService = mock[OrderService]
    val orderController = 
      new OrderController(new TapirSecurity(authentication), orderService)

    // given
    val backendStub: SttpBackend[Future, Any] = 
      TapirStubInterpreter(SttpBackendStub.asynchronousFuture)
          .whenServerEndpoint(orderController.viewUserOrderListEndpoint)
          .thenRunLogic()
          .backend()

    // when
    val response = basicRequest
      .get(uri"http://localhost:9000/orders")
      .header("Authorization", "Bearer password")
      .send(backendStub)

    // then
    response.map { resp =>
      logger.info(s"orders expecting 403 Forbidden: ${resp.body}")
      resp.code shouldBe StatusCode.Forbidden
      resp.body shouldBe 
        Left(Forbidden("user is not allowed to use this endpoint").asJson.noSpaces)
    }
  }

As you may have noticed, I mocked the authentication and stubbed it to return a user with the wrong role. In the given part I set up the stub to run the endpoint logic, and in the when part I make a request to that endpoint. In the then part I expect a Forbidden response, because a user with this role can't access that endpoint.

Feel free to read more about Tapir testing here.

I hope you’ll find the above useful!
