Output based on input?

Hi!
So, depending on a field of my input object (JSON), I want to respond with either a method that returns a ZIO[R, E, Something] or one that returns a ZStream[R, E, Something]. I’m not sure how to code that.
It seems like what I want is somewhat like oneOf, but I’m not sure how to do this properly:

val myEndpoint = endpoint.post
    .name("report")
    .description("Runs a report, returning a json stream if requested")
    .in("a" / "report")
    .in(jsonBody[MyRequest])
    .out(
      oneOf(
        oneOfVariant(jsonBody[MyResponse]), // do this if request.doStream = false
        oneOfVariant(streamBody(ZioStreams)(Schema.schemaForString, CodecFormat.TextPlain()).toEndpointIO) // do this if request.doStream = true
      )
    )

and obviously my impl would be something like:

    val myEndpointImpl = myEndpoint.zServerLogic[Any] { (req: MyRequest) =>
      if(req.doStream)
        reports.streamIt() //returns ZStream[A, Nothing, String]
      else
        reports.doIt() // which returns ZIO[A, B, MyResponse]
    }

You can try oneOf bodies, see here. That’s not an entirely dynamic solution, as you’ll need different data types for the two branches, but I think it might do the trick.

I tried oneOfBody, but I got an error about erasure and I don’t know how to get past it. The examples are all about status codes for errors, which is different from my case.
Thanks for the help.

Hm, maybe you could share some code or a simplified example that shows the problem? Then we might be able to help 🙂

I did 🙂
So… given the following implementation:

val myEndpointImpl = myEndpoint.zServerLogic[Any] { (req: MyRequest) =>
  if (req.doStream)
    reports.streamIt() // returns ZStream[A, Nothing, String]
  else
    reports.doIt() // returns ZIO[A, B, MyResponse]
}

What would be the declaration of myEndpoint?

So the error message suggests what the fix might be: you can’t use generic types in oneOfVariant, because type arguments are erased at run time, so the run-time check that decides which variant to use is unreliable for them.
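
To make the erasure point concrete, here is a small sketch in plain Scala with no tapir involved. The `matchesRuntimeClass` helper is a hypothetical name of mine, not a tapir function; it only mimics the kind of class-based check that a variant matcher can do at run time.

```scala
import scala.reflect.ClassTag

object ErasureDemo {
  // Hypothetical helper: checks a value against the runtime class carried by
  // a ClassTag. Type arguments are not part of that runtime class.
  def matchesRuntimeClass[T](value: Any)(implicit ct: ClassTag[T]): Boolean =
    ct.runtimeClass.isInstance(value)

  val strings: Any = List("a", "b")

  // Both checks succeed, because only the erased class `List` is compared.
  val matchesListOfString: Boolean = matchesRuntimeClass[List[String]](strings)
  val matchesListOfInt: Boolean = matchesRuntimeClass[List[Int]](strings)
}
```

Since a `List[String]` value also passes the `List[Int]` check, a matcher built this way cannot tell two instantiations of the same generic type apart, which is presumably why the library rejects generic types there.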

One option is to use oneOfDefaultVariant:

val myEndpoint = endpoint.get
    .in(query[Boolean]("stream"))
    .out(
      oneOf(
        oneOfVariant(jsonBody[MyResponse]),
        oneOfDefaultVariant(streamBody(ZioStreams)(Schema.schemaForString, CodecFormat.TextPlain()).toEndpointIO)
      )
    )

Or a class matcher:

val myEndpoint = endpoint.get
    .in(query[Boolean]("stream"))
    .out(
      oneOf(
        oneOfVariant(jsonBody[MyResponse]),
        oneOfVariantClassMatcher(
          streamBody(ZioStreams)(Schema.schemaForString, CodecFormat.TextPlain()).toEndpointIO,
          classOf[ZStream[_, _, _]]
        )
      )
    )
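
As a rough mental model of the two options above (not tapir’s actual implementation), variant selection can be pictured as "the first variant whose run-time check accepts the value wins". Everything in this sketch (`Variant`, `classMatcher`, `defaultVariant`, `select`, `FakeStream`) is a hypothetical stand-in written in plain Scala, and I’m assuming the first-match ordering from my understanding of how oneOf behaves.

```scala
object VariantSelectionDemo {
  // Hypothetical stand-in for a oneOf variant: a label plus a runtime predicate.
  final case class Variant(label: String, appliesTo: Any => Boolean)

  // Models a class matcher: test the value against an explicit runtime class.
  def classMatcher(label: String, cls: Class[_]): Variant =
    Variant(label, value => cls.isInstance(value))

  // Models a default variant: accepts any value, so it belongs last.
  def defaultVariant(label: String): Variant =
    Variant(label, _ => true)

  // The first variant whose predicate accepts the value is selected.
  def select(variants: List[Variant], value: Any): Option[String] =
    variants.find(_.appliesTo(value)).map(_.label)

  final case class MyResponse(rows: Int) // hypothetical response type
  final class FakeStream[A](val items: List[A]) // stand-in for a ZStream

  val variants: List[Variant] = List(
    classMatcher("json", classOf[MyResponse]),
    defaultVariant("stream")
  )

  val forResponse: Option[String] = select(variants, MyResponse(3))
  val forStream: Option[String] = select(variants, new FakeStream(List("a", "b")))
}
```

In this model the order matters: if the default variant came first, it would also capture `MyResponse` values and the JSON branch would never be chosen.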

However, I can’t test this using zio-http, as I’m getting an exception when trying the streaming branch:

timestamp=2024-01-10T10:17:25.469592Z level=WARN thread=#zio-fiber-39 message="Fatal exception in Netty" cause="Exception in thread "zio-fiber-" io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
	at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:108)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
	at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
	at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
	at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
	Suppressed: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
		at io.netty.handler.codec.http.HttpObjectEncoder.throwUnexpectedMessageTypeEx(HttpObjectEncoder.java:348)
		at io.netty.handler.codec.http.HttpObjectEncoder.encodeFullHttpMessage(HttpObjectEncoder.java:305)
		at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:162)
		at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:97)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
		at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
		at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
		at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
		at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
		at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
		at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
		at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
		at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Thread.java:833)"

Does it work for you?

It seems this occurs when the response bodies are too short; see zio/zio-http issue #2608, "unexpected message type: DefaultFullHttpResponse" with certain response streams.

With longer/async bodies things work fine.

Awesome, thanks, I’ll give that a try. It’ll be a while before I can run it, though, as I first need to finish refactoring everything to use tapir.