UDP Help

cporter71 cporter71 at gmail.com
Tue Nov 2 19:40:49 EDT 2010


Hello.  I am a netty newbie coming from a current mina project.  Mina seems
to solve a lot of our needs with the exception of some performance issues. 
Currently I am working out a simple test client/server to see how netty
performs - relative to our context of course!

Our use case is generally straightforward -- read UDP packets as fast as
possible and internally queue them into the application.  The packets are
fixed length (~350 bytes).  No writes from the server to the client are
required -- the server is strictly a consumer.

I have been following the examples and reading through the user group but
have not found a good example of a client/server where the client is sending
larger packets as fast as it can.  I created a sample/example demonstrating
such a use case however I clearly have something wrong because I cannot
send/receive 1000 packets on the loopback!  I will continue to dig to see if
I can:
> Find what I am doing wrong!  In the lengthy code attached I am receiving
> only a fraction of the messages sent.  Something seems to happen around
> message #110, where the received count and the sequence numbers start to
> diverge.
> Find information on how to "tune" netty for UDP to handle sustained rates
> of 5+ Mbs from 10-20 clients (assuming appropriate hardware specs).  

It is understood that UDP makes no guarantee on delivery; however, on a
closed/wireless network we would like to see 95%+.  We have built-in retry
logic to fall back on, but we would like to make sure the server is not
losing/dropping the packets.

Any help would be appreciated.

cporter

>> This is my first post here - if there is a better way to post/share code
>> on this forum then please let me know.

///////////////////////
// Client
///////////////////////

package com.example.netty.udp.client;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Small UDP test client. It opens a connectionless Netty channel towards a
 * target address; the records themselves are written by the handler that
 * {@link UdpClientPipelineFactory} installs in the pipeline.
 */
public class UdpClient
{
   private static final Logger log         = LoggerFactory.getLogger( UdpClient.class );

   /** Target used when the caller does not supply one. */
   private static final String DefaultIp   = "127.0.0.1";
   private static final int    DefaultPort = 50001;

   private final String        ip;
   private final int           port;

   private final int           identifier;
   private final int           count;

   private final SocketAddress address;

   /**
    * Creates a client that targets the default loopback address and port.
    *
    * @param identifier value handed to the pipeline factory for every record
    * @param count      record count handed to the pipeline factory
    */
   public UdpClient( int identifier, int count )
   {
      this( DefaultIp, DefaultPort, identifier, count );
   }

   /**
    * Creates a client that targets an explicit address.
    *
    * @param ip         host name or IP address of the server
    * @param port       UDP port of the server
    * @param identifier value handed to the pipeline factory for every record
    * @param count      record count handed to the pipeline factory
    */
   public UdpClient( String ip, int port, int identifier, int count )
   {
      this.ip = ip;
      this.port = port;
      this.identifier = identifier;
      this.count = count;
      this.address = new InetSocketAddress( ip, port );
   }

   /**
    * Builds the bootstrap, installs the client pipeline and requests a
    * connection to the target address. The value returned by
    * {@code connect} is not inspected, so this method does not wait for the
    * connection or for any record to be written.
    */
   public void send()
   {
      // Channel factory backed by a cached thread pool.
      NioDatagramChannelFactory channelFactory = new NioDatagramChannelFactory( Executors.newCachedThreadPool() );
      ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap( channelFactory );

      // Every channel gets an encoder plus the sending handler.
      UdpClientPipelineFactory pipelineFactory = new UdpClientPipelineFactory( identifier, count );
      bootstrap.setPipelineFactory( pipelineFactory );

      log.info( "Attempting to connect to address:{}", address );
      bootstrap.connect( address );
   }
}
package com.example.netty.udp.client;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line entry point for the UDP test client. Runs one
 * {@link UdpClient} with a fixed identifier and record count.
 */
public class UdpClientBootstrap
{
   private static final Logger log               = LoggerFactory.getLogger( UdpClientBootstrap.class );

   private static final int    DefaultIdentifier = 42;
   private static final int    DefaultCount      = 1000;

   /**
    * Starts the client with the default settings. Any exception is logged
    * rather than propagated.
    *
    * @param args ignored
    */
   public static void main( String[] args ) throws IOException
   {
      try
      {
         UdpClient client = new UdpClient( DefaultIdentifier, DefaultCount );
         client.send();

         log.info( "Send complete -- exiting" );
      }
      catch ( Exception failure )
      {
         log.error( "Unexpected error", failure );
      }
   }
}
package com.example.netty.udp.client;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.netty.udp.model.DataRecord;

/**
 * Client-side handler that writes {@link DataRecord}s to the channel for as
 * long as the channel reports itself writable, until {@code count} records
 * have been sent. Sequence numbers run from 0 to {@code count - 1}.
 */
public class UdpClientHandler extends SimpleChannelUpstreamHandler
{
   private static final Logger log   = LoggerFactory.getLogger( UdpClientHandler.class );

   private final int           identifier;
   private final int           count;

   /** Sequence number of the next record to send. */
   private int                 index = 0;

   /**
    * @param identifier value stamped into every record
    * @param count      total number of records to send
    */
   public UdpClientHandler( int identifier, int count )
   {
      this.identifier = identifier;
      this.count = count;
   }

   /** Starts sending as soon as the channel is connected. */
   @Override
   public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e )
   {
      log.info( "channelConnected" );
      sendRecords( e );
   }

   /** Logs every channel state event before normal dispatching. */
   @Override
   public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e ) throws Exception
   {
      if ( e instanceof ChannelStateEvent )
      {
         log.info( e.toString() );
      }
      super.handleUpstream( ctx, e );
   }

   /**
    * Tries to send more records whenever the channel's interest set changes;
    * {@link #sendRecords} only proceeds while the channel is writable.
    */
   @Override
   public void channelInterestChanged( ChannelHandlerContext ctx, ChannelStateEvent e )
   {
      log.info( "channelInterestChanged" );
      sendRecords( e );
   }

   /** Logs the cause; the channel is left open. */
   @Override
   public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e )
   {
      log.error( "Unexpected exception from downstream.", e.getCause() );
   }

   /**
    * Writes records until either all {@code count} records have been written
    * or the channel stops being writable.
    *
    * @param e event whose channel is written to
    */
   private void sendRecords( ChannelStateEvent e )
   {
      Channel channel = e.getChannel();

      // Strictly less than: "index <= count" would emit count + 1 records.
      while ( index < count && channel.isWritable() )
      {
         channel.write( new DataRecord( identifier, index ) );
         index++;
      }
   }
}
package com.example.netty.udp.client;

import static org.jboss.netty.channel.Channels.pipeline;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;

import com.example.netty.udp.model.DataRecordEncoder;

/**
 * Builds the client pipeline: a {@link DataRecordEncoder} followed by a
 * {@link UdpClientHandler} configured with this factory's identifier and
 * count.
 */
public class UdpClientPipelineFactory implements ChannelPipelineFactory
{
   private final int identifier;
   private final int count;

   /**
    * @param identifier value passed to each handler created
    * @param count      record count passed to each handler created
    */
   public UdpClientPipelineFactory( int identifier, int count )
   {
      this.identifier = identifier;
      this.count = count;
   }

   /**
    * @return a new pipeline with "encoder" first and "handler" last
    */
   public ChannelPipeline getPipeline() throws Exception
   {
      ChannelPipeline stages = pipeline();

      // Codec goes in ahead of the business logic.
      DataRecordEncoder encoder = new DataRecordEncoder();
      stages.addLast( "encoder", encoder );

      UdpClientHandler handler = new UdpClientHandler( identifier, count );
      stages.addLast( "handler", handler );

      return stages;
   }
}

///////////////////////
// Server
///////////////////////

package com.example.netty.udp.server;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * UDP test server. Binds a connectionless Netty channel to the configured
 * address and installs the pipeline built by
 * {@link UdpServerPipelineFactory}.
 */
public class UdpServer
{
   private static final Logger log                     = LoggerFactory.getLogger( UdpServer.class );

   public static final int     OneMg                   = 1024 * 1024;

   /** Socket receive buffer size requested from the OS, in bytes. */
   public static final int     SocketReceiveBufferSize = OneMg * 64;

   private String              ip                      = "127.0.0.1";
   private int                 port                    = 50001;

   private SocketAddress       address;

   /** Creates a server for the default loopback address and port. */
   public UdpServer()
   {
   }

   /**
    * @param ip   host name or IP address to bind to
    * @param port UDP port to bind to
    */
   public UdpServer( String ip, int port )
   {
      this.ip = ip;
      this.port = port;
   }

   /**
    * Configures the bootstrap and requests a bind to the server address.
    * Any exception raised while doing so is logged, not propagated.
    */
   public void serve()
   {
      try
      {
         this.address = new InetSocketAddress( ip, port );

         ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(
               new NioDatagramChannelFactory( Executors.newCachedThreadPool() ) );

         // Set up the event pipeline factory.
         bootstrap.setPipelineFactory( new UdpServerPipelineFactory() );

         // Fixed 1024-byte read buffers.
         bootstrap.setOption( "receiveBufferSizePredictorFactory", new FixedReceiveBufferSizePredictorFactory( 1024 ) );

         // Previously SocketReceiveBufferSize was declared but never applied,
         // leaving the socket with the platform default receive buffer. Request
         // the larger buffer so bursts can queue in the kernel while the
         // pipeline catches up.
         // NOTE(review): the OS may grant less than requested -- confirm the
         // effective size on the target platform.
         bootstrap.setOption( "receiveBufferSize", SocketReceiveBufferSize );

         // Bind and start to accept incoming datagrams.
         log.info( "Attempting to bind to address:{}", address );
         bootstrap.bind( address );
      }
      catch ( Exception e )
      {
         log.error( "", e );
      }
   }
}
package com.example.netty.udp.server;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line entry point for the UDP test server. Starts a default
 * {@link UdpServer} and then keeps the main thread sleeping in a loop.
 */
public class UdpServerBootstrap
{
   private static final Logger log             = LoggerFactory.getLogger( UdpServerBootstrap.class );

   /** Length of each sleep of the main thread, in milliseconds. */
   private static final long   IdleSleepMillis = 5000;

   /**
    * Starts the server and idles. Any exception, including an interrupt of
    * the sleep, is logged and ends the method.
    *
    * @param args ignored
    */
   public static void main( String[] args ) throws IOException
   {
      try
      {
         UdpServer server = new UdpServer();
         server.serve();

         // Sleep in a loop that never terminates on its own.
         for ( ;; )
         {
            Thread.sleep( IdleSleepMillis );
         }
      }
      catch ( Exception failure )
      {
         log.error( "Unexpected error", failure );
      }
   }
}
package com.example.netty.udp.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.netty.udp.model.DataRecord;

/**
 * Server-side handler that logs every decoded {@link DataRecord} together
 * with a running count of records received.
 */
public class UdpServerHandler extends SimpleChannelHandler
{
   // Logger is keyed on this class; it was previously created for
   // UdpServerBootstrap, which mislabelled every line this handler logs.
   private static final Logger log      = LoggerFactory.getLogger( UdpServerHandler.class );

   /**
    * Number of DataRecords seen so far. Static, so shared by all handler
    * instances; the increment is not synchronized.
    */
   private static int          received = 0;

   /**
    * Logs the running count and the sequence number of each DataRecord;
    * any other message type is logged as an error.
    */
   @Override
   public void messageReceived( ChannelHandlerContext ctx, MessageEvent e )
   {
      Object message = e.getMessage();

      if ( message instanceof DataRecord )
      {
         DataRecord dataRecord = (DataRecord) message;
         log.info( "received:{} sequence:{}", received++, dataRecord.getSequence() );
      }
      else
      {
         log.error( "Received an unknown message type:{}", message.getClass().getName() );
      }
   }

   /** Logs that the channel has been bound. */
   @Override
   public void channelBound( ChannelHandlerContext ctx, ChannelStateEvent e ) throws Exception
   {
      log.info( "Successfully bound" );
   }
}
package com.example.netty.udp.server;

import static org.jboss.netty.channel.Channels.pipeline;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;

import com.example.netty.udp.model.DataRecordDecoder;

/**
 * Builds the server pipeline: a {@link DataRecordDecoder} followed by a
 * {@link UdpServerHandler}.
 */
public class UdpServerPipelineFactory implements ChannelPipelineFactory
{
   /**
    * @return a new pipeline with "decoder" first and "handler" last
    */
   public ChannelPipeline getPipeline() throws Exception
   {
      ChannelPipeline stages = pipeline();

      // Codec goes in ahead of the business logic.
      DataRecordDecoder decoder = new DataRecordDecoder();
      stages.addLast( "decoder", decoder );

      UdpServerHandler handler = new UdpServerHandler();
      stages.addLast( "handler", handler );

      return stages;
   }
}

///////////////////////
// Model/Codecs
///////////////////////


package com.example.netty.udp.model;

/**
 * Test message exchanged between the UDP client and server. Only the
 * identifier and sequence number are exposed; the header and payload arrays
 * are filled with a fixed byte pattern at construction time.
 */
public class DataRecord
{
   public static final int MessageHeaderSize = 20;
   public static final int DataPayloadSize   = 336;
   public static final int MessageSize       = 366;

   // Primitive arrays: the previous Byte[] arrays boxed every element.
   private final byte[]    header            = new byte[MessageHeaderSize];

   @SuppressWarnings( "unused" )
   private short           count             = 112;
   private final byte[]    data              = new byte[DataPayloadSize];

   private int             identifier        = 0;
   private int             sequence          = 0;

   /**
    * Creates a record and fills header and payload with the pattern
    * {@code (byte) index}.
    *
    * @param identifier sender identifier
    * @param sequence   sequence number of this record
    */
   public DataRecord( int identifier, int sequence )
   {
      for ( int i = 0; i < MessageHeaderSize; i++ )
      {
         header[i] = (byte) i;
      }

      // The cast wraps for indices above 127; that is the intended filler.
      for ( int i = 0; i < DataPayloadSize; i++ )
      {
         data[i] = (byte) i;
      }

      this.identifier = identifier;
      this.sequence = sequence;
   }

   /** @return the sender identifier */
   public int getIdentifier()
   {
      return identifier;
   }

   /** @param identifier the sender identifier */
   public void setIdentifier( int identifier )
   {
      this.identifier = identifier;
   }

   /** @return the sequence number */
   public int getSequence()
   {
      return sequence;
   }

   /** @param sequence the sequence number */
   public void setSequence( int sequence )
   {
      this.sequence = sequence;
   }

   /** @return {@code "DataRecord identifier:<id> sequence:<seq>"} */
   @Override
   public String toString()
   {
      StringBuilder builder = new StringBuilder();
      builder.append( DataRecord.class.getSimpleName() );
      builder.append( " identifier:" ).append( identifier );
      builder.append( " sequence:" ).append( sequence );

      return builder.toString();
   }
}
package com.example.netty.udp.model;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;

public class DataRecordDecoder extends OneToOneDecoder
{
   private static final int MessageHeaderSize = 20;

   // private static final Logger log = LoggerFactory.getLogger(
DataRecordDecoder.class );

   @Override
   protected Object decode( ChannelHandlerContext arg0, Channel arg1, Object
msg ) throws Exception
   {
      if ( !(msg instanceof ChannelBuffer) )
         return msg;

      ChannelBuffer buffer = (ChannelBuffer) msg;

      int identifier = buffer.getInt( MessageHeaderSize );
      int sequence = buffer.getInt( MessageHeaderSize + 4 );
      DataRecord record = new DataRecord( identifier, sequence );
      return record;
   }
}
package com.example.netty.udp.model;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

/**
 * Encodes a {@link DataRecord} into a {@code DataRecord.MessageSize}-byte
 * buffer: a filler pattern with the identifier and sequence number written
 * over it at fixed offsets. Messages that are not a DataRecord are passed
 * through unchanged.
 */
public class DataRecordEncoder extends OneToOneEncoder
{

   /**
    * @param msg the outbound message
    * @return the encoded buffer, or {@code msg} itself when it is not a
    *         DataRecord
    */
   @Override
   protected Object encode( ChannelHandlerContext ctx, Channel channel, Object msg ) throws Exception
   {
      if ( !(msg instanceof DataRecord) )
      {
         return msg;
      }

      DataRecord record = (DataRecord) msg;

      // Filler pattern; the cast wraps for indices above 127.
      byte[] data = new byte[DataRecord.MessageSize];
      for ( int i = 0; i < DataRecord.MessageSize; i++ )
      {
         data[i] = (byte) i;
      }

      // Wrap the array directly rather than copying it into a dynamic buffer,
      // which would have to grow to fit the message on every encode.
      ChannelBuffer buf = ChannelBuffers.wrappedBuffer( data );

      // Overwrite the filler at the offsets DataRecordDecoder reads from.
      buf.setInt( DataRecord.MessageHeaderSize, record.getIdentifier() );
      buf.setInt( DataRecord.MessageHeaderSize + 4, record.getSequence() );

      return buf;
   }
}

-- 
View this message in context: http://netty-forums-and-mailing-lists.685743.n2.nabble.com/UDP-Help-tp5699562p5699562.html
Sent from the Netty User Group mailing list archive at Nabble.com.


More information about the netty-users mailing list