Introducing ServerSentEvents capability: failing to achieve type safety

Hi there.

I’m introducing SSE in my own little framework.

I have this code.

Capability

package framework.tapir.capabilities

/** Capability trait for server-sent events. */
trait ServerSentEvents

Shared code

extension [SecurityInput, Input, Output, AuthError, Requirements](
  e: Endpoint[SecurityInput, Input, AuthError, Output, Requirements]
) {
  /** Marks the endpoint as server-sent events.
    *
    * Example:
    * {{{
    * val updates =
    *   endpoint
    *     .get
    *     .securityIn(auth.bearer[AppAuthData]()).errorOut(jsonBody[AppAuthError])
    *     // Note that we need to use cookie security for SSE as browsers do not support sending custom headers
    *     .securityViaCookie[CookieAppAuthData]
    *     .in(RootPath / "updates")
    *     .serverSentEvents[AppUpdate]
    * }}}
    */
  def serverSentEvents[Output2](using
    codec: Codec[String, Output2, ?]
  ): Endpoint[SecurityInput, Input, AuthError, Output2, Requirements & ServerSentEvents] = {
    e.withOutputPublic(EndpointIO.Body(RawBodyType.StringBody(StandardCharsets.UTF_8), codec, EndpointIO.Info.empty))
  }
}

Which requires this nifty hack:

package sttp.tapir

// A hack to make package private methods visible

extension [SECURITY_INPUT, INPUT, ERROR_OUTPUT, OUTPUT, R](
  e: Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, OUTPUT, R]
) {

  /** Allows you to completely replace the security input of the endpoint. */
  def withSecurityInputPublic[SI2, R2](
    input: EndpointInput[SI2]
  ): Endpoint[SI2, INPUT, ERROR_OUTPUT, OUTPUT, R & R2] =
    e.withSecurityInput(input)

  /** Allows you to completely replace the input of the endpoint. */
  def withInputPublic[I2, R2](
    input: EndpointInput[I2]
  ): Endpoint[SECURITY_INPUT, I2, ERROR_OUTPUT, OUTPUT, R & R2] =
    e.withInput(input)

  /** Allows you to completely replace the output of the endpoint. */
  def withOutputPublic[O2, R2](
    output: EndpointOutput[O2]
  ): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, O2, R & R2] =
    e.withOutput(output)
}

Side-note: Why aren’t with* methods public anyway?

Server side

package framework.exts

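import sttp.tapir.*
import sttp.tapir.server.*
import sttp.tapir.server.http4s.*
import sttp.model.sse.ServerSentEvent
import sttp.capabilities.fs2.Fs2Streams
import scala.annotation.targetName
import sttp.tapir.given
import framework.tapir.capabilities.ServerSentEvents
import fs2.Stream
// Assumed aliases: TapirCodec and CirceEncoder are not defined anywhere in this snippet.
import sttp.tapir.Codec as TapirCodec
import io.circe.Encoder as CirceEncoder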

// TODO: make this actually fail to compile if the endpoint doesn't have ServerSentEvents capability
extension [SECURITY_INPUT, INPUT, ERROR_OUTPUT, OUTPUT, R <: ServerSentEvents](
  e: Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, OUTPUT, R]
) {

  /** Turns the endpoint into a simple server-sent events endpoint. */
  def toSSE[F[_]](using
    codec: TapirCodec[String, OUTPUT, ?]
  ): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, Stream[F, OUTPUT], Fs2Streams[F]] = {
    e.toSSE { output =>
      val encoded = codec.encode(output)
      ServerSentEvent(data = Some(encoded))
    }
  }

  /** Turns the endpoint into a simple server-sent events endpoint that serves JSON. */
  def toSSEJson[F[_]](using
    encoder: CirceEncoder[OUTPUT]
  ): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, Stream[F, OUTPUT], Fs2Streams[F]] = {
    e.toSSE { output =>
      val encoded = encoder(output).noSpaces
      ServerSentEvent(data = Some(encoded))
    }
  }

  /** Turns the endpoint into a simple server-sent events endpoint. */
  def toSSE[F[_]](
    mapper: OUTPUT => ServerSentEvent
  ): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, Stream[F, OUTPUT], Fs2Streams[F]] = {
    e.toSSEStream(_.map(mapper))
  }

  /** Turns the endpoint into a simple server-sent events endpoint. */
  def toSSEStream[F[_]](
    pipe: fs2.Pipe[F, OUTPUT, ServerSentEvent]
  ): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, Stream[F, OUTPUT], Fs2Streams[F]] = {
    val sseBody = serverSentEventsBody[F]

    val sseEndpoint =
      e.withOutputPublic(sseBody.toEndpointIO)
        .asInstanceOf[Endpoint[
          SECURITY_INPUT,
          INPUT,
          ERROR_OUTPUT,
          Stream[F, ServerSentEvent],
          Fs2Streams[F],
        ]]
    val endpoint =
      sseEndpoint
        .mapOut[fs2.Stream[F, OUTPUT]](_ => throw new NotImplementedError("this should never be invoked"))(pipe)
    endpoint
  }
}

Client-side

package framework.exts

import com.raquo.airstream.custom.CustomSource
import framework.data.AppBaseUri
import framework.prelude.sttpClientInterpreter
import framework.utils.{NetworkError, NetworkOrAuthError}
import org.scalajs.dom.{EventSource, EventSourceInit, MessageEvent}
import sttp.client3.Request
import sttp.tapir.{DecodeResult, Endpoint, PublicEndpoint}

import scala.annotation.targetName
import scala.scalajs.js // needed for js.UndefOr, js.undefined and js.Dynamic below
import scala.scalajs.js.JSON
import framework.tapir.capabilities.ServerSentEvents

// TODO: make this actually fail to compile if the endpoint doesn't have ServerSentEvents capability
extension [SecurityInput, Input, Output, AuthError, Requirements <: ServerSentEvents](
  e: Endpoint[SecurityInput, Input, AuthError, Output, Requirements]
) {

  /** Turns the endpoint into a server-sent event stream without decoding.
    *
    * @param withCredentials
    *   A boolean value, defaulting to false, indicating if CORS should be set to include credentials.
    */
  def toSSEStreamRaw(
    securityParams: SecurityInput,
    params: Input,
    withCredentials: js.UndefOr[Boolean] = js.undefined,
  )(using baseUri: AppBaseUri): EventStream[MessageEvent] = {
    val uri = e.toReq(securityParams, params).uri
    val options = js.Dynamic.literal(withCredentials = withCredentials).asInstanceOf[EventSourceInit]
    EventStream.fromDomEventSource(new EventSource(uri.toString, options))
  }

  /** Turns the endpoint into a server-sent event stream.
    *
    * Decoding failures will throw an exception.
    *
    * @param withCredentials
    *   A boolean value, defaulting to false, indicating if CORS should be set to include credentials.
    */
  def toSSEStream(
    securityParams: SecurityInput,
    params: Input,
    withCredentials: js.UndefOr[Boolean] = js.undefined,
  )(using
    baseUri: AppBaseUri,
    codec: TapirCodec[String, Output, ?],
  ): EventStream[(MessageEvent, Output)] = {
    val evtStream = toSSEStreamRaw(securityParams, params, withCredentials)
    evtStream.map { evt =>
      val either = for {
        dataStr <- evt.data match {
          case str: String => Right(str)
          case other       => Left(s"Event data is not a string: $other")
        }
        output <- codec.decode(dataStr) match {
          case DecodeResult.Value(output)    => Right(output)
          case failure: DecodeResult.Failure => Left(s"Failed to decode event data '$dataStr': $failure")
        }
      } yield (evt, output)

      either match {
        case Left(err)    => throw new Exception(err)
        case Right(value) => value
      }
    }
  }
}

And while this works, it is not type-safe. Even if the endpoint's requirements are just Any, I can still invoke all of the extension methods in both server-side and client-side code.

Here’s a little example in Scastie.

Why is the Requirements type parameter of Endpoint contravariant, and is there a way to achieve my goal?

The requirements (or rather, capabilities) are contravariant because an endpoint that doesn’t require any capabilities from the interpreter can safely be treated as an endpoint that requires, e.g., a streaming capability from the interpreter, but not the other way round. That is, if we have:

// Pseudocode: Endpoint is abbreviated to its capabilities parameter only
trait SomeServerInterpreter:
  def toRoutes(e: Endpoint[Streams])

you can call someServerInterpreter.toRoutes(Endpoint[Any]), but you can’t (and shouldn’t) call someServerInterpreter.toRoutes(Endpoint[WebSockets & Streams]).
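
To make the variance rule concrete, here is a minimal, self-contained sketch. `Ep` is a hypothetical stand-in for tapir's `Endpoint` that keeps only the capabilities parameter, and the capability traits are empty placeholders, so none of this is tapir's actual API. It also shows why an upper bound such as `R <: ServerSentEvents` on an extension does not reject an `Ep[Any]`.

```scala
// Hypothetical placeholders for capabilities; they carry no behaviour.
trait Streams
trait WebSockets
trait ServerSentEvents

// Hypothetical stand-in for Endpoint: only the contravariant capabilities parameter is kept.
final class Ep[-R](val name: String)

// An interpreter that offers the Streams capability and nothing else.
object StreamingInterpreter:
  def toRoutes(e: Ep[Streams]): String = s"route for ${e.name}"

val plain: Ep[Any] = Ep[Any]("plain")
val both: Ep[WebSockets & Streams] = Ep[WebSockets & Streams]("both")

// Compiles: Streams <: Any and R is contravariant, so Ep[Any] <: Ep[Streams].
val ok: String = StreamingInterpreter.toRoutes(plain)

// Does not compile: this endpoint also needs WebSockets, which the interpreter lacks.
// StreamingInterpreter.toRoutes(both)

// An upper bound on R does not restrict the receiver either.
extension [R <: ServerSentEvents](e: Ep[R])
  def describe: String = s"${e.name} treated as SSE"

// Compiles: Ep[Any] conforms to Ep[ServerSentEvents], so a valid R exists.
val alsoOk: String = plain.describe
```

So with a contravariant parameter, an endpoint typed with `Any` is accepted wherever an endpoint with more requirements is expected, which is exactly what makes the extension methods in the question callable on any endpoint.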

Maybe there’s some confusion as to the purpose of the capabilities type parameter. It’s not meant for requiring that an endpoint has a specific capability, but the other way round: it’s used for requiring that the interpreter has a specific capability.
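
If the goal is a compile error when an endpoint was not built as SSE, one possible direction (an assumption on my part, not something tapir provides) is to record that fact outside the capabilities parameter, for example in a separate wrapper type. In the sketch below, `Ep`, `SseEp`, `serverSentEvents` and `toSSE` are all hypothetical names used only for illustration.

```scala
// Hypothetical stand-in for tapir's Endpoint; only the capabilities parameter is kept.
final class Ep[-R](val name: String)

// A separate wrapper type: a plain Ep never conforms to it, whatever its R is.
// O records the event payload type declared when the endpoint was marked as SSE.
final class SseEp[O, R](val underlying: Ep[R])

extension [R](e: Ep[R])
  // Marks the endpoint as SSE by wrapping it.
  def serverSentEvents[O]: SseEp[O, R] = SseEp[O, R](e)

extension [O, R](e: SseEp[O, R])
  // Defined only for wrapped endpoints, so it is unavailable on a plain Ep.
  def toSSE: String = s"SSE route for ${e.underlying.name}"

val plain: Ep[Any] = Ep[Any]("plain")
val updates: SseEp[String, Any] = plain.serverSentEvents[String]

val route: String = updates.toSSE

// Does not compile: toSSE is not defined for Ep[Any].
// plain.toSSE
```

The trade-off is that the wrapper has to be unwrapped before it is handed to an interpreter, but the check no longer depends on the variance of the capabilities parameter.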

I think the with* methods are private because there were no use cases for making them public.

By the way, there’s SSE support available for the Pekko and http4s interpreters, as well as a parser here: sttp-model/core/src/main/scala/sttp/model/sse/ServerSentEvent.scala at master · softwaremill/sttp-model · GitHub

Yes, I am using them. But they are only available on the server side, with no client or shared-code support, so I had to do that part myself.

Ah :) Well, if you think some functionality could be provided OOTB, please create a feature request. Or maybe write about your solution in a blog post?