UDP Help

Marc-André Laverdière marcandre.laverdiere at gmail.com
Thu Nov 4 13:39:46 EDT 2010


An execution handler (Netty's ExecutionHandler) will let your I/O threads get
back to picking up messages sooner. That would make a huge difference.

On 3 Nov 2010 05:11, "cporter71" <cporter71 at gmail.com> wrote:


Hello.  I am a netty newbie coming from a current mina project.  Mina seems
to solve a lot of our needs with the exception of some performance issues.
Currently I am working out a simple test client/server to see how netty
performs - relative to our context of course!

Our use case is generally straightforward -- read UDP packets as fast as
possible and internally queue them into the application.  The packets are
fixed length (~350 bytes).  No writes from the server to the client are
required -- the server is strictly a consumer.

I have been following the examples and reading through the user group but
have not found a good example of a client/server where the client is sending
larger packets as fast as it can.  I created a sample/example demonstrating
such a use case however I clearly have something wrong because I cannot
send/receive 1000 packets on the loopback!  I will continue to dig to see if
I can:
> Find what I am doing wrong!  In the lengthy code attached I am receiving a
> fraction of the messages sent.  There seems to be something magic
> happening around message #110 where the sequence numbers start the
> separation/delta.
> Find information on how to "tune" netty for UDP to handle sustained rates
> of 5+ Mbs from 10-20 clients (assuming appropriate hardware specs).

It is understood that UDP makes no guarantee on delivery however on a
closed/wireless network we would like to see 95%+.  We have built-in retry
logic to fall back on however we would like to make sure the server is not
losing/dropping the packets.

Any help would be appreciated.

cporter

>> This is my first post here - if there is a better way to post/share code
>> on this forum then please let me know.

///////////////////////
// Client
///////////////////////

package com.example.netty.udp.client;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test client that opens a connectionless (UDP) Netty channel towards a
 * target address. The pipeline installed on that channel is built by
 * {@link UdpClientPipelineFactory}, which receives the client identifier and
 * the requested record count.
 */
public class UdpClient
{
  private static final Logger log = LoggerFactory.getLogger( UdpClient.class );

  /** Target used when the caller does not supply one. */
  private static final String DEFAULT_IP   = "127.0.0.1";
  private static final int    DEFAULT_PORT = 50001;

  private final String        ip;
  private final int           port;

  private final int           identifier;
  private final int           count;

  private final SocketAddress address;

  /**
   * Creates a client aimed at the default loopback address and port.
   *
   * @param identifier value handed to the pipeline factory to tag records
   * @param count record count handed to the pipeline factory
   */
  public UdpClient( int identifier, int count )
  {
     this( DEFAULT_IP, DEFAULT_PORT, identifier, count );
  }

  /**
   * Creates a client aimed at an explicit address.
   *
   * @param ip host name or IP literal of the server
   * @param port UDP port of the server
   * @param identifier value handed to the pipeline factory to tag records
   * @param count record count handed to the pipeline factory
   */
  public UdpClient( String ip, int port, int identifier, int count )
  {
     this.ip = ip;
     this.port = port;
     this.identifier = identifier;
     this.count = count;
     this.address = new InetSocketAddress( ip, port );
  }

  /**
   * Builds the bootstrap, installs the client pipeline factory and issues
   * the connect request. The future returned by {@code connect} is not
   * awaited here, so this method does not block on the outcome.
   */
  public void send()
  {
     NioDatagramChannelFactory channelFactory =
           new NioDatagramChannelFactory( Executors.newCachedThreadPool() );
     ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap( channelFactory );

     UdpClientPipelineFactory pipelineFactory = new UdpClientPipelineFactory( identifier, count );
     bootstrap.setPipelineFactory( pipelineFactory );

     log.info( "Attempting to connect to address:{}", address );
     bootstrap.connect( address );
  }
}
package com.example.netty.udp.client;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line entry point for the UDP test client. Runs a single
 * {@link UdpClient} with a fixed identifier and record count.
 */
public class UdpClientBootstrap
{
  private static final Logger log = LoggerFactory.getLogger( UdpClientBootstrap.class );

  private static final int    DEFAULT_IDENTIFIER = 42;
  private static final int    DEFAULT_COUNT      = 1000;

  /**
   * Starts the client. Any exception is logged rather than propagated.
   *
   * @param args ignored
   * @throws IOException declared for callers; not thrown by the visible body
   */
  public static void main( String[] args ) throws IOException
  {
     try
     {
        UdpClient client = new UdpClient( DEFAULT_IDENTIFIER, DEFAULT_COUNT );
        client.send();

        // Logged as soon as send() returns; send() itself does not wait
        // on the connect future.
        log.info( "Send complete -- exiting" );
     }
     catch ( Exception failure )
     {
        log.error( "Unexpected error", failure );
     }
  }
}
package com.example.netty.udp.client;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.netty.udp.model.DataRecord;

/**
 * Client-side handler that pushes {@link DataRecord}s onto the channel as
 * fast as the channel reports itself writable. Sending starts when the
 * channel connects and resumes on every interest-ops change until
 * {@code count} records have been written.
 */
public class UdpClientHandler extends SimpleChannelUpstreamHandler
{
  private static final Logger log = LoggerFactory.getLogger( UdpClientHandler.class );

  private final int           identifier;
  private final int           count;

  /** Sequence number of the next record to write; also the number written so far. */
  private int                 index = 0;

  /**
   * @param identifier value stamped into every record sent by this handler
   * @param count total number of records to send
   */
  public UdpClientHandler( int identifier, int count )
  {
     this.identifier = identifier;
     this.count = count;
  }

  /** Starts sending as soon as the channel is connected. */
  @Override
  public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e )
  {
     log.info( "channelConnected" );
     sendRecords( e );
  }

  /** Logs every channel state event, then delegates to the default dispatch. */
  @Override
  public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e ) throws Exception
  {
     if ( e instanceof ChannelStateEvent )
     {
        log.info( e.toString() );
     }
     super.handleUpstream( ctx, e );
  }

  /** Resumes sending; the loop in {@link #sendRecords} re-checks writability. */
  @Override
  public void channelInterestChanged( ChannelHandlerContext ctx, ChannelStateEvent e )
  {
     log.info( "channelInterestChanged" );
     sendRecords( e );
  }

  /** Logs the failure; the channel is left as-is. */
  @Override
  public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e )
  {
     log.error( "Unexpected exception from downstream.", e.getCause() );
  }

  /**
   * Writes records while the channel is writable and fewer than
   * {@code count} have been sent.
   *
   * The bound is exclusive: the previous {@code index <= count} check sent
   * {@code count + 1} records (sequences 0..count).
   */
  private void sendRecords( ChannelStateEvent e )
  {
     Channel channel = e.getChannel();

     while ( index < count && channel.isWritable() )
     {
        channel.write( new DataRecord( identifier, index ) );
        index++;
     }
  }
}
package com.example.netty.udp.client;

import static org.jboss.netty.channel.Channels.pipeline;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;

import com.example.netty.udp.model.DataRecordEncoder;

/**
 * Builds the client pipeline: a {@link DataRecordEncoder} named "encoder"
 * followed by a {@link UdpClientHandler} named "handler".
 */
public class UdpClientPipelineFactory implements ChannelPipelineFactory
{
  private final int identifier;
  private final int count;

  /**
   * @param identifier passed through to each {@link UdpClientHandler}
   * @param count passed through to each {@link UdpClientHandler}
   */
  public UdpClientPipelineFactory( int identifier, int count )
  {
     this.identifier = identifier;
     this.count = count;
  }

  /**
   * Creates a fresh pipeline with its own encoder and handler instances.
   *
   * @return the assembled pipeline
   */
  public ChannelPipeline getPipeline() throws Exception
  {
     DataRecordEncoder encoder = new DataRecordEncoder();
     UdpClientHandler handler = new UdpClientHandler( identifier, count );

     ChannelPipeline channelPipeline = pipeline();
     // Codec goes in ahead of the business-logic handler.
     channelPipeline.addLast( "encoder", encoder );
     channelPipeline.addLast( "handler", handler );
     return channelPipeline;
  }
}

///////////////////////
// Server
///////////////////////

package com.example.netty.udp.server;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test server that binds a connectionless (UDP) Netty channel and hands
 * incoming datagrams to the pipeline built by {@link UdpServerPipelineFactory}.
 */
public class UdpServer
{
  private static final Logger log = LoggerFactory.getLogger( UdpServer.class );

  /** Number of bytes in one mebibyte. */
  public static final int     OneMg                   = 1024 * 1024;

  /** Requested socket receive buffer size in bytes (64 MiB). */
  public static final int     SocketReceiveBufferSize = OneMg * 64;

  private String              ip                      = "127.0.0.1";
  private int                 port                    = 50001;

  private SocketAddress       address;

  /** Creates a server bound to the default loopback address and port. */
  public UdpServer()
  {
  }

  /**
   * @param ip host name or IP literal to bind to
   * @param port UDP port to bind to
   */
  public UdpServer( String ip, int port )
  {
     this.ip = ip;
     this.port = port;
  }

  /**
   * Configures the bootstrap and binds the channel. Failures are logged and
   * not rethrown.
   */
  public void serve()
  {
     try
     {
        this.address = new InetSocketAddress( ip, port );

        ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(
              new NioDatagramChannelFactory( Executors.newCachedThreadPool() ) );

        // Set up the event pipeline factory.
        bootstrap.setPipelineFactory( new UdpServerPipelineFactory() );

        // Fixed 1024-byte read buffer per datagram.
        bootstrap.setOption( "receiveBufferSizePredictorFactory",
              new FixedReceiveBufferSizePredictorFactory( 1024 ) );

        // Actually request the large socket receive buffer. The constant was
        // declared but never applied, leaving the socket at the OS default.
        // NOTE(review): the OS may cap the granted size -- confirm against
        // the platform's limits.
        bootstrap.setOption( "receiveBufferSize", SocketReceiveBufferSize );

        // Bind and start to accept incoming datagrams.
        log.info( "Attempting to bind to address:{}", address );
        bootstrap.bind( address );
     }
     catch ( Exception e )
     {
        log.error( "Unable to start UDP server on address:" + address, e );
     }
  }
}
package com.example.netty.udp.server;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line entry point for the UDP test server. Starts a
 * {@link UdpServer} with its defaults and then keeps the main thread parked.
 */
public class UdpServerBootstrap
{
  private static final Logger log = LoggerFactory.getLogger( UdpServerBootstrap.class );

  /**
   * Starts the server and sleeps in five-second intervals indefinitely. Any
   * exception, including an interrupt during sleep, is logged and ends the
   * method.
   *
   * @param args ignored
   * @throws IOException declared for callers; not thrown by the visible body
   */
  public static void main( String[] args ) throws IOException
  {
     try
     {
        UdpServer server = new UdpServer();
        server.serve();

        // Keep the main thread alive; only an exception breaks the loop.
        for ( ;; )
        {
           Thread.sleep( 5000 );
        }
     }
     catch ( Exception failure )
     {
        log.error( "Unexpected error", failure );
     }
  }
}
package com.example.netty.udp.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.netty.udp.model.DataRecord;

/**
 * Server-side handler that counts and logs every decoded {@link DataRecord}.
 */
public class UdpServerHandler extends SimpleChannelHandler
{
  // Bound to this class; it previously used UdpServerBootstrap.class, which
  // attributed every log line to the wrong logger.
  private static final Logger log = LoggerFactory.getLogger( UdpServerHandler.class );

  /** Running total of records seen, shared by all handler instances. */
  private static int          received = 0;

  /**
   * Logs the running count and the sequence number of each
   * {@link DataRecord}; any other message type is reported as an error.
   */
  @Override
  public void messageReceived( ChannelHandlerContext ctx, MessageEvent e )
  {
     if ( e.getMessage() instanceof DataRecord )
     {
        DataRecord dataRecord = (DataRecord) e.getMessage();
        log.info( "received:{} sequence:{}", received++, dataRecord.getSequence() );
     }
     else
     {
        log.error( "Received an unknown message type:{}", e.getMessage().getClass().getName() );
     }
  }

  /** Logs that the bind completed. */
  @Override
  public void channelBound( ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception
  {
     log.info( "Successfully bound" );
  }
}
package com.example.netty.udp.server;

import static org.jboss.netty.channel.Channels.pipeline;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;

import com.example.netty.udp.model.DataRecordDecoder;

/**
 * Builds the server pipeline: a {@link DataRecordDecoder} named "decoder"
 * followed by a {@link UdpServerHandler} named "handler".
 */
public class UdpServerPipelineFactory implements ChannelPipelineFactory
{
  /**
   * Creates a fresh pipeline with its own decoder and handler instances.
   *
   * @return the assembled pipeline
   */
  public ChannelPipeline getPipeline() throws Exception
  {
     DataRecordDecoder decoder = new DataRecordDecoder();
     UdpServerHandler handler = new UdpServerHandler();

     ChannelPipeline channelPipeline = pipeline();
     // Codec goes in ahead of the business-logic handler.
     channelPipeline.addLast( "decoder", decoder );
     channelPipeline.addLast( "handler", handler );
     return channelPipeline;
  }
}

///////////////////////
// Model/Codecs
///////////////////////


package com.example.netty.udp.model;

/**
 * Fixed-layout test record carrying a client identifier and a sequence
 * number, plus placeholder header and payload bytes.
 *
 * NOTE(review): MessageSize presumably equals header (20) + count (2) +
 * payload (336) + identifier (4) + sequence (4) -- confirm against the wire
 * format.
 */
public class DataRecord
{
  public static final int MessageHeaderSize = 20;
  public static final int DataPayloadSize   = 336;
  public static final int MessageSize       = 366;

  // Primitive arrays: the previous Byte[] boxed every element on each
  // construction, which is wasteful when records are created per packet.
  private byte[]          header            = new byte[MessageHeaderSize];

  @SuppressWarnings( "unused" )
  private short           count             = 112;
  private byte[]          data              = new byte[DataPayloadSize];

  private int             identifier        = 0;
  private int             sequence          = 0;

  /**
   * Creates a record and fills the header and payload with a repeating
   * byte pattern (each element holds its own index truncated to a byte).
   *
   * @param identifier identifier of the sending client
   * @param sequence sequence number of this record
   */
  public DataRecord( int identifier, int sequence )
  {
     for ( int i = 0; i < MessageHeaderSize; i++ )
     {
        header[i] = (byte) i;
     }

     for ( int i = 0; i < DataPayloadSize; i++ )
     {
        data[i] = (byte) i;
     }

     this.identifier = identifier;
     this.sequence = sequence;
  }

  /** @return identifier of the sending client */
  public int getIdentifier()
  {
     return identifier;
  }

  /** @param identifier identifier of the sending client */
  public void setIdentifier( int identifier )
  {
     this.identifier = identifier;
  }

  /** @return sequence number of this record */
  public int getSequence()
  {
     return sequence;
  }

  /** @param sequence sequence number of this record */
  public void setSequence( int sequence )
  {
     this.sequence = sequence;
  }

  /** @return {@code "DataRecord identifier:<id> sequence:<seq>"} */
  @Override
  public String toString()
  {
     StringBuilder builder = new StringBuilder();
     builder.append( DataRecord.class.getSimpleName() );
     builder.append( " identifier:" ).append( identifier );
     builder.append( " sequence:" ).append( sequence );

     return builder.toString();
  }
}
package com.example.netty.udp.model;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;

public class DataRecordDecoder extends OneToOneDecoder
{
  private static final int MessageHeaderSize = 20;

  // private static final Logger log = LoggerFactory.getLogger(
DataRecordDecoder.class );

  @Override
  protected Object decode( ChannelHandlerContext arg0, Channel arg1, Object
msg ) throws Exception
  {
     if ( !(msg instanceof ChannelBuffer) )
        return msg;

     ChannelBuffer buffer = (ChannelBuffer) msg;

     int identifier = buffer.getInt( MessageHeaderSize );
     int sequence = buffer.getInt( MessageHeaderSize + 4 );
     DataRecord record = new DataRecord( identifier, sequence );
     return record;
  }
}
package com.example.netty.udp.model;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

/**
 * Encodes a {@link DataRecord} into a {@link ChannelBuffer} of
 * {@code DataRecord.MessageSize} bytes. The buffer is filled with a
 * repeating byte pattern and then the identifier and sequence are written
 * directly after the header. Other message types pass through unchanged.
 */
public class DataRecordEncoder extends OneToOneEncoder
{

  /**
   * @param ctx unused
   * @param channel unused
   * @param msg the downstream message
   * @return the encoded buffer, or {@code msg} itself when it is not a
   *         {@link DataRecord}
   */
  @Override
  protected Object encode( ChannelHandlerContext ctx, Channel channel, Object msg ) throws Exception
  {
     if ( msg instanceof DataRecord )
     {
        DataRecord source = (DataRecord) msg;

        ChannelBuffer out = ChannelBuffers.dynamicBuffer();
        out.writeBytes( fillerBytes() );

        // Overwrite the two int fields in place, right after the header.
        int identifierOffset = DataRecord.MessageHeaderSize;
        int sequenceOffset = identifierOffset + 4;
        out.setInt( identifierOffset, source.getIdentifier() );
        out.setInt( sequenceOffset, source.getSequence() );

        return out;
     }

     return msg;
  }

  /** Builds the filler bytes: each position holds its index truncated to a byte. */
  private static byte[] fillerBytes()
  {
     byte[] filler = new byte[DataRecord.MessageSize];
     int position = 0;
     while ( position < filler.length )
     {
        filler[position] = (byte) position;
        position++;
     }
     return filler;
  }
}

--
View this message in context:
http://netty-forums-and-mailing-lists.685743.n2.nabble.com/UDP-Help-tp5699562p5699562.html
Sent from the Netty User Group mailing list archive at Nabble.com.
_______________________________________________
netty-users mailing list
netty-users at lists.jboss.org
https://lists.jboss.org/mailman/listinfo/netty-users
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/netty-users/attachments/20101104/47ea332e/attachment-0001.html 


More information about the netty-users mailing list