I'm seeing an intermittent problem where some messages are corrupted. We're sending serialized objects over websockets and see corrupted messages with larger payloads, and sometimes when sending more quickly. We've tested with a number of versions, including Undertow 1.1.8.Final, and saw the same issue with newer versions as well. Wireshark appears to show the corrupted data on the client side.


I've included sample code. The server creates a handler at localhost:6025 and then listens for messages.  When a message is received it checks the message against an expected pattern and prints out any bytes that don’t match.

 

The client connects to the server and sends five 5000-byte messages one second apart. On my machine at least one of the 5 messages would come across with some corrupted data. I narrowed this down some: if the message is 2930 bytes, it appears to get through; at 2931 bytes, I start seeing some corruption. Using the same test code with 1000-byte messages, I was able to send 10000 messages with no sleep between sends, and all messages went through fine.


Hopefully I'm missing something obvious. Thanks.


-----


import io.undertow.Handlers;

import io.undertow.Undertow;

import io.undertow.websockets.WebSocketConnectionCallback;

import io.undertow.websockets.core.AbstractReceiveListener;

import io.undertow.websockets.core.BufferedBinaryMessage;

import io.undertow.websockets.core.WebSocketChannel;

import io.undertow.websockets.core.WebSockets;

import io.undertow.websockets.spi.WebSocketHttpExchange;

import org.xnio.Pooled;


import java.io.IOException;

import java.nio.ByteBuffer;

import java.util.Arrays;


public class WebsocketTestServer

{

  public WebsocketTestServer()

  {

    Undertow server = Undertow.builder()

        .addHttpListener(6025, "localhost")

        .setHandler(Handlers.pathTemplate().add("test", Handlers.websocket(new TestHandler())))

        .build();


    server.start();

    System.out.println("Server Started");

  }


  private class TestHandler extends AbstractReceiveListener implements WebSocketConnectionCallback

  {

    @Override

    public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel)

    {

      System.out.println("Client connected");

      webSocketChannel.getReceiveSetter().set(this);

      webSocketChannel.resumeReceives();

    }


    @Override

    protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) throws IOException

    {

      try (Pooled<ByteBuffer[]> pooled = message.getData())

      {

        ByteBuffer[] buffers = pooled.getResource();

        ByteBuffer bb = (buffers.length == 1) ? buffers[0] : WebSockets.mergeBuffers(buffers);

        if (bb.limit() > 0)

        {

          checkData(bb);

        }

      }

    }

  }


  // Checks data against expected pattern

  public static void checkData(ByteBuffer data)

  {

    long messageId = data.getLong(0);

    for (int i = Long.BYTES; i < data.limit(); i++)

    {

      byte expected = (byte) (i%256);

      byte actual = data.get(i);


      if (actual != expected)

      {

        System.err.println("**** Corrupted data for message " + messageId + " byte " + i + " expected " + expected + " but was " + actual);

      }

    }

  }



  public static void main(String[] args) throws InterruptedException

  {

    WebsocketTestServer server = new WebsocketTestServer();


    while (true)

    {

      Thread.sleep(5000);

    }

  }

}


----------

import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.net.URI;
import java.nio.ByteBuffer;

@ClientEndpoint
public class WebsocketTestClient
{
  private Session session;

  public WebsocketTestClient()
  {
    try
    {
      WebSocketContainer container = ContainerProvider.getWebSocketContainer();
      container.connectToServer(this, URI.create("ws://localhost:6025/test"));
    }
    catch (Exception e)
    {
      e.printStackTrace();
    }
  }

  @OnOpen
  public void onOpen(Session session)
  {
    System.out.println("Session opened");
    this.session = session;
  }

  public void sendData(long index)
  {
    try
    {
      session.getBasicRemote().sendBinary(createTestData(index));
    }
    catch (Exception e)
    {
      e.printStackTrace();
    }
  }

  public static final ByteBuffer createTestData(long messageId)
  {
    ByteBuffer buffer = ByteBuffer.allocate(5000);
    buffer.putLong(messageId);
    for(int i=Long.BYTES; i< buffer.limit(); i++)
    {
      buffer.put((byte) (i%256));
    }
    buffer.flip();

    return buffer;
  }

  public static void main(String[] args) throws InterruptedException
  {
    WebsocketTestClient client = new WebsocketTestClient();
    Thread.sleep(1000);

    for(int i=0; i<5; i++)
    {
      System.out.println("Sending " + i);
      client.sendData(i);
      Thread.sleep(1000);
    }
    System.out.println("Done");
  }
}