<p>An execution handler will let your I/O threads get back to picking up messages sooner. That would make a huge difference.</p>
<p><blockquote type="cite">On 3 Nov 2010 05:11, "cporter71" &lt;<a href="mailto:cporter71@gmail.com">cporter71@gmail.com</a>&gt; wrote:<br><br><br>
Hello. I am a Netty newbie coming from a current MINA project. MINA seems<br>
to meet a lot of our needs, with the exception of some performance issues.<br>
Currently I am working out a simple test client/server to see how Netty<br>
performs - relative to our context, of course!<br>
<br>
Our use case is generally straightforward -- read UDP packets as fast as<br>
possible and internally queue them into the application. The packets are<br>
fixed length (~350 bytes). No writes from the server to the client are<br>
required -- the server is strictly a consumer.<br>
<br>
I have been following the examples and reading through the user group but<br>
have not found a good example of a client/server where the client is sending<br>
larger packets as fast as it can. I created a sample/example demonstrating<br>
such a use case; however, I clearly have something wrong because I cannot<br>
send/receive 1000 packets on the loopback! I will continue to dig to see if<br>
I can:<br>
> Find what I am doing wrong! In the lengthy code attached I am receiving a<br>
> fraction of the messages sent. There seems to be something magic<br>
> happening around message #110, where the sequence numbers start to show a<br>
> separation/delta.<br>
> Find information on how to "tune" netty for UDP to handle sustained rates<br>
> of 5+ Mbs from 10-20 clients (assuming appropriate hardware specs).<br>
<br>
It is understood that UDP makes no guarantee on delivery; however, on a<br>
closed/wireless network we would like to see 95%+. We have built-in retry<br>
logic to fall back on, but we would like to make sure the server is not<br>
losing/dropping the packets.<br>
<br>
Any help would be appreciated.<br>
<br>
cporter<br>
<br>
>> This is my first post here - if there is a better way to post/share code<br>
>> on this forum then please let me know.<br>
<br>
///////////////////////<br>
// Client<br>
///////////////////////<br>
<br>
package com.example.netty.udp.client;<br>
<br>
import java.net.InetSocketAddress;<br>
import java.net.SocketAddress;<br>
import java.util.concurrent.Executors;<br>
<br>
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;<br>
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
public class UdpClient<br>
{<br>
private static final Logger log = LoggerFactory.getLogger(<br>
UdpClient.class );<br>
<br>
private String ip = "127.0.0.1";<br>
private int port = 50001;<br>
<br>
private int identifier = 0;<br>
private int count = 0;<br>
<br>
private final SocketAddress address;<br>
<br>
public UdpClient( int identifier, int count )<br>
{<br>
this.identifier = identifier;<br>
this.count = count;<br>
<br>
this.address = new InetSocketAddress( ip, port );<br>
}<br>
<br>
public UdpClient( String ip, int port, int identifier, int count )<br>
{<br>
this.ip = ip;<br>
this.port = port;<br>
<br>
this.identifier = identifier;<br>
this.count = count;<br>
<br>
this.address = new InetSocketAddress( ip, port );<br>
}<br>
<br>
public void send()<br>
{<br>
// Configure the client.<br>
ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap( new<br>
NioDatagramChannelFactory( Executors<br>
.newCachedThreadPool() ) );<br>
<br>
// Set up the event pipeline factory.<br>
bootstrap.setPipelineFactory( new UdpClientPipelineFactory(<br>
identifier, count ) );<br>
<br>
// Make a new connection.<br>
log.info( "Attempting to connect to address:{}", address );<br>
bootstrap.connect( address );<br>
}<br>
}<br>
package com.example.netty.udp.client;<br>
<br>
import java.io.IOException;<br>
<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
public class UdpClientBootstrap<br>
{<br>
private static final Logger log = LoggerFactory.getLogger(<br>
UdpClientBootstrap.class );<br>
<br>
private static final int DefaultIdentifer = 42;<br>
private static final int DefaultCount = 1000;<br>
<br>
public static void main( String[] args ) throws IOException<br>
{<br>
try<br>
{<br>
int identifier = DefaultIdentifer;<br>
int count = DefaultCount;<br>
<br>
new UdpClient( identifier, count ).send();<br>
<br>
log.info( "Send complete -- exiting" );<br>
}<br>
catch ( Exception e )<br>
{<br>
log.error( "Unexpected error", e );<br>
}<br>
}<br>
}<br>
package com.example.netty.udp.client;<br>
<br>
import org.jboss.netty.channel.Channel;<br>
import org.jboss.netty.channel.ChannelEvent;<br>
import org.jboss.netty.channel.ChannelHandlerContext;<br>
import org.jboss.netty.channel.ChannelStateEvent;<br>
import org.jboss.netty.channel.ExceptionEvent;<br>
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
import com.example.netty.udp.model.DataRecord;<br>
<br>
public class UdpClientHandler extends SimpleChannelUpstreamHandler<br>
{<br>
private static final Logger log = LoggerFactory.getLogger(<br>
UdpClientHandler.class );<br>
<br>
private int identifier = 0;<br>
private int count = 0;<br>
private int index = 0;<br>
<br>
public UdpClientHandler( int identifier, int count )<br>
{<br>
this.identifier = identifier;<br>
this.count = count;<br>
}<br>
<br>
@Override<br>
public void channelConnected( ChannelHandlerContext ctx,<br>
ChannelStateEvent e )<br>
{<br>
log.info( "channelConnected" );<br>
sendRecords( e );<br>
}<br>
<br>
@Override<br>
public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e )<br>
throws Exception<br>
{<br>
if ( e instanceof ChannelStateEvent )<br>
{<br>
log.info( e.toString() );<br>
}<br>
super.handleUpstream( ctx, e );<br>
}<br>
<br>
@Override<br>
public void channelInterestChanged( ChannelHandlerContext ctx,<br>
ChannelStateEvent e )<br>
{<br>
log.info( "channelInterestChanged" );<br>
sendRecords( e );<br>
}<br>
<br>
@Override<br>
public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e<br>
)<br>
{<br>
log.error( "Unexpected exception from downstream.", e.getCause() );<br>
}<br>
<br>
private void sendRecords( ChannelStateEvent e )<br>
{<br>
Channel channel = e.getChannel();<br>
<br>
while ( channel.isWritable() )<br>
{<br>
if ( index <= count )<br>
{<br>
DataRecord record = new DataRecord( identifier, index );<br>
channel.write( record );<br>
index++;<br>
}<br>
else<br>
{<br>
break;<br>
}<br>
}<br>
}<br>
}<br>
package com.example.netty.udp.client;<br>
<br>
import static org.jboss.netty.channel.Channels.pipeline;<br>
<br>
import org.jboss.netty.channel.ChannelPipeline;<br>
import org.jboss.netty.channel.ChannelPipelineFactory;<br>
<br>
import com.example.netty.udp.model.DataRecordEncoder;<br>
<br>
public class UdpClientPipelineFactory implements ChannelPipelineFactory<br>
{<br>
private final int identifier;<br>
private final int count;<br>
<br>
public UdpClientPipelineFactory( int identifier, int count )<br>
{<br>
this.identifier = identifier;<br>
this.count = count;<br>
}<br>
<br>
public ChannelPipeline getPipeline() throws Exception<br>
{<br>
ChannelPipeline pipeline = pipeline();<br>
<br>
// Add the number codec first,<br>
pipeline.addLast( "encoder", new DataRecordEncoder() );<br>
<br>
// and then business logic.<br>
pipeline.addLast( "handler", new UdpClientHandler( identifier, count )<br>
);<br>
<br>
return pipeline;<br>
}<br>
}<br>
<br>
///////////////////////<br>
// Server<br>
///////////////////////<br>
<br>
package com.example.netty.udp.server;<br>
<br>
import java.net.InetSocketAddress;<br>
import java.net.SocketAddress;<br>
import java.util.concurrent.Executors;<br>
<br>
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;<br>
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;<br>
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
public class UdpServer<br>
{<br>
private static final Logger log =<br>
LoggerFactory.getLogger( UdpServer.class );<br>
<br>
public static final int OneMg = 1024 * 1024;<br>
public static final int SocketReceiveBufferSize = OneMg * 64;<br>
<br>
private String ip = "127.0.0.1";<br>
private int port = 50001;<br>
<br>
private SocketAddress address;<br>
<br>
public UdpServer()<br>
{<br>
}<br>
<br>
public UdpServer( String ip, int port )<br>
{<br>
this.ip = ip;<br>
this.port = port;<br>
}<br>
<br>
public void serve()<br>
{<br>
try<br>
{<br>
this.address = new InetSocketAddress( ip, port );<br>
<br>
ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(<br>
new NioDatagramChannelFactory( Executors<br>
.newCachedThreadPool() ) );<br>
<br>
// Set up the event pipeline factory.<br>
bootstrap.setPipelineFactory( new UdpServerPipelineFactory() );<br>
<br>
bootstrap.setOption( "receiveBufferSizePredictorFactory", new<br>
FixedReceiveBufferSizePredictorFactory( 1024 ) );<br>
<br>
// Bind and start to accept incoming connections.<br>
log.info( "Attempting to bind to address:{}", address );<br>
bootstrap.bind( address );<br>
}<br>
catch ( Exception e )<br>
{<br>
log.error( "", e );<br>
}<br>
}<br>
}<br>
package com.example.netty.udp.server;<br>
<br>
import java.io.IOException;<br>
<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
public class UdpServerBootstrap<br>
{<br>
private static final Logger log = LoggerFactory.getLogger(<br>
UdpServerBootstrap.class );<br>
<br>
public static void main( String[] args ) throws IOException<br>
{<br>
try<br>
{<br>
new UdpServer().serve();<br>
<br>
while ( true )<br>
{<br>
Thread.sleep( 5000 );<br>
}<br>
}<br>
catch ( Exception e )<br>
{<br>
log.error( "Unexpected error", e );<br>
}<br>
}<br>
}<br>
package com.example.netty.udp.server;<br>
<br>
import org.jboss.netty.channel.ChannelHandlerContext;<br>
import org.jboss.netty.channel.ChannelStateEvent;<br>
import org.jboss.netty.channel.MessageEvent;<br>
import org.jboss.netty.channel.SimpleChannelHandler;<br>
import org.slf4j.Logger;<br>
import org.slf4j.LoggerFactory;<br>
<br>
import com.example.netty.udp.model.DataRecord;<br>
<br>
public class UdpServerHandler extends SimpleChannelHandler<br>
{<br>
private static final Logger log = LoggerFactory.getLogger(<br>
UdpServerBootstrap.class );<br>
<br>
private static int received = 0;<br>
<br>
@Override<br>
public void messageReceived( ChannelHandlerContext ctx, MessageEvent e )<br>
{<br>
if ( e.getMessage() instanceof DataRecord )<br>
{<br>
DataRecord dataRecord = (DataRecord) e.getMessage();<br>
log.info( "received:{} sequence:{}", received++,<br>
dataRecord.getSequence() );<br>
}<br>
else<br>
log.error( "Received an unknown message type:{}",<br>
e.getMessage().getClass().getName() );<br>
}<br>
<br>
@Override<br>
public void channelBound( ChannelHandlerContext ctx, ChannelStateEvent e<br>
) throws Exception<br>
{<br>
log.info( "Successfully bound" );<br>
}<br>
}<br>
package com.example.netty.udp.server;<br>
<br>
import static org.jboss.netty.channel.Channels.pipeline;<br>
<br>
import org.jboss.netty.channel.ChannelPipeline;<br>
import org.jboss.netty.channel.ChannelPipelineFactory;<br>
<br>
import com.example.netty.udp.model.DataRecordDecoder;<br>
<br>
public class UdpServerPipelineFactory implements ChannelPipelineFactory<br>
{<br>
public ChannelPipeline getPipeline() throws Exception<br>
{<br>
ChannelPipeline pipeline = pipeline();<br>
<br>
// Add the number codec first,<br>
pipeline.addLast( "decoder", new DataRecordDecoder() );<br>
<br>
// and then business logic.<br>
pipeline.addLast( "handler", new UdpServerHandler() );<br>
<br>
return pipeline;<br>
}<br>
}<br>
<br>
///////////////////////<br>
// Model/Codecs<br>
///////////////////////<br>
<br>
<br>
package com.example.netty.udp.model;<br>
<br>
public class DataRecord<br>
{<br>
public static final int MessageHeaderSize = 20;<br>
public static final int DataPayloadSize = 336;<br>
public static final int MessageSize = 366;<br>
<br>
private Byte[] header = new Byte[MessageHeaderSize];<br>
<br>
@SuppressWarnings( "unused" )<br>
private short count = 112;<br>
private Byte[] data = new Byte[DataPayloadSize];<br>
<br>
private int identifier = 0;<br>
private int sequence = 0;<br>
<br>
public DataRecord( int identifier, int sequence )<br>
{<br>
for ( int i = 0; i < MessageHeaderSize; i++ )<br>
header[i] = (byte) i;<br>
<br>
for ( int i = 0; i < DataPayloadSize; i++ )<br>
data[i] = (byte) i;<br>
<br>
this.identifier = identifier;<br>
this.sequence = sequence;<br>
}<br>
<br>
public int getIdentifier()<br>
{<br>
return identifier;<br>
}<br>
<br>
public void setIdentifier( int identifier )<br>
{<br>
this.identifier = identifier;<br>
}<br>
<br>
public int getSequence()<br>
{<br>
return sequence;<br>
}<br>
<br>
public void setSequence( int sequence )<br>
{<br>
this.sequence = sequence;<br>
}<br>
<br>
public String toString()<br>
{<br>
StringBuilder builder = new StringBuilder();<br>
builder.append( DataRecord.class.getSimpleName() );<br>
builder.append( " identifier:" ).append( identifier );<br>
builder.append( " sequence:" ).append( sequence );<br>
<br>
return builder.toString();<br>
}<br>
}<br>
package com.example.netty.udp.model;<br>
<br>
import org.jboss.netty.buffer.ChannelBuffer;<br>
import org.jboss.netty.channel.Channel;<br>
import org.jboss.netty.channel.ChannelHandlerContext;<br>
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;<br>
<br>
public class DataRecordDecoder extends OneToOneDecoder<br>
{<br>
private static final int MessageHeaderSize = 20;<br>
<br>
// private static final Logger log = LoggerFactory.getLogger(<br>
// DataRecordDecoder.class );<br>
<br>
@Override<br>
protected Object decode( ChannelHandlerContext arg0, Channel arg1, Object<br>
msg ) throws Exception<br>
{<br>
if ( !(msg instanceof ChannelBuffer) )<br>
return msg;<br>
<br>
ChannelBuffer buffer = (ChannelBuffer) msg;<br>
<br>
int identifier = buffer.getInt( MessageHeaderSize );<br>
int sequence = buffer.getInt( MessageHeaderSize + 4 );<br>
DataRecord record = new DataRecord( identifier, sequence );<br>
return record;<br>
}<br>
}<br>
package com.example.netty.udp.model;<br>
<br>
import org.jboss.netty.buffer.ChannelBuffer;<br>
import org.jboss.netty.buffer.ChannelBuffers;<br>
import org.jboss.netty.channel.Channel;<br>
import org.jboss.netty.channel.ChannelHandlerContext;<br>
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;<br>
<br>
public class DataRecordEncoder extends OneToOneEncoder<br>
{<br>
<br>
@Override<br>
protected Object encode( ChannelHandlerContext ctx, Channel channel,<br>
Object msg ) throws Exception<br>
{<br>
if ( !(msg instanceof DataRecord) )<br>
return msg;<br>
<br>
DataRecord record = (DataRecord) msg;<br>
<br>
byte[] data = new byte[DataRecord.MessageSize];<br>
for ( int i = 0; i < DataRecord.MessageSize; i++ )<br>
data[i] = (byte) i;<br>
<br>
// Construct a message.<br>
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();<br>
buf.writeBytes( data ); // data<br>
buf.setInt( DataRecord.MessageHeaderSize, record.getIdentifier() );<br>
buf.setInt( DataRecord.MessageHeaderSize + 4, record.getSequence() );<br>
<br>
return buf;<br>
}<br>
}<br>
<font color="#888888"><br>
--<br>
View this message in context: <a href="http://netty-forums-and-mailing-lists.685743.n2.nabble.com/UDP-Help-tp5699562p5699562.html" target="_blank">http://netty-forums-and-mailing-lists.685743.n2.nabble.com/UDP-Help-tp5699562p5699562.html</a><br>
Sent from the Netty User Group mailing list archive at Nabble.com.<br>
_______________________________________________<br>
netty-users mailing list<br>
<a href="mailto:netty-users@lists.jboss.org">netty-users@lists.jboss.org</a><br>
<a href="https://lists.jboss.org/mailman/listinfo/netty-users" target="_blank">https://lists.jboss.org/mailman/listinfo/netty-users</a><br>
</font></blockquote></p>