<p>Execution handler will make your io threads go back faster to picking up messages. That would make a huge difference.</p>
<p><blockquote type="cite">On 3 Nov 2010 05:11, &quot;cporter71&quot; &lt;<a href="mailto:cporter71@gmail.com">cporter71@gmail.com</a>&gt; wrote:<br><br><br>
Hello.  I am a netty newbie coming from a current mina project.  Mina seems<br>
to solve a lot of our needs with the exception of some performance issues.<br>
Currently I am working out a simple test client/server to see how netty<br>
performs - relative to our context of course!<br>
<br>
Our use case is generally straight forward -- read UDP packets as fast as<br>
possible and internally queue them into the application.  The packets are<br>
fixed length (~350 bytes).  No writes from the server to the client are<br>
required -- the server is strictly a consumer.<br>
<br>
I have been following the examples and reading through the user group but<br>
have not found a good example of a client/server where the client is sending<br>
larger packets as fast as it can.  I created a sample/example demonstrating<br>
such a use case however I clearly have something wrong because I cannot<br>
send/receive 1000 packets on the loopback!  I will continue to dig to see if<br>
I can:<br>
&gt; Find what I am doing wrong!  In the lengthy code attached I am receiving a<br>
&gt; fraction of the messages sent.  There seems to be something magic<br>
&gt; happening around message #110 where the sequences numbers start the<br>
&gt; separation/delta.<br>
&gt; Find information on how to &quot;tune&quot; netty for UDP to handle sustained rates<br>
&gt; of 5+ Mbs from 10-20 clients (assuming appropriate hardware specs).<br>
<br>
It is understood that UDP makes no guarantee on delivery however on a<br>
closed/wireless network we would like to see 95%+.  We have built-in retry<br>
logic to fall back on however we would like to make sure the server is not<br>
loosing/dropping the packets.<br>
<br>
Any help would be appreciated.<br>
<br>
cporter<br>
<br>
&gt;&gt; This is my first post here - if there is a better way to post/share code<br>
&gt;&gt; on this forum then please let me know.<br>
<br>
///////////////////////<br>
// Client<br>
///////////////////////<br>
<br>
package com.example.netty.udp.client;<br>
<br>
import java.net.InetSocketAddress;<br>
import java.net.SocketAddress;<br>
import java.util.concurrent.Executors;<br>
<br>
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;<br>
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
public class UdpClient<br>
{<br>
   private static final Logger log        = LoggerFactory.getLogger(<br>
UdpClient.class );<br>
<br>
   private String              ip         = &quot;127.0.0.1&quot;;<br>
   private int                 port       = 50001;<br>
<br>
   private int                 identifier = 0;<br>
   private int                 count      = 0;<br>
<br>
   private final SocketAddress address;<br>
<br>
   public UdpClient( int identifier, int count )<br>
   {<br>
      this.identifier = identifier;<br>
      this.count = count;<br>
<br>
      this.address = new InetSocketAddress( ip, port );<br>
   }<br>
<br>
   public UdpClient( String ip, int port, int identifier, int count )<br>
   {<br>
      this.ip = ip;<br>
      this.port = port;<br>
<br>
      this.identifier = identifier;<br>
      this.count = count;<br>
<br>
      this.address = new InetSocketAddress( ip, port );<br>
   }<br>
<br>
   public void send()<br>
   {<br>
      // Configure the client.<br>
      ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap( new<br>
NioDatagramChannelFactory( Executors<br>
            .newCachedThreadPool() ) );<br>
<br>
      // Set up the event pipeline factory.<br>
      bootstrap.setPipelineFactory( new UdpClientPipelineFactory(<br>
identifier, count ) );<br>
<br>
      // Make a new connection.<br>
      <a href="http://log.info" target="_blank">log.info</a>( &quot;Attempting to connect to address:{}&quot;, address );<br>
      bootstrap.connect( address );<br>
   }<br>
}<br>
package com.example.netty.udp.client;<br>
<br>
import java.io.IOException;<br>
<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
public class UdpClientBootstrap<br>
{<br>
   private static final Logger log              = LoggerFactory.getLogger(<br>
UdpClientBootstrap.class );<br>
<br>
   private static final int    DefaultIdentifer = 42;<br>
   private static final int    DefaultCount     = 1000;<br>
<br>
   public static void main( String[] args ) throws IOException<br>
   {<br>
      try<br>
      {<br>
         int identifier = DefaultIdentifer;<br>
         int count = DefaultCount;<br>
<br>
         new UdpClient( identifier, count ).send();<br>
<br>
         <a href="http://log.info" target="_blank">log.info</a>( &quot;Send complete -- exiting&quot; );<br>
      }<br>
      catch ( Exception e )<br>
      {<br>
         log.error( &quot;Unexpected error&quot;, e );<br>
      }<br>
   }<br>
}<br>
package com.example.netty.udp.client;<br>
<br>
import org.jboss.netty.channel.Channel;<br>
import org.jboss.netty.channel.ChannelEvent;<br>
import org.jboss.netty.channel.ChannelHandlerContext;<br>
import org.jboss.netty.channel.ChannelStateEvent;<br>
import org.jboss.netty.channel.ExceptionEvent;<br>
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
import com.example.netty.udp.model.DataRecord;<br>
<br>
public class UdpClientHandler extends SimpleChannelUpstreamHandler<br>
{<br>
   private static final Logger log        = LoggerFactory.getLogger(<br>
UdpClientHandler.class );<br>
<br>
   private int                 identifier = 0;<br>
   private int                 count      = 0;<br>
   private int                 index      = 0;<br>
<br>
   public UdpClientHandler( int identifier, int count )<br>
   {<br>
      this.identifier = identifier;<br>
      this.count = count;<br>
   }<br>
<br>
   @Override<br>
   public void channelConnected( ChannelHandlerContext ctx,<br>
ChannelStateEvent e )<br>
   {<br>
      <a href="http://log.info" target="_blank">log.info</a>( &quot;channelConnected&quot; );<br>
      sendRecords( e );<br>
   }<br>
<br>
   @Override<br>
   public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e )<br>
throws Exception<br>
   {<br>
      if ( e instanceof ChannelStateEvent )<br>
      {<br>
         <a href="http://log.info" target="_blank">log.info</a>( e.toString() );<br>
      }<br>
      super.handleUpstream( ctx, e );<br>
   }<br>
<br>
   @Override<br>
   public void channelInterestChanged( ChannelHandlerContext ctx,<br>
ChannelStateEvent e )<br>
   {<br>
      <a href="http://log.info" target="_blank">log.info</a>( &quot;channelInterestChanged&quot; );<br>
      sendRecords( e );<br>
   }<br>
<br>
   @Override<br>
   public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e<br>
)<br>
   {<br>
      log.error( &quot;Unexpected exception from downstream.&quot;, e.getCause() );<br>
   }<br>
<br>
   private void sendRecords( ChannelStateEvent e )<br>
   {<br>
      Channel channel = e.getChannel();<br>
<br>
      while ( channel.isWritable() )<br>
      {<br>
         if ( index &lt;= count )<br>
         {<br>
            DataRecord record = new DataRecord( identifier, index );<br>
            channel.write( record );<br>
            index++;<br>
         }<br>
         else<br>
         {<br>
            break;<br>
         }<br>
      }<br>
   }<br>
}<br>
package com.example.netty.udp.client;<br>
<br>
import static org.jboss.netty.channel.Channels.pipeline;<br>
<br>
import org.jboss.netty.channel.ChannelPipeline;<br>
import org.jboss.netty.channel.ChannelPipelineFactory;<br>
<br>
import com.example.netty.udp.model.DataRecordEncoder;<br>
<br>
public class UdpClientPipelineFactory implements ChannelPipelineFactory<br>
{<br>
   private final int identifier;<br>
   private final int count;<br>
<br>
   public UdpClientPipelineFactory( int identifier, int count )<br>
   {<br>
      this.identifier = identifier;<br>
      this.count = count;<br>
   }<br>
<br>
   public ChannelPipeline getPipeline() throws Exception<br>
   {<br>
      ChannelPipeline pipeline = pipeline();<br>
<br>
      // Add the number codec first,<br>
      pipeline.addLast( &quot;encoder&quot;, new DataRecordEncoder() );<br>
<br>
      // and then business logic.<br>
      pipeline.addLast( &quot;handler&quot;, new UdpClientHandler( identifier, count )<br>
);<br>
<br>
      return pipeline;<br>
   }<br>
}<br>
<br>
///////////////////////<br>
// Server<br>
///////////////////////<br>
<br>
package com.example.netty.udp.server;<br>
<br>
import java.net.InetSocketAddress;<br>
import java.net.SocketAddress;<br>
import java.util.concurrent.Executors;<br>
<br>
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;<br>
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;<br>
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
public class UdpServer<br>
{<br>
   private static final Logger log                     =<br>
LoggerFactory.getLogger( UdpServer.class );<br>
<br>
   public static final int     OneMg                   = 1024 * 1024;<br>
   public static final int     SocketReceiveBufferSize = OneMg * 64;<br>
<br>
   private String              ip                      = &quot;127.0.0.1&quot;;<br>
   private int                 port                    = 50001;<br>
<br>
   private SocketAddress       address;<br>
<br>
   public UdpServer()<br>
   {<br>
   }<br>
<br>
   public UdpServer( String ip, int port )<br>
   {<br>
      this.ip = ip;<br>
      this.port = port;<br>
   }<br>
<br>
   public void serve()<br>
   {<br>
      try<br>
      {<br>
         this.address = new InetSocketAddress( ip, port );<br>
<br>
         ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(<br>
new NioDatagramChannelFactory( Executors<br>
               .newCachedThreadPool() ) );<br>
<br>
         // Set up the event pipeline factory.<br>
         bootstrap.setPipelineFactory( new UdpServerPipelineFactory() );<br>
<br>
         bootstrap.setOption( &quot;receiveBufferSizePredictorFactory&quot;, new<br>
FixedReceiveBufferSizePredictorFactory( 1024 ) );<br>
<br>
         // Bind and start to accept incoming connections.<br>
         <a href="http://log.info" target="_blank">log.info</a>( &quot;Attempting to bind to address:{}&quot;, address );<br>
         bootstrap.bind( address );<br>
      }<br>
      catch ( Exception e )<br>
      {<br>
         log.error( &quot;&quot;, e );<br>
      }<br>
   }<br>
}<br>
package com.example.netty.udp.server;<br>
<br>
import java.io.IOException;<br>
<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
public class UdpServerBootstrap<br>
{<br>
   private static final Logger log = LoggerFactory.getLogger(<br>
UdpServerBootstrap.class );<br>
<br>
   public static void main( String[] args ) throws IOException<br>
   {<br>
      try<br>
      {<br>
         new UdpServer().serve();<br>
<br>
         while ( true )<br>
         {<br>
            Thread.sleep( 5000 );<br>
         }<br>
      }<br>
      catch ( Exception e )<br>
      {<br>
         log.error( &quot;Unexpected error&quot;, e );<br>
      }<br>
   }<br>
}<br>
package com.example.netty.udp.server;<br>
<br>
import org.jboss.netty.channel.ChannelHandlerContext;<br>
import org.jboss.netty.channel.ChannelStateEvent;<br>
import org.jboss.netty.channel.MessageEvent;<br>
import org.jboss.netty.channel.SimpleChannelHandler;<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
import com.example.netty.udp.model.DataRecord;<br>
<br>
public class UdpServerHandler extends SimpleChannelHandler<br>
{<br>
   private static final Logger log      = LoggerFactory.getLogger(<br>
UdpServerBootstrap.class );<br>
<br>
   private static int          received = 0;<br>
<br>
   @Override<br>
   public void messageReceived( ChannelHandlerContext ctx, MessageEvent e )<br>
   {<br>
      if ( e.getMessage() instanceof DataRecord )<br>
      {<br>
         DataRecord dataRecord = (DataRecord) e.getMessage();<br>
         <a href="http://log.info" target="_blank">log.info</a>( &quot;received:{} sequence:{}&quot;, received++,<br>
dataRecord.getSequence() );<br>
      }<br>
      else<br>
         log.error( &quot;Received an unknown message type:{}&quot;,<br>
e.getMessage().getClass().getName() );<br>
   }<br>
<br>
   @Override<br>
   public void channelBound( ChannelHandlerContext ctx, ChannelStateEvent e<br>
) throws Exception<br>
   {<br>
      <a href="http://log.info" target="_blank">log.info</a>( &quot;Successfully bound&quot; );<br>
   }<br>
}<br>
package com.example.netty.udp.server;<br>
<br>
import static org.jboss.netty.channel.Channels.pipeline;<br>
<br>
import org.jboss.netty.channel.ChannelPipeline;<br>
import org.jboss.netty.channel.ChannelPipelineFactory;<br>
<br>
import com.example.netty.udp.model.DataRecordDecoder;<br>
<br>
public class UdpServerPipelineFactory implements ChannelPipelineFactory<br>
{<br>
   public ChannelPipeline getPipeline() throws Exception<br>
   {<br>
      ChannelPipeline pipeline = pipeline();<br>
<br>
      // Add the number codec first,<br>
      pipeline.addLast( &quot;decoder&quot;, new DataRecordDecoder() );<br>
<br>
      // and then business logic.<br>
      pipeline.addLast( &quot;handler&quot;, new UdpServerHandler() );<br>
<br>
      return pipeline;<br>
   }<br>
}<br>
<br>
///////////////////////<br>
// Model/Codecs<br>
///////////////////////<br>
<br>
<br>
package com.example.netty.udp.model;<br>
<br>
public class DataRecord<br>
{<br>
   public static final int MessageHeaderSize = 20;<br>
   public static final int DataPayloadSize   = 336;<br>
   public static final int MessageSize       = 366;<br>
<br>
   private Byte[]          header            = new Byte[MessageHeaderSize];<br>
<br>
   @SuppressWarnings( &quot;unused&quot; )<br>
   private short           count             = 112;<br>
   private Byte[]          data              = new Byte[DataPayloadSize];<br>
<br>
   private int             identifier        = 0;<br>
   private int             sequence          = 0;<br>
<br>
   public DataRecord( int identifier, int sequence )<br>
   {<br>
      for ( int i = 0; i &lt; MessageHeaderSize; i++ )<br>
         header[i] = (byte) i;<br>
<br>
      for ( int i = 0; i &lt; DataPayloadSize; i++ )<br>
         data[i] = (byte) i;<br>
<br>
      this.identifier = identifier;<br>
      this.sequence = sequence;<br>
   }<br>
<br>
   public int getIdentifier()<br>
   {<br>
      return identifier;<br>
   }<br>
<br>
   public void setIdentifier( int identifier )<br>
   {<br>
      this.identifier = identifier;<br>
   }<br>
<br>
   public int getSequence()<br>
   {<br>
      return sequence;<br>
   }<br>
<br>
   public void setSequence( int sequence )<br>
   {<br>
      this.sequence = sequence;<br>
   }<br>
<br>
   public String toString()<br>
   {<br>
      StringBuilder builder = new StringBuilder();<br>
      builder.append( DataRecord.class.getSimpleName() );<br>
      builder.append( &quot; identifier:&quot; ).append( identifier );<br>
      builder.append( &quot; sequence:&quot; ).append( sequence );<br>
<br>
      return builder.toString();<br>
   }<br>
}<br>
package com.example.netty.udp.model;<br>
<br>
import org.jboss.netty.buffer.ChannelBuffer;<br>
import org.jboss.netty.channel.Channel;<br>
import org.jboss.netty.channel.ChannelHandlerContext;<br>
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;<br>
<br>
public class DataRecordDecoder extends OneToOneDecoder<br>
{<br>
   private static final int MessageHeaderSize = 20;<br>
<br>
   // private static final Logger log = LoggerFactory.getLogger(<br>
DataRecordDecoder.class );<br>
<br>
   @Override<br>
   protected Object decode( ChannelHandlerContext arg0, Channel arg1, Object<br>
msg ) throws Exception<br>
   {<br>
      if ( !(msg instanceof ChannelBuffer) )<br>
         return msg;<br>
<br>
      ChannelBuffer buffer = (ChannelBuffer) msg;<br>
<br>
      int identifier = buffer.getInt( MessageHeaderSize );<br>
      int sequence = buffer.getInt( MessageHeaderSize + 4 );<br>
      DataRecord record = new DataRecord( identifier, sequence );<br>
      return record;<br>
   }<br>
}<br>
package com.example.netty.udp.model;<br>
<br>
import org.jboss.netty.buffer.ChannelBuffer;<br>
import org.jboss.netty.buffer.ChannelBuffers;<br>
import org.jboss.netty.channel.Channel;<br>
import org.jboss.netty.channel.ChannelHandlerContext;<br>
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;<br>
<br>
public class DataRecordEncoder extends OneToOneEncoder<br>
{<br>
<br>
   @Override<br>
   protected Object encode( ChannelHandlerContext ctx, Channel channel,<br>
Object msg ) throws Exception<br>
   {<br>
      if ( !(msg instanceof DataRecord) )<br>
         return msg;<br>
<br>
      DataRecord record = (DataRecord) msg;<br>
<br>
      byte[] data = new byte[DataRecord.MessageSize];<br>
      for ( int i = 0; i &lt; DataRecord.MessageSize; i++ )<br>
         data[i] = (byte) i;<br>
<br>
      // Construct a message.<br>
      ChannelBuffer buf = ChannelBuffers.dynamicBuffer();<br>
      buf.writeBytes( data ); // data<br>
      buf.setInt( DataRecord.MessageHeaderSize, record.getIdentifier() );<br>
      buf.setInt( DataRecord.MessageHeaderSize + 4, record.getSequence() );<br>
<br>
      return buf;<br>
   }<br>
}<br>
<font color="#888888"><br>
--<br>
View this message in context: <a href="http://netty-forums-and-mailing-lists.685743.n2.nabble.com/UDP-Help-tp5699562p5699562.html" target="_blank">http://netty-forums-and-mailing-lists.685743.n2.nabble.com/UDP-Help-tp5699562p5699562.html</a><br>

Sent from the Netty User Group mailing list archive at Nabble.com.<br>
_______________________________________________<br>
netty-users mailing list<br>
<a href="mailto:netty-users@lists.jboss.org">netty-users@lists.jboss.org</a><br>
<a href="https://lists.jboss.org/mailman/listinfo/netty-users" target="_blank">https://lists.jboss.org/mailman/listinfo/netty-users</a><br>
</font></blockquote></p>