Proper way to implement Timeout interceptor

Hi there!
I would like to implement Interceptor to control a time of effect evaluation.
I am using ZIO and different server interpreters.
First of all I tried to implement it as RequestIntercepter failing effect in case of timeout. Something like this:

    RequestInterceptor.transformResultEffect(new RequestResultEffectTransform[Task] {
      override def apply[B](request: ServerRequest, result: Task[RequestResult[B]]): Task[RequestResult[B]] =
        result.timeoutFail(new Exception(s"timeout ${timeout.toMillis} ms exceeded"))(
          zio.Duration.fromScala(3.seconds)
        )
    })

Also I have at the very beginning another prepended RequestInterceptor which is aim to recover all errors and render proper response from them. Something like this:

class RecoverInterceptor[F, S[_]] extends RequestInterceptor[Task] {

  private val wrapServerError: Throwable => ServerException[_] = {
    case se: ServerException[_] => se
    case t: Throwable           => ServerException.InternalError("Unknown error", t)
  }

  private def outputM(error: ServerException[_]) =
    ValuedEndpointOutput[F](statusCode(StatusCode.InternalServerError) and jsonBody[ServerException[_]], error)

  override def apply[R, B](
    responder: Responder[Task, B],
    requestHandler: EndpointInterceptor[Task] => RequestHandler[Task, R, B]
  ): RequestHandler[Task, R, B] = {
    val next = requestHandler(EndpointInterceptor.noop)
    new RequestHandler[Task, R, B] {
      override def apply(request: ServerRequest, endpoints: List[ServerEndpoint[R, Task]])(implicit
        monad: MonadError[Task]
      ): Task[RequestResult[B]] =
        next(request, endpoints).catchNonFatalOrDie { error =>
          for {
            output <- outputM(wrapServerError(error))
            r      <- responder(request, output)
          } yield RequestResult.Response(r)
        }
    }
  }
}

But when I started to test it I realized that other Endpoint based Interceptors (for example Metric or Tracing Interceptor) were not correctly processing effect failures from Request Interceptor. As a result, when I get request timeout error no metrics are collected from this request and so on.

If I control effect duration on Endpoint Interceptor level everything will be ok. But I want to control time of all requests on Request Interceptor level.

Other way is to implement Metric and Tracing Interceptor via Request Interceptor. But on this level there is no guaranty to grab endpoint path, because request have not been decoded yet (we have only supposed endpoint list at this point). But endpoint path is one of the main attributes of metric and trace span.

So, my question is what is the proper way to implement request timeout in my case? What do you think the best approach to do it?

I think the proper interceptor to use here is the EndpointInterceptor, added to he end of the interceptor list. That way, it is called last when a request arrives, and completes first when the response is generated. So you can attach the timeout to the effect description which computes the response.

Here’s a full example:

package sttp.tapir.examples

import sttp.monad.MonadError
import sttp.tapir.model.ServerRequest
import sttp.tapir.server.interceptor.{
  DecodeFailureContext,
  DecodeSuccessContext,
  EndpointHandler,
  EndpointInterceptor,
  Responder,
  SecurityFailureContext
}
import sttp.tapir.server.interpreter.BodyListener
import sttp.tapir.server.model.{ServerResponse, ValuedEndpointOutput}
import sttp.tapir.server.ziohttp.{ZioHttpInterpreter, ZioHttpServerOptions}
import sttp.tapir.ztapir._
import zio._
import zio.http.{HttpApp, Server}

object Test extends ZIOAppDefault {
  val helloWorld1 = endpoint.get.in("hello1").out(stringBody).zServerLogic(_ => ZIO.sleep(2.seconds).map(_ => "Hello, World!"))
  val helloWorld2 = endpoint.get.in("hello2").out(stringBody).zServerLogic(_ => ZIO.sleep(500.milliseconds).map(_ => "Hello, World!"))

  val so = ZioHttpServerOptions
    .default[Any]
    .appendInterceptor(new EndpointInterceptor[Task] {
      override def apply[B](responder: Responder[Task, B], next: EndpointHandler[Task, B]): EndpointHandler[Task, B] =
        new EndpointHandler[Task, B] {
          def timeout[T](r: ServerRequest, t: Task[T], wrapResponse: ServerResponse[B] => T): Task[T] = {
            t.timeout(1.second).flatMap {
              case Some(r) => ZIO.succeed(r)
              case None    => responder(r, ValuedEndpointOutput(stringBody, "Timeout")).map(wrapResponse)
            }
          }

          override def onDecodeSuccess[A, U, I](
              ctx: DecodeSuccessContext[Task, A, U, I]
          )(implicit monad: MonadError[Task], bodyListener: BodyListener[Task, B]): Task[ServerResponse[B]] =
            timeout(ctx.request, next.onDecodeSuccess(ctx), identity)

          override def onSecurityFailure[A](
              ctx: SecurityFailureContext[Task, A]
          )(implicit monad: MonadError[Task], bodyListener: BodyListener[Task, B]): Task[ServerResponse[B]] =
            timeout(ctx.request, next.onSecurityFailure(ctx), identity)

          override def onDecodeFailure(
              ctx: DecodeFailureContext
          )(implicit monad: MonadError[Task], bodyListener: BodyListener[Task, B]): Task[Option[ServerResponse[B]]] =
            timeout(ctx.request, next.onDecodeFailure(ctx), Some(_))
        }
    })

  val app: HttpApp[Any, Throwable] = ZioHttpInterpreter(so).toHttp(List(helloWorld1, helloWorld2))

  // starting the server
  override def run =
    Server
      .serve(app.withDefaultErrorResponse)
      .provide(
        ZLayer.succeed(Server.Config.default.port(8080)),
        Server.live
      )
      .exitCode
}

Couple of notes:

  • I’m using ZioHttpServerOptions.default.appendInterceptor to add the interceptor after the built-in ones (such as logging, metrics etc.), and before the decode failure handler. So the metrics etc. will “surround” the invocation of this custom one, and the decode failure handling + calling the server logic will be nested inside our interceptor, as e.g. the invocation of next.onDecodeSuccess(ctx).
  • the request interceptors are called first, and while this happens, the stack of endpoint interceptor is being built (request interceptor might add new endpoint interceptors, which is used e.g. by metrics). However, when it comes to processing the response, they are called last, after the metrics, logging etc. have all completed. So that’s why here this is not the correct interceptor type to use.
  • exception handling can be handled similarly - but maybe the built-in ExceptionInterceptor already does what you need it to do?

Finally, I think that the docs surrounding interceptors and how they work might be a bit better. I’ll try to work on that :slight_smile:

@adamw thank you very much for such a detail response.
Actually my case is more complex. I am developing a http server kit tool (wrapper over tapir) with a lot of functionality. Most of functions are implemented via interceptors - logging, tracing, custom metrics, limit checkers and so on. And I want my abstract server to be able to response with different protocols.
For this purpose I have trait Protocol:

trait Protocol {
  type ProtocolFailure

  type ProtocolSuccess[_]

  type TBounds[_]

  def successOutput[Payload: TBounds](about: String): EndpointOutput[ProtocolSuccess[Payload]]

  def failedOutput: EndpointOutput[ProtocolFailure]

  def liftError: ErrorLift[ProtocolFailure]

  def liftResult[Out]: ResultLift[Out, ProtocolSuccess]

}

and a lot of its implementations.

When I am building an endpoint I use Protocol instance with it successOutput and failedOutput definitions and wrapping business logic this way:

myEndpoint
  .serverLogic((auth: PRINCIPAL) => (input: INPUT) =>
    businessLogic(auth, input)
      .mapBoth(
        e => protocol.liftError(e),
        r => protocol.liftResult(r)
      )
  )

As for interceptors instead of binding each interceptor with Protocol instance to be able to make ValuedEndpointOutput for response like this:

val output =
  ValuedEndpointOutput[protocol.ProtocolFailure](
     protocol.failedOutput,
     protocol.liftError(error)
  )

I decided to fail request effect in all feature interceptors and recover error with protocol based response in only one very first interceptor.
What do you think of this approach?

And regarding to this context do we have any way to handle error from business logic with ZIO integration on Interceptor level? As I understand it is not possible because business logic effect (ZIO) wrapped to .either.resurrect. So any effect fail leads to success effect result on interceptor level.

It would be nice if my failed effect of business logic be nested inside interceptors chain (to be able to recover it with protocol response in single place together with interceptors logic).

I found this discussion similar to this subject [improvement] zio-http integration not comfortable failed effect usages · Issue #1782 · softwaremill/tapir · GitHub
If it is still actual I could make a PR with this new method zServerLogicFallible.

I don’t see why not if this works for you :slight_smile:

So if I understand correctly what you want to do, you don’t want to map the errors that might occur while running the server logic to a response using the error outputs, but instead you want to have a failed effect which is handled generally using an interceptor.

I also suspect you do want to have the error outputs for documentation. In that case, maybe an option would be remove the error outputs from the endpoint, right before applying the server logic.

Sth like (skipping security inputs for brevity):

val myEndpoint: PublicEndpoint[I, E, O, C] = ... // this is used to generate docs
val myLogic: I => ZIO[Any, E, O] = ... // E must be a subtype of Throwable
val myServerEndpoint = myEndpoint
  .copy(errorOutput = EndpointOutput.Void()) // the endpoint now has type PublicEndpoint[I, Nothing, O, C]
  .zServerLogic(i => myLogic(i).orDie) // pushing the errors to a defect, will be shortly resurrected, but the types match here

In the ZIO interceptors you deal with RIO which have the error channel set to Throwable - these will be the errors that have been resurrected, before pushing them into the defect channel using .orDie.

I saw the PR - thanks - though I think that effectively not using the error outputs, as specified in the endpoint, for generating the error response, might be confusing. Such an approach mandates using an interceptor. So while this is fine in an application or a custom library/framework, in general in tapir I don’t think that would work. Plus I think it’s best to be explicit about removing the error outputs from response generation as describe above.

So summing up - I think the approach is definitely viable, but not necessarily an addition to tapir that would work for other users.

Though coming to think about it, maybe this can be generalized, just as we now have serverLogicRecoverErrors, maybe we could have serverLogicIgnoreErrors, and similary, zServerLogicIgnoreErrors - with clear documentation on how to handle errors (via interceptors) in that case. But that would need a separate issue & separate discussion to see if there’s broader interest for such a feature.

@adamw thank you very much for your answer!
You totally right, there is no need in my PR because I can wrap my business logic effect into .orDie just now. But I think it is not the right way because .orDie causing unhandled fiber error in this case. I will think up about more idiomatic way how to generalize it. Thanks for pointing me in the direction of how it could be done!