Log request body with ServerLog for http4s

I'm using tapir with http4s, and I want to log request bodies. For that I've created a custom ServerLog[F] implementation. In the ServerLog.requestHandled method, for example, I try to extract the request body from the underlying http4s request, but for large request bodies I get an error from http4s internals: Body Has Been Consumed Completely Already

Here is the requestHandled function:

  override def requestHandled(
      ctx: DecodeSuccessContext[F, ?, ?, ?],
      response: ServerResponse[?],
      token: TOKEN,
  ): F[Unit] =
    Applicative[F].whenA(logWhenHandled) {
      (
        showRequest(ctx.request, ctx.endpoint.some),
        showEndpoint(ctx.endpoint),
        showResponse(response),
      ) flatMapN { case (request, endpoint, response) =>
        val message = s"""Request: $request,
                         | handled by: $endpoint${took(token)};
                         | response: $response
                         |""".stripMargin.replaceAll("\n", "")
        log(message)
      }
    }

And here is my showRequest function:

  private def showRequest(
      request: ServerRequest,
      endpoint: Option[AnyEndpoint] = None,
  ): F[String] = {
    val bodyM = (request.underlying, endpoint) match {
      case (underlying: Request[F], Some(_)) =>
        val isBinary = underlying.contentType
          .exists(_.mediaType.binary)
        val isJson = underlying.contentType
          .exists(mT => mT.mediaType == MediaType.application.json || mT.mediaType.subType.endsWith("+json"))

        if (!isBinary || isJson)
          underlying
            .bodyText(implicitly, underlying.charset.getOrElse(Charset.`UTF-8`))
            .compile
            .string
        else
          Applicative[F].pure("<binary>")

      case _ =>
        Applicative[F].pure("<disabled>")
    }

    bodyM map { body => s"${request.showShort} body='$body'" }
  }

I'm stuck right now: mostly the same code as in showRequest works fine as a logger middleware in http4s.

Hi @Ravenow, I’m investigating the issue.
I have a question: What’s the body type in your endpoint definition? Is it jsonBody?
I suspect that this is related to requestHandled being called after the response is ready to send back, which means that Tapir must already have read the request body stream. Maybe for small inputs http4s preloads all data into memory, so the stream can be read multiple times, while a larger body is read directly from the input, which cannot be done twice. However, doing the same in a logger middleware doesn't cause issues, which may be related to some internal mechanics of http4s.
Another interesting finding is that this error happens with Ember, while Blaze simply returns an empty body stream without failing.
By the way, are you OK with logging such a large body?

Hi, we are using Ember and the request type is jsonBody[CaseClass]. If you want, I can provide a sample project. I tested with JSON files from "JSON dummy data | Demos":
everything is OK with a 64 KB JSON, but it fails with any larger JSON (128 KB+).

I looked into http4s's RequestLogger, and it confirms my suspicions. In short, it wraps the input stream of bytes with special logic that copies the message body into an additional String, which is logged after the stream completes. The new request (with the stream wrapper) is then passed on for further processing. This guarantees that the stream is read only once.
In the case of Tapir, the underlying.body stream is processed to produce the input JSON, and then you attempt to process it again in requestHandled. Apparently for small bodies that's OK, as the stream refers to an in-memory byte array, but for larger requests it's a stream that can be read only once.
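To illustrate the difference, here is a minimal sketch that uses only fs2 and cats-effect. The `oneShot` helper is hypothetical: it merely imitates a body that is read straight from a socket and is not how Ember actually implements request bodies. The first read succeeds, and the second read of the same stream fails.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.{Chunk, Stream, text}

object OneShotBodyDemo {
  // Hypothetical helper: a byte stream that fails when it is run a second time,
  // imitating a body that is pulled directly from the connection.
  def oneShot(bytes: Array[Byte]): IO[Stream[IO, Byte]] =
    Ref.of[IO, Boolean](false).map { consumed =>
      Stream.eval(consumed.getAndSet(true)).flatMap { alreadyRead =>
        val next: Stream[IO, Byte] =
          if (alreadyRead)
            Stream.raiseError[IO](new IllegalStateException("body already consumed"))
          else
            Stream.chunk(Chunk.array(bytes))
        next
      }
    }

  // Decodes the whole body as UTF-8 text, as showRequest does with bodyText.
  def readText(body: Stream[IO, Byte]): IO[String] =
    body.through(text.utf8.decode).compile.string

  // Reads the same stream twice; the second attempt is captured as an Either.
  val program: IO[(String, Either[Throwable, String])] =
    for {
      body   <- oneShot("""{"a":1}""".getBytes("UTF-8"))
      first  <- readText(body)
      second <- readText(body).attempt
    } yield (first, second)

  def main(args: Array[String]): Unit = {
    val (first, second) = program.unsafeRunSync()
    println(s"first read: $first")
    println(s"second read failed: ${second.isLeft}")
  }
}
```

An in-memory body would correspond to returning `Stream.chunk(...)` unconditionally, which can be run any number of times.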

That said, I don't see any quick solution to your problem. Technically, one could write a request interceptor that transforms the http4s Request into some kind of extended version exposing a loggable F[String], built in a similar fashion to what the middleware logger does, but I'm not sure it's worth pursuing. Do you need to log all JSON bodies, even those longer than 65 kB?

OK, I've created the following middleware:

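// Imports assume http4s 0.23 (cats-effect 3, fs2 3, org.typelevel vault).
import cats.~>
import cats.arrow.FunctionK
import cats.data.Kleisli
import cats.effect.{Async, MonadCancelThrow, SyncIO}
import cats.syntax.all._
import fs2.{Chunk, Pipe, Stream}
import org.http4s.{Http, HttpApp, MediaType, Request}
import org.typelevel.vault.Key

object MemoizeMiddleware {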
  def apply[F[_]: Async, G[_]: MonadCancelThrow](fk: F ~> G)(httpApp: Http[G, F]): Http[G, F] =
    Kleisli { (request: Request[F]) =>
      fk(Async[F].ref(Vector.empty[Chunk[Byte]]))
        .flatMap { vec =>
          val changedRequest = if (isLoggable(request)) {
            val collectChunks: Pipe[F, Byte, Nothing] =
              _.chunks.flatMap(c => Stream.exec(vec.update(_ :+ c)))

            val newBody = Stream.eval(vec.get).flatMap(v => Stream.emits(v)).unchunks

            request
              .withBodyStream(request.body.observe(collectChunks))
              .withAttribute(Keys.RequestBodyStream[F], newBody)
          } else request

          httpApp(changedRequest)
        }
    }

  def httpApp[F[_]: Async](httpApp: HttpApp[F]): HttpApp[F] =
    MemoizeMiddleware.apply(FunctionK.id[F])(httpApp)

  def isLoggable[F[_]](request: Request[F]): Boolean = {
    val isBinary = request.contentType
      .exists(_.mediaType.binary)
    val isJson = request.contentType
      .exists(mT => mT.mediaType == MediaType.application.json || mT.mediaType.subType.endsWith("+json"))

    !isBinary || isJson
  }

  object Keys {
    private[this] val requestBodyStream: Key[Any]     = Key.newKey[SyncIO, Any].unsafeRunSync()
    def RequestBodyStream[F[_]]: Key[Stream[F, Byte]] = requestBodyStream.asInstanceOf[Key[Stream[F, Byte]]]
  }
}

It uses mostly the same logic as the http4s logger middleware.
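On the logging side, the memoized bytes can then be read from the request attributes instead of from the body itself. The following is only a sketch of that lookup: `bodyKey` is a hypothetical stand-in for `MemoizeMiddleware.Keys.RequestBodyStream[IO]`, the effect type is fixed to `IO` to keep the example short, and the imports assume http4s 0.23.

```scala
import cats.effect.{IO, SyncIO}
import cats.effect.unsafe.implicits.global
import fs2.{Chunk, Stream, text}
import org.http4s.Request
import org.typelevel.vault.Key

object ReadMemoizedBody {
  // Hypothetical key, standing in for MemoizeMiddleware.Keys.RequestBodyStream[IO].
  val bodyKey: Key[Stream[IO, Byte]] =
    Key.newKey[SyncIO, Stream[IO, Byte]].unsafeRunSync()

  // Replays the memoized bytes if the middleware attached them; otherwise
  // falls back to the same placeholder that showRequest uses.
  def loggedBody(request: Request[IO]): IO[String] =
    request.attributes.lookup(bodyKey) match {
      case Some(bytes) => bytes.through(text.utf8.decode).compile.string
      case None        => IO.pure("<disabled>")
    }

  def main(args: Array[String]): Unit = {
    val memoized: Stream[IO, Byte] =
      Stream.chunk(Chunk.array("""{"a":1}""".getBytes("UTF-8")))
    val withBody = Request[IO]().withAttribute(bodyKey, memoized)

    println(loggedBody(withBody).unsafeRunSync())      // body taken from the attribute
    println(loggedBody(Request[IO]()).unsafeRunSync()) // no attribute present
  }
}
```

Note that the replayed stream only contains the chunks that have already been pulled through the original body, so this relies on the body having been fully decoded by the time requestHandled runs.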

Thanks