Tapir - cutting-edge library for endpoint description, compatible with many well-known libraries like Akka-Http, Play,
etc. Compared with defining endpoints via Akka-Http routing DSL, it’s much easier to define and describe endpoints and
use this description to create OpenAPI or
AsyncAPI documentation. Examples I found were too short and separate from each
other, so I decided to make an example, which combines Tapir features, and it’s closer to real projects.
We will make an abstract kind of store, where users can register and log in, view store products, order them and view
their own orders. Also, there are admins, which can create, update and delete products, view orders, update their
statuses and remove them. Users can’t access admin endpoints and vice versa.
Technology stack:
Scala 2.13
Tapir 1.0.3 (required Java 11+)
Sttp 3.7.2 (required Java 11+, in example used in tests)
importsttp.model.StatusCodeimportsttp.tapir._valtapirEndpoint=endpoint.get// http type.description("test endpoint")// endpoint's description.in("test".description("endpoint path"))// description for uri path, /test uri.out(stringBody.description("type of response"))// This endpoint will return string body. Also, description for body.out(statusCode(StatusCode.Created).description("Specifies response status code for success case"))// Description for result status code.serverLogic{_=>Future(Right(s"test ok response"))// response in Right for success, left for error}
We specified endpoint description, uri path, response type, set static status code for the response and described simple
logic. But it’s not enough.
Connecting to database using Quill
Quill is ORM, and we use it for work with databases.
First of all we need to configure work with the database. For example, we’ll define the User class
and its DAO.
RoleType is Enum, it has two values: Admin and User.
Next, we need to configure postgres database and Quill context to connect to that database. You can check it up in the
following example of default postgres database configuration for localhost:
And here we created a Quill context. SnakeCase parameter means columns in the database are named using snake case,
instead of camelCase. For example, the column in the database for user’s passwordHash is named password_hash:
/** * Dao for user. * * @param context important stuff. Uses for connection to database. * @param ec for async work. */classUserDao(context:JdbcContext[_<:SqlIdiom, _<:NamingStrategy])(implicitec:ExecutionContext){importcontext._/** Enum values mapping for the database. */implicitvalencodeRole=getquill.MappedEncoding[RoleType, Int](_.id)implicitvaldecodeRole=getquill.MappedEncoding[Int, RoleType](roleId=>Roles.withId(roleId))/** Query schema. Closest analogue - table in Slick. */privatevalusers=quote{querySchema[User]("users")}/** Creates user and returns generated id. */defcreateUser(user:User):Future[Long]=Future{run(users.insertValue(lift(user)).returningGenerated(_.id))}/** * Updates user. * * @param user user to update * @return update result. */defupdateUser(user:User):Future[Long]=Future{run(users.filter(_.id==lift(user.id)).updateValue(lift(user)))}/** Removes user. */defdeleteUser(userId:Long):Future[Long]=Future{run(users.filter(_.id==lift(userId)).delete)}/** Searches user by id. */deffind(userId:Long):Future[Option[User]]=Future{run(users.filter(_.id==lift(userId))).headOption}/** Searches user by email. */deffindByEmail(email:String):Future[Option[User]]=Future{run(users.filter(_.email==lift(email))).headOption}/** Searches users by ids. */deffindByIds(userIds:Seq[Long]):Future[List[User]]=Future{run(users.filter(user=>liftQuery(userIds).contains(user.id)))}}
As to Quill, it can be replaced with Slick - another ORM library. I used Quill because it’s easier to define schemas -
in Slick you need to define whole column mapping, but in Quill you just need to define the correct DTO (data transfer
object) and table for that DTO. For extraordinary values you have to use encoder/decoder, like for the user's roles.
Here, as you can see, for queries you need to use inserted values inside lift() or liftQuery functions - without them,
you’ll receive compiler error.
Wiring everything using Macwire
Macwire is convenient when you need to create a lot of stuff, and you don’t need to worry about forgetting to add
something in the class constructor, when that constructor has changed. I prefer extracting all wiring into separate
modules as a trait and extends application entrypoint class.
As you may notice, you don’t have to wire everything, it just needs to be in scope.
Configuring security
Now, having UserDao, we can create simple authentication and authorization.
First of all, we need to implement authentication.
For authentication, we’ll use jwt tokens in Authorization header, in format ‘Bearer token’, and the user will receive
that token (contains userId in token claims) in response body after signing in, providing login and password in request
body.
In authentication algorithm we extract that jwt token, parse it and extract user by extracted userId:
1234567891011121314151617181920212223242526
importcom.example.errors.{ErrorInfo,Unauthorized}importcom.example.models.Userimportscala.concurrent.{ExecutionContext,Future}/** * Contains authentication functionality. * * @param jwtService service, which works with jwt tokens. * @param ec for async futures. */classTapirAuthentication(jwtService:JwtService)(implicitec:ExecutionContext){/** Extracts user from token. Return either Status code with error message or user. */defauthenticate(token:String):Future[Either[ErrorInfo, User]]={jwtService.extractUserFromJwt(token).map{caseLeft(exception)=>Left(Unauthorized("Token is expired. You need to log in first"))caseRight(userOpt)=>userOptmatch{caseSome(user)=>Right(user)caseNone=>Left(Unauthorized("user from token is not found"))}}}}
Jwt service function (extracting userId from token and searching for user in database):
1234567891011121314151617181920212223242526272829
importio.jsonwebtoken.{Claims,Jwts,SignatureAlgorithm}importcom.example.dao.UserDaoimportcom.example.models.User/** * Extracts user from jwt token * @param jwt token from authorization header. * @param ec for async futures. * @return either exception or optional user. */defextractUserFromJwt(jwt:String)(implicitec:ExecutionContext):Future[Either[Throwable, Option[User]]]={valdecodedJwtStr=URLDecoder.decode(jwt,StandardCharsets.UTF_8.toString)Try{Jwts.parser().setSigningKey(secret.getBytes(StandardCharsets.UTF_8.toString)).parseClaimsJws(decodedJwtStr)}match{caseFailure(exception)=>Future.successful(Left(exception))caseSuccess(claims)=>valjwtClaims:Claims=claims.getBodyjwtClaims.get("userId").toString.toLongOptionmatch{caseSome(userId)=>userDao.find(userId).map(Right(_))caseNone=>Future.successful(Right(None))}}}
Jwt service extracts userId from jwt token, retrieves from database and if user is present, returns it as Right().
Value in Left() we will discuss later.
Now, having algorithm of user extraction from jwt token, we can describe secured endpoint.
/** * Creates secured endpoint with role restriction from argument. * If role list is empty - authorization is disabled * * PartialServerEndpoint explained: [Security input, Security output, * Input, Error response, Output, * Any if no need in requirements like websocket or effect, * wrapper (in most cases - future)] * In security endpoint defined Security input - bearer token, * security output - user, * error response - tuple of status code with error message object and wrapper. */deftapirSecurityEndpoint(roles:List[RoleType]):PartialServerEndpoint[String, User, Unit, ErrorInfo, Unit, Any, Future]=endpoint// base tapir endpoint.securityIn(auth.bearer[String]()// defining security input.description("Bearer token from Authorization header")).errorOut(oneOf[ErrorInfo](// returns required http code for different types of ErrorInfo.// For secured endpoint you need to define // all cases before defining security logiconeOfVariant(statusCode(StatusCode.Forbidden).and(jsonBody[Forbidden].description("When user doesn't have role for the endpoint"))),oneOfVariant(statusCode(StatusCode.Unauthorized).and(jsonBody[Unauthorized].description("When user doesn't authenticated or token is expired"))),oneOfVariant(statusCode(StatusCode.NotFound).and(jsonBody[NotFound].description("When something not found"))),oneOfVariant(statusCode(StatusCode.BadRequest).and(jsonBody[BadRequest].description("Bad request"))),oneOfVariant(statusCode(StatusCode.InternalServerError).and(jsonBody[InternalServerError].description("For exceptional cases"))),// default case below.oneOfDefaultVariant(jsonBody[ErrorMessage].description("Default result").example(ErrorMessage("Test error message"))))).serverSecurityLogic(authentication.authenticate(_).flatMap{// define security logic here. For example, here is authentication, // chained with authorizationeither=>foldEitherOfFuture(either.map(isAuthorized(_,roles))).map(_.flatten)})/** * Authorization filter function - checks user for present roles. * @param user user to check * @param roles restricted roles to check. If empty - skips authorization. * @return either error with Forbidden status code or user. */defisAuthorized(user:User,roles:List[RoleType]):Future[Either[ErrorInfo, User]]=Future.successful(if(roles.isEmpty||roles.contains(user.role))Right(user)elseLeft(Forbidden("user is not allowed to use this endpoint")))
Now we have secured endpoint template, which we can use to protect some endpoint with authentication, but we didn’t
discuss error status code customization
Error response customization
By default, Tapir returns Http 400 with empty response in situations, when we return Left() without defining errorOut()
details, which is not suitable for some cases, so we need to customize that.
Firstly I used this type of defining error response type:
12345678
endpoint.get.in(“test”).errorOut(statusCode.description(“Someresponseforthatcodeandanotheroneforanotherstatuscode”)).errorOut(jsonBody[ErrorMessage].description(“Containsdetailsabouterrorresponse”))...// set here response, logic etc
And as to the described endpoint for error response you had to return required status code and output in tuple. It was
enough, but it didn’t describe these error response codes in swagger documentation. And I changed to mapping, described
in the security endpoint.
1234567891011121314151617181920212223
.errorOut(oneOf[ErrorInfo](// returns required http code for different types of ErrorInfo. // For secured endpoint you need to define // all cases before defining security logiconeOfVariant(statusCode(StatusCode.Forbidden).and(jsonBody[Forbidden].description("When user doesn't have role for the endpoint"))),oneOfVariant(statusCode(StatusCode.Unauthorized).and(jsonBody[Unauthorized].description("When user doesn't authenticated or token is expired"))),oneOfVariant(statusCode(StatusCode.NotFound).and(jsonBody[NotFound].description("When something not found"))),oneOfVariant(statusCode(StatusCode.BadRequest).and(jsonBody[BadRequest].description("Bad request"))),oneOfVariant(statusCode(StatusCode.InternalServerError).and(jsonBody[InternalServerError].description("For exceptional cases"))),// default case below.oneOfDefaultVariant(jsonBody[ErrorMessage].description("Default result").example(ErrorMessage("Test error message")))))
You can read more about it here.
This approach is more convenient because you need only return an appropriate variant of ErrorInfo, and also you can
describe different error code responses in swagger documentation.
Protecting endpoints using secured endpoint template
Now, having a security endpoint template, we can make any endpoint protected. For example, create order endpoint:
1234567891011121314151617181920212223
/** * Create an order endpoint. */// accessible only for users with role UservalcreateOrderEndpoint=tapirSecurity.tapirSecurityEndpoint(List(Roles.User)).post// POST endpoint.in("orders")// /orders uri.description("Creates order for the user").in(jsonBody[CreateOrderForm]// request has to have body of CreateOrderForm.description("Contains everything for creating order").example(CreateOrderForm(List(OrderProductForm(1,5)),"Some delivery comment"))).out(statusCode(StatusCode.Created)// set static status code for success response.description("Returns Created when order is created")).serverLogic{user=>newOrder=>// security output => endpoint input => server logicif(newOrder.products.forall(product=>product.quantity>0&&product.productId>0)){orderService.createOrder(user.id,newOrder).map(_=>Right(()))}else{Future.successful(Left(BadRequest("Some order record contains invalid value!")))}}
As you can see, we are using a protected endpoint, added restriction that only users with role User can access this
endpoint. Also, you can notice that we are not defining error output. That is because we can’t describe error output
after describing security logic, so you need to describe it first. If you don’t want to describe error response cases,
which won’t happen in your endpoint logic, you can describe security logic for every protected endpoint. I made a
template because error responses are similar.
Exception handling customization
Now, after completing all necessary endpoints and checking them, I noticed, that after exception occurred, that back-end
returns only http 500 with ‘Internal server error’ text. But it’s not informative for the front-end developer, who will
work with this API. So we need to modify exception handling to provide additional information. Tapir has different
interceptors and we can modify them.
1234567891011121314151617181920212223242526272829
importsttp.tapir.json.circe.jsonBodyimportsttp.tapir.generic.auto._importsttp.tapir._importsttp.tapir.server.akkahttp.AkkaHttpServerOptionsimportsttp.tapir.server.model.ValuedEndpointOutputimportsttp.tapir.server.interceptor.exception.ExceptionHandlerimportio.circe.generic.auto._importscala.concurrent.{ExecutionContext,Future}/** * Configuration for AkkaHttpServer routes. * * Contains customization for decode failure handler, exception handler * and applied metrics interceptor */implicitvalcustomServerOptions:AkkaHttpServerOptions=AkkaHttpServerOptions.customiseInterceptors.exceptionHandler(ExceptionHandler[Future]{ctx=>// defining exception id for the exception to make search in logs easier.valexceptionId=UUID.randomUUID()logger.error(s"Intercepted exception ${ctx.e} while processing "+"request, exception id: $exceptionId")Future.successful(Some(ValuedEndpointOutput[ErrorMessage](jsonBody[ErrorMessage],ErrorMessage(s"Internal Server Error, exception id: $exceptionId"))))}).options
Now, when some exception occurs, we’ll log it, mark it with an exception id and add this id to the response. And when
the front-end developer sends that exception id to us, we will find the issue much faster.
Moreover, we can customize behavior for the decode failure handler in this options object. As a result, we now have
these options:
importsttp.tapir.server.interceptor.decodefailure.DefaultDecodeFailureHandlerimportsttp.tapir.server.interceptor.decodefailure.DefaultDecodeFailureHandler.FailureMessagesimplicitvalcustomServerOptions:AkkaHttpServerOptions=AkkaHttpServerOptions.customiseInterceptors.decodeFailureHandler(ctx=>{ctx.failingInputmatch{// when defining how a decode failure should be handled, we need // to describe the output to be used, and// a value for this outputcase_:EndpointIO.Body[_, _]=>// see this function and then to failureSourceMessage function // to find out which types of decode errors are presentvalfailureMessage=FailureMessages.failureMessage(ctx)logger.info(s"$failureMessage")// warning - log working incorrect when there are several endpoints // with different methodsDefaultDecodeFailureHandler.default(ctx)case_=>DefaultDecodeFailureHandler.default(ctx)}}).exceptionHandler(ExceptionHandler[Future]{ctx=>// defining exception id for the exception to make search in logs easier.valexceptionId=UUID.randomUUID()logger.error(s"Intercepted exception ${ctx.e} while processing request, "+"exception id: $exceptionId")Future.successful(Some(ValuedEndpointOutput[ErrorMessage](jsonBody[ErrorMessage],ErrorMessage(s"Internal Server Error, exception id: $exceptionId"))))}).options
As you can see, we added logging of incorrect body decoding. You can add handling for different types of handlers, for
example - queries, paths, methods, etc.
To use this options you can make this options in implicit scope for AkkaHttpInterpreter or use it directly:
AkkaHttpServerInterpreter(errorHandler.customServerOptions).toRoute(endpointList)
Adding metrics interceptors
Prometheus' metrics collects information about request handling time, counts requests for each endpoint, and you can
add other metrics or integrate custom one. We'll add them to our example.
It’s not hard to connect, you just need to add an interceptor to AkkaHttpServerOptions. For this we need to add this dependency:
importsttp.tapir.server.metrics.prometheus.PrometheusMetrics/** Prometheus metrics interceptor. */valprometheusMetrics=PrometheusMetrics.default[Future]()/** * Configuration for AkkaHttpServer routes. * * Contains customization for decode failure handler, exception handler * and applied metrics interceptor */implicitvalcustomServerOptions:AkkaHttpServerOptions=AkkaHttpServerOptions.customiseInterceptors.decodeFailureHandler(ctx=>{ctx.failingInputmatch{// when defining how a decode failure should be handled, // we need to describe the output to be used, and// a value for this outputcase_:EndpointIO.Body[_, _]=>// see this function and then to failureSourceMessage function // to find out which types of decode errors are presentvalfailureMessage=FailureMessages.failureMessage(ctx)logger.info(s"$failureMessage")// warning - log working incorrect when there are several // endpoints with different methodsDefaultDecodeFailureHandler.default(ctx)case_=>DefaultDecodeFailureHandler.default(ctx)}}).exceptionHandler(ExceptionHandler[Future]{ctx=// defining exception id for the exception // to make search in logs easiervalexceptionId=UUID.randomUUID()logger.error(s"Intercepted exception ${ctx.e} "+"while processing request, exception id: $exceptionId")Future.successful(Some(ValuedEndpointOutput[ErrorMessage](jsonBody[ErrorMessage],ErrorMessage(s"Internal Server Error, exception id: $exceptionId"))))}).metricsInterceptor(prometheusMetrics.metricsInterceptor()).options
And when we send request to /metrics endpoint, we’ll see prometheus metrics:
Wrapping requests for different purposes via custom Akka-HTTP directives
Sometimes, using Tapir’s AkkaHttpServerOptions, you can’t define the interceptor you need.
For these cases you can create Akka-Http directives to make some actions, which you can’t describe inside interceptors
in these options.
For example, we’ll make a request handling time tracker, which works around Tapir endpoints. Solution was adapted from
this article.
/** * Starts timer and returns function, which will stop the timer and * log time with some details. * * @param request request, on which will be logged after handling request. * @return function, which will handle response for request. */deftimeRequest(request:HttpRequest):Try[RouteResult]=>Unit={valstart=System.currentTimeMillis(){caseSuccess(Complete(resp))=>vald=System.currentTimeMillis()-startlogger.info(s"[${resp.status.intValue()}] ${request.method.name} "+s"${request.uri.path}, took: ${d}ms")caseSuccess(Rejected(_))=>caseFailure(_)=>}}/** * Directive-wrapper for request. * * @param onRequest action, which accepts request and returns another function, * which accepts response * @return ready directive, which can be used for wrapping other directives */defaroundRequest(onRequest:HttpRequest=>Try[RouteResult]=>Unit):Directive0=extractRequestContext.flatMap{ctx=>// starts timer for request and returns function, which you will use // to stop timer and log request timevalonDone=onRequest(ctx.request)mapInnerRoute{inner=>inner.andThen{resultFuture=>resultFuture.map{casec@Complete(response)=>Complete(response.mapEntity{entity=>// stops timer now because response is emptyif(entity.isKnownEmpty()){onDone(Success(c))entity}else{// On an empty entity, `transformDataBytes` unsets `isKnownEmpty`.// Call onDone right away, since there's no significant amount of// data to send, anyway.entity.transformDataBytes(Flow[ByteString].watchTermination(){case(mat,future)=>// stops timer after finishing sending responsefuture.map(_=>c).onComplete(onDone)mat})}})caseother=>onDone(Success(other))// stops timer and returns otherother}.andThen{// skip this if you use akka.http.scaladsl.server.handleExceptions, // put onDone therecaseFailure(ex)=>onDone(Failure(ex))// stops timer and returns failure}}}}
Swagger - implementation of OpenAPI, used for rendering documentation for HTTP endpoints in convenient form. It is useful
for explaining your API’s for other developers. Alternatively, you can use Redoc, more you can read
here.
We need to add this dependency to our build.sbt to build the swagger specs:
importjava.time.LocalDateTimeimportcom.example.auth.{TapirAuthentication,TapirSecurity}importcom.example.errors.{Forbidden,Unauthorized}importcom.example.models.{Roles,User}importcom.example.services.OrderServiceimportio.circe.syntax.EncoderOpsimportorg.mockito.ArgumentMatchers.anyimportorg.mockito.Mockito.whenimportorg.scalatest.flatspec.AsyncFlatSpecimportorg.scalatest.matchers.should.Matchersimportorg.scalatestplus.mockito.MockitoSugar.mockimportsttp.client3._importio.circe.generic.auto._importsttp.client3.testing.SttpBackendStubimportsttp.model.StatusCodeimportsttp.tapir.server.stub.TapirStubInterpreterimportscala.concurrent.FuturevaltestUser:User=User(1,"test name","+777777777","test@example.com","hash","49050","Dnipro","test address",Roles.User,LocalDateTime.now())/** Case where a user with the wrong role is trying to get an endpoint for another user role. */itshould"Reject user with wrong role"in{// preparationsvalauthentication=mock[TapirAuthentication]when(authentication.authenticate(any[String])).thenReturn(Future.successful(Right(testUser.copy(role=Roles.Admin))))valorderService=mock[OrderService]valorderController=newOrderController(newTapirSecurity(authentication),orderService)// givenvalbackendStub:SttpBackend[Future, Any]=TapirStubInterpreter(SttpBackendStub.asynchronousFuture).whenServerEndpoint(orderController.viewUserOrderListEndpoint).thenRunLogic().backend()// whenvalresponse=basicRequest.get(uri"http://localhost:9000/orders").header("Authorization","Bearer password").send(backendStub)// thenresponse.map{resp=>logger.info(s"orders expecting 403 Forbidden: ${resp.body}")resp.codeshouldBeStatusCode.Forbiddenresp.bodyshouldBeLeft(Forbidden("user is not allowed to use this endpoint").asJson.noSpaces)}}
As you have noticed, I mocked authentication and stubs it to return user with incorrect role. In the given part
I described running endpoint logic and in when part I made a request to that endpoint.
In the part I expect to receive a Forbidden response because a user with this role can’t access that endpoint.