WebSocket state monitoring

Hi,

I’m trying to adopt sttp4 client to interact with a server via WebSockets. WebSockets seems unstable. For example, if the connection drops sttp4 throws exception Caused by: java.io.IOException: Software caused connection abort. Sometimes exception Caused by: java.io.IOException: Operation timed out occurs.

I’m using code from an example:

  implicit val runtime: IORuntime = cats.effect.unsafe.implicits.global

  private def webSocketFramePipe: Pipe[IO, WebSocketFrame.Data[_], WebSocketFrame] = { input =>
    Stream.emit(WebSocketFrame.text("""
                                      |{
                                      |    "op": "subscribe",
                                      |    "args": [
                                      |        "topic-name"
                                      |    ]
                                      |}
                                      |""".stripMargin)) ++ input.flatMap {
      case WebSocketFrame.Text(n, _, _) =>
        println(s"received: $n")
        Stream.emit(WebSocketFrame.Ping(Array(2)))

      case x =>
        println("...other...")
        Stream.empty // ignoring
    }
  }

  HttpClientFs2Backend
    .resource[IO]()
    .use { backend =>
      basicRequest
        .get(uri"wss://stream.server.com/public")
        .response(asWebSocketStream(Fs2Streams[IO])(webSocketFramePipe))
        .send(backend)
        .void
    }
    .unsafeRunSync()

How can I monitor the state of a websocket? And reconnect it if it fails?

I also tried and found out that sending Stream.empty in case WebSocketFrame.Text(n, _, _) => stops streaming after ~30 frames. But if in send WebSocketFrame.Ping in response, then the server sends frames infinitely. What proper message should I send in response to case WebSocketFrame.Text(...)=>?

Thanks for the report and sorry for the late response :slight_smile:

I think you might be hitting several different problems, on various levels.

First, if the connection drops, I think an IOException is the correct behaviour, that is it’s something you would expect. You might get different results depending on the server - if it closed the connection properly (with a Close frame), or just silently stopped responding to packets.

Secondly, reconnecting a websocket I think requires performing the .send again, if there’s an error. You can do an infitie-loop style def doSend = basicRequest.(...).send(backend).recover(doSend), but you’ll probably will want to add some limits to the number of iterations.

Thirdly, there’s the ping-pong behavior. The backend you are using doesn’t perform automatic pings, and it seems that is what your server requires (so that it knows that the client is alive). We should probably add it, along with fixing the example (so that it at least returns Pongs on Pings). I’ve created an issue to cover this.

As for monitoring, what kind of monitoring would you need? There’s the loggin backend, plus the metrics backends. Or did you have something else in mind?

1 Like