Hi there.
I’m introducing SSE in my own little framework.
I have this code.
Capability
package framework.tapir.capabilities
/** Capability trait for server-sent events. */
trait ServerSentEvents
Shared code
extension [SecurityInput, Input, Output, AuthError, Requirements](
e: Endpoint[SecurityInput, Input, AuthError, Output, Requirements]
) {
/** Marks the endpoint as server-sent events.
*
* Example:
* {{{
* val updates =
* endpoint
* .get
* .securityIn(auth.bearer[AppAuthData]()).errorOut(jsonBody[AppAuthError])
* // Note that we need to use cookie security for SSE as browsers do not support sending custom headers
* .securityViaCookie[CookieAppAuthData]
* .in(RootPath / "updates")
* .serverSentEvents[AppUpdate]
* }}}
*/
def serverSentEvents[Output2](using
codec: Codec[String, Output2, ?]
): Endpoint[SecurityInput, Input, AuthError, Output2, Requirements & ServerSentEvents] = {
e.withOutputPublic(EndpointIO.Body(RawBodyType.StringBody(StandardCharsets.UTF_8), codec, EndpointIO.Info.empty))
}
}
Which requires this nifty hack:
package sttp.tapir
// A hack to make package private methods visible
extension [SECURITY_INPUT, INPUT, ERROR_OUTPUT, OUTPUT, R](
e: Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, OUTPUT, R]
) {
/** Allows you to completely replace the security input of the endpoint. */
def withSecurityInputPublic[SI2, R2](
input: EndpointInput[SI2]
): Endpoint[SI2, INPUT, ERROR_OUTPUT, OUTPUT, R & R2] =
e.withSecurityInput(input)
/** Allows you to completely replace the input of the endpoint. */
def withInputPublic[I2, R2](
input: EndpointInput[I2]
): Endpoint[SECURITY_INPUT, I2, ERROR_OUTPUT, OUTPUT, R & R2] =
e.withInput(input)
/** Allows you to completely replace the output of the endpoint. */
def withOutputPublic[O2, R2](
output: EndpointOutput[O2]
): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, O2, R & R2] =
e.withOutput(output)
}
Side-note: Why aren’t with*
methods public anyway?
Server side
package framework.exts
import sttp.tapir.*
import sttp.tapir.server.*
import sttp.tapir.server.http4s.*
import sttp.model.sse.ServerSentEvent
import sttp.capabilities.fs2.Fs2Streams
import scala.annotation.targetName
import sttp.tapir.given
import framework.tapir.capabilities.ServerSentEvents
// TODO: make this actually fail to compile if the endpoint doesn't have ServerSentEvents capability
extension [SECURITY_INPUT, INPUT, ERROR_OUTPUT, OUTPUT, R <: ServerSentEvents](
e: Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, OUTPUT, R]
) {
/** Turns the endpoint into a simple server-sent events endpoint. */
def toSSE[F[_]](using
codec: TapirCodec[String, OUTPUT, ?]
): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, Stream[F, OUTPUT], Fs2Streams[F]] = {
e.toSSE { output =>
val encoded = codec.encode(output)
ServerSentEvent(data = Some(encoded))
}
}
/** Turns the endpoint into a simple server-sent events endpoint that serves JSON. */
def toSSEJson[F[_]](using
encoder: CirceEncoder[OUTPUT]
): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, Stream[F, OUTPUT], Fs2Streams[F]] = {
e.toSSE { output =>
val encoded = encoder(output).noSpaces
ServerSentEvent(data = Some(encoded))
}
}
/** Turns the endpoint into a simple server-sent events endpoint. */
def toSSE[F[_]](
mapper: OUTPUT => ServerSentEvent
): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, Stream[F, OUTPUT], Fs2Streams[F]] = {
e.toSSEStream(_.map(mapper))
}
/** Turns the endpoint into a simple server-sent events endpoint. */
def toSSEStream[F[_]](
pipe: fs2.Pipe[F, OUTPUT, ServerSentEvent]
): Endpoint[SECURITY_INPUT, INPUT, ERROR_OUTPUT, Stream[F, OUTPUT], Fs2Streams[F]] = {
val sseBody = serverSentEventsBody[F]
val sseEndpoint =
e.withOutputPublic(sseBody.toEndpointIO)
.asInstanceOf[Endpoint[
SECURITY_INPUT,
INPUT,
ERROR_OUTPUT,
Stream[F, ServerSentEvent],
Fs2Streams[F],
]]
val endpoint =
sseEndpoint
.mapOut[fs2.Stream[F, OUTPUT]](_ => throw new NotImplementedError("this should never be invoked"))(pipe)
endpoint
}
}
Client-side
package framework.exts
import com.raquo.airstream.custom.CustomSource
import framework.data.AppBaseUri
import framework.prelude.sttpClientInterpreter
import framework.utils.{NetworkError, NetworkOrAuthError}
import org.scalajs.dom.{EventSource, EventSourceInit, MessageEvent}
import sttp.client3.Request
import sttp.tapir.{DecodeResult, Endpoint, PublicEndpoint}
import scala.annotation.targetName
import scala.scalajs.js.JSON
import framework.tapir.capabilities.ServerSentEvents
// TODO: make this actually fail to compile if the endpoint doesn't have ServerSentEvents capability
extension [SecurityInput, Input, Output, AuthError, Requirements <: ServerSentEvents](
e: Endpoint[SecurityInput, Input, AuthError, Output, Requirements]
) {
/** Turns the endpoint into a server-sent event stream without decoding.
*
* @param withCredentials
* A boolean value, defaulting to false, indicating if CORS should be set to include credentials.
*/
def toSSEStreamRaw(
securityParams: SecurityInput,
params: Input,
withCredentials: js.UndefOr[Boolean] = js.undefined,
)(using baseUri: AppBaseUri): EventStream[MessageEvent] = {
val uri = e.toReq(securityParams, params).uri
val options = js.Dynamic.literal(withCredentials = withCredentials).asInstanceOf[EventSourceInit]
EventStream.fromDomEventSource(new EventSource(uri.toString, options))
}
/** Turns the endpoint into a server-sent event stream.
*
* Decoding failures will throw an exception.
*
* @param withCredentials
* A boolean value, defaulting to false, indicating if CORS should be set to include credentials.
*/
def toSSEStream(
securityParams: SecurityInput,
params: Input,
withCredentials: js.UndefOr[Boolean] = js.undefined,
)(using
baseUri: AppBaseUri,
codec: TapirCodec[String, Output, ?],
): EventStream[(MessageEvent, Output)] = {
val evtStream = toSSEStreamRaw(securityParams, params, withCredentials)
evtStream.map { evt =>
val either = for {
dataStr <- evt.data match {
case str: String => Right(str)
case other => Left(s"Event data is not a string: $other")
}
output <- codec.decode(dataStr) match {
case DecodeResult.Value(output) => Right(output)
case failure: DecodeResult.Failure => Left(s"Failed to decode event data '$dataStr': $failure")
}
} yield (evt, output)
either match {
case Left(err) => throw new Exception(err)
case Right(value) => value
}
}
}
}
And while this works, it does not have type safety. Even if the endpoint has the Any
requirement, I can still invoke all of the extension methods in server and client side code.
Here’s a little example in scastie: Scastie - An interactive playground for Scala.
Why are Requirements
in Endpoint
contravariant and is there a way to achieve my goal?