WebSocket state monitoring

Hi,

I’m trying to adopt the sttp 4 client to talk to a server over WebSockets, but the connection seems unstable. For example, when the connection drops, sttp throws an exception:

Exception in thread "main" java.lang.ExceptionInInitializerError
	at WebSocketApp.main(WebSocket.scala)
Caused by: sttp.client4.SttpClientException$ReadException: Exception when sending request: GET wss://stream.server.com/public/
	at sttp.client4.SttpClientExceptionExtensions.defaultExceptionToSttpClientException(SttpClientExceptionExtensions.scala:24)
	at sttp.client4.SttpClientExceptionExtensions.defaultExceptionToSttpClientException$(SttpClientExceptionExtensions.scala:8)
	at sttp.client4.SttpClientException$.defaultExceptionToSttpClientException(SttpClientException.scala:24)
	at sttp.client4.httpclient.HttpClientBackend.adjustExceptions$$anonfun$1(HttpClientBackend.scala:55)
	at sttp.client4.SttpClientException$$anon$1.applyOrElse(SttpClientException.scala:35)
	at sttp.client4.SttpClientException$$anon$1.applyOrElse(SttpClientException.scala:34)
	at cats.ApplicativeError.recoverWith$$anonfun$1(ApplicativeError.scala:172)
	at modify @ fs2.internal.Scope.close(Scope.scala:262)
	at flatten @ sttp.client4.impl.cats.CatsMonadError.flatten(CatsMonadError.scala:21)
	at delay @ sttp.client4.impl.cats.CatsMonadError.eval(CatsMonadError.scala:19)
	at flatten @ sttp.client4.impl.cats.CatsMonadError.flatten(CatsMonadError.scala:21)
	at modify @ fs2.internal.Scope.close(Scope.scala:262)
	at flatten @ sttp.client4.impl.cats.CatsMonadError.flatten(CatsMonadError.scala:21)
Caused by: java.io.IOException: Software caused connection abort
	at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:330)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:284)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:259)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:417)
	at java.net.http/jdk.internal.net.http.SocketTube.readAvailable(SocketTube.java:1170)
	at java.net.http/jdk.internal.net.http.SocketTube$InternalReadPublisher$InternalReadSubscription.read(SocketTube.java:833)
	at java.net.http/jdk.internal.net.http.SocketTube$SocketFlowTask.run(SocketTube.java:181)
	at java.net.http/jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(SequentialScheduler.java:230)
	at java.net.http/jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(SequentialScheduler.java:303)
	at java.net.http/jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(SequentialScheduler.java:256)
	at java.net.http/jdk.internal.net.http.SocketTube$InternalReadPublisher$InternalReadSubscription.signalReadable(SocketTube.java:774)
	at java.net.http/jdk.internal.net.http.SocketTube$InternalReadPublisher$ReadEvent.signalEvent(SocketTube.java:957)
	at java.net.http/jdk.internal.net.http.SocketTube$SocketFlowEvent.handle(SocketTube.java:253)
	at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.handleEvent(HttpClientImpl.java:979)
	at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.lambda$run$3(HttpClientImpl.java:934)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.run(HttpClientImpl.java:934)
	at flatMap @ sttp.client4.impl.cats.CatsMonadError.flatMap(CatsMonadError.scala:12)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at handleErrorWith @ fs2.Compiler$Target.handleErrorWith(Compiler.scala:161)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)

Sometimes an "Operation timed out" error occurs instead:

Exception in thread "main" java.lang.ExceptionInInitializerError
	at WebSocketApp.main(WebSocket.scala)
Caused by: sttp.client4.SttpClientException$ReadException: Exception when sending request: GET wss://stream.bybit.com/v5/public/option
	at sttp.client4.SttpClientExceptionExtensions.defaultExceptionToSttpClientException(SttpClientExceptionExtensions.scala:24)
	at sttp.client4.SttpClientExceptionExtensions.defaultExceptionToSttpClientException$(SttpClientExceptionExtensions.scala:8)
	at sttp.client4.SttpClientException$.defaultExceptionToSttpClientException(SttpClientException.scala:24)
	at sttp.client4.httpclient.HttpClientBackend.adjustExceptions$$anonfun$1(HttpClientBackend.scala:55)
	at sttp.client4.SttpClientException$$anon$1.applyOrElse(SttpClientException.scala:35)
	at sttp.client4.SttpClientException$$anon$1.applyOrElse(SttpClientException.scala:34)
	at cats.ApplicativeError.recoverWith$$anonfun$1(ApplicativeError.scala:172)
	at modify @ fs2.internal.Scope.close(Scope.scala:262)
	at flatten @ sttp.client4.impl.cats.CatsMonadError.flatten(CatsMonadError.scala:21)
	at delay @ sttp.client4.impl.cats.CatsMonadError.eval(CatsMonadError.scala:19)
	at flatten @ sttp.client4.impl.cats.CatsMonadError.flatten(CatsMonadError.scala:21)
	at modify @ fs2.internal.Scope.close(Scope.scala:262)
	at flatten @ sttp.client4.impl.cats.CatsMonadError.flatten(CatsMonadError.scala:21)
Caused by: java.io.IOException: Operation timed out
	at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:330)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:284)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:259)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:417)
	at java.net.http/jdk.internal.net.http.SocketTube.readAvailable(SocketTube.java:1170)
	at java.net.http/jdk.internal.net.http.SocketTube$InternalReadPublisher$InternalReadSubscription.read(SocketTube.java:833)
	at java.net.http/jdk.internal.net.http.SocketTube$SocketFlowTask.run(SocketTube.java:181)
	at java.net.http/jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(SequentialScheduler.java:230)
	at java.net.http/jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(SequentialScheduler.java:303)
	at java.net.http/jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(SequentialScheduler.java:256)
	at java.net.http/jdk.internal.net.http.SocketTube$InternalReadPublisher$InternalReadSubscription.signalReadable(SocketTube.java:774)
	at java.net.http/jdk.internal.net.http.SocketTube$InternalReadPublisher$ReadEvent.signalEvent(SocketTube.java:957)
	at java.net.http/jdk.internal.net.http.SocketTube$SocketFlowEvent.handle(SocketTube.java:253)
	at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.handleEvent(HttpClientImpl.java:979)
	at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.lambda$run$3(HttpClientImpl.java:934)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.net.http/jdk.internal.net.http.HttpClientImpl$SelectorManager.run(HttpClientImpl.java:934)
	at flatMap @ sttp.client4.impl.cats.CatsMonadError.flatMap(CatsMonadError.scala:12)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)
	at handleErrorWith @ fs2.Compiler$Target.handleErrorWith(Compiler.scala:161)
	at flatMap @ fs2.Compiler$Target.flatMap(Compiler.scala:163)

I’m using the code from the examples:

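  import cats.effect.IO
  import cats.effect.unsafe.IORuntime
  import fs2.{Pipe, Stream}
  import sttp.capabilities.fs2.Fs2Streams
  import sttp.client4._
  import sttp.client4.httpclient.fs2.HttpClientFs2Backend
  import sttp.client4.ws.stream._
  import sttp.ws.WebSocketFrame

  implicit val runtime: IORuntime = cats.effect.unsafe.implicits.global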

  private def webSocketFramePipe: Pipe[IO, WebSocketFrame.Data[_], WebSocketFrame] = { input =>
    Stream.emit(WebSocketFrame.text("""
                                      |{
                                      |    "op": "subscribe",
                                      |    "args": [
                                      |        "topic-name"
                                      |    ]
                                      |}
                                      |""".stripMargin)) ++ input.flatMap {
      case WebSocketFrame.Text(n, _, _) =>
        println(s"Received: $n")
        Stream.emit(WebSocketFrame.Ping(Array(2)))

      case _ =>
        Stream.empty // ignore all other data frames (e.g. binary)
    }
  }

  HttpClientFs2Backend
    .resource[IO]()
    .use { backend =>
      basicRequest
        .get(uri"wss://stream.server.com/public")
        .response(asWebSocketStream(Fs2Streams[IO])(webSocketFramePipe))
        .send(backend)
        .void
    }
    .unsafeRunSync()

How can I check the state of the created WebSocket, and reconnect it if it fails?
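
To make the question concrete, this is roughly the kind of reconnect wrapper I have in mind. It is only a sketch: Reconnect, reconnecting, delay and maxDelay are names I made up (they are not sttp API), it uses nothing but cats-effect, and it assumes that a dropped connection surfaces as a failed IO from send, as in the stack traces above.

```scala
import cats.effect.IO
import scala.concurrent.duration._

object Reconnect {
  // Hypothetical helper, not part of sttp: re-runs `connect` whenever it fails,
  // doubling the pause between attempts until it reaches `maxDelay`.
  // Assumption: every failure is treated as transient and retried forever.
  def reconnecting[A](
      connect: IO[A],
      delay: FiniteDuration = 1.second,
      maxDelay: FiniteDuration = 30.seconds
  ): IO[A] =
    connect.handleErrorWith { error =>
      IO.println(s"WebSocket failed: ${error.getMessage}; reconnecting in $delay") >>
        IO.sleep(delay) >>
        reconnecting(connect, (delay * 2).min(maxDelay), maxDelay)
    }
}
```

I would then wrap the whole HttpClientFs2Backend.resource[IO]().use { ... } expression above in Reconnect.reconnecting(...) before calling unsafeRunSync(). Note that in this sketch the delay never resets after a long-lived successful connection, and a graceful close by the server (where send completes normally) is not retried at all. Is something like this the intended approach, or does sttp offer a better way?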

Also, I found that if I return Stream.empty in case WebSocketFrame.Text(n, _, _) =>, the data stream stops after about 30 frames, whereas if I send WebSocketFrame.Ping the server keeps sending frames indefinitely. What is the proper message to send in response to a WebSocketFrame.Text frame?