UDP Help
cporter71
cporter71 at gmail.com
Tue Nov 2 19:40:49 EDT 2010
Hello. I am a netty newbie coming from a current mina project. Mina seems
to solve a lot of our needs with the exception of some performance issues.
Currently I am working out a simple test client/server to see how netty
performs - relative to our context of course!
Our use case is generally straight forward -- read UDP packets as fast as
possible and internally queue them into the application. The packets are
fixed length (~350 bytes). No writes from the server to the client are
required -- the server is strictly a consumer.
I have been following the examples and reading through the user group but
have not found a good example of a client/server where the client is sending
larger packets as fast as it can. I created a sample/example demonstrating
such a use case however I clearly have something wrong because I cannot
send/receive 1000 packets on the loopback! I will continue to dig to see if
I can:
> Find what I am doing wrong! In the lengthy code attached I am receiving a
> fraction of the messages sent. There seems to be something magic
> happening around message #110 where the sequences numbers start the
> separation/delta.
> Find information on how to "tune" netty for UDP to handle sustained rates
> of 5+ Mbs from 10-20 clients (assuming appropriate hardware specs).
It is understood that UDP makes no guarantee on delivery however on a
closed/wireless network we would like to see 95%+. We have built-in retry
logic to fall back on however we would like to make sure the server is not
loosing/dropping the packets.
Any help would be appreciated.
cporter
>> This is my first post here - if there is a better way to post/share code
>> on this forum then please let me know.
///////////////////////
// Client
///////////////////////
package com.example.netty.udp.client;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UdpClient
{
private static final Logger log = LoggerFactory.getLogger(
UdpClient.class );
private String ip = "127.0.0.1";
private int port = 50001;
private int identifier = 0;
private int count = 0;
private final SocketAddress address;
public UdpClient( int identifier, int count )
{
this.identifier = identifier;
this.count = count;
this.address = new InetSocketAddress( ip, port );
}
public UdpClient( String ip, int port, int identifier, int count )
{
this.ip = ip;
this.port = port;
this.identifier = identifier;
this.count = count;
this.address = new InetSocketAddress( ip, port );
}
public void send()
{
// Configure the client.
ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap( new
NioDatagramChannelFactory( Executors
.newCachedThreadPool() ) );
// Set up the event pipeline factory.
bootstrap.setPipelineFactory( new UdpClientPipelineFactory(
identifier, count ) );
// Make a new connection.
log.info( "Attempting to connect to address:{}", address );
bootstrap.connect( address );
}
}
package com.example.netty.udp.client;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UdpClientBootstrap
{
private static final Logger log = LoggerFactory.getLogger(
UdpClientBootstrap.class );
private static final int DefaultIdentifer = 42;
private static final int DefaultCount = 1000;
public static void main( String[] args ) throws IOException
{
try
{
int identifier = DefaultIdentifer;
int count = DefaultCount;
new UdpClient( identifier, count ).send();
log.info( "Send complete -- exiting" );
}
catch ( Exception e )
{
log.error( "Unexpected error", e );
}
}
}
package com.example.netty.udp.client;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.netty.udp.model.DataRecord;
public class UdpClientHandler extends SimpleChannelUpstreamHandler
{
private static final Logger log = LoggerFactory.getLogger(
UdpClientHandler.class );
private int identifier = 0;
private int count = 0;
private int index = 0;
public UdpClientHandler( int identifier, int count )
{
this.identifier = identifier;
this.count = count;
}
@Override
public void channelConnected( ChannelHandlerContext ctx,
ChannelStateEvent e )
{
log.info( "channelConnected" );
sendRecords( e );
}
@Override
public void handleUpstream( ChannelHandlerContext ctx, ChannelEvent e )
throws Exception
{
if ( e instanceof ChannelStateEvent )
{
log.info( e.toString() );
}
super.handleUpstream( ctx, e );
}
@Override
public void channelInterestChanged( ChannelHandlerContext ctx,
ChannelStateEvent e )
{
log.info( "channelInterestChanged" );
sendRecords( e );
}
@Override
public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e
)
{
log.error( "Unexpected exception from downstream.", e.getCause() );
}
private void sendRecords( ChannelStateEvent e )
{
Channel channel = e.getChannel();
while ( channel.isWritable() )
{
if ( index <= count )
{
DataRecord record = new DataRecord( identifier, index );
channel.write( record );
index++;
}
else
{
break;
}
}
}
}
package com.example.netty.udp.client;
import static org.jboss.netty.channel.Channels.pipeline;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import com.example.netty.udp.model.DataRecordEncoder;
public class UdpClientPipelineFactory implements ChannelPipelineFactory
{
private final int identifier;
private final int count;
public UdpClientPipelineFactory( int identifier, int count )
{
this.identifier = identifier;
this.count = count;
}
public ChannelPipeline getPipeline() throws Exception
{
ChannelPipeline pipeline = pipeline();
// Add the number codec first,
pipeline.addLast( "encoder", new DataRecordEncoder() );
// and then business logic.
pipeline.addLast( "handler", new UdpClientHandler( identifier, count )
);
return pipeline;
}
}
///////////////////////
// Server
///////////////////////
package com.example.netty.udp.server;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UdpServer
{
private static final Logger log =
LoggerFactory.getLogger( UdpServer.class );
public static final int OneMg = 1024 * 1024;
public static final int SocketReceiveBufferSize = OneMg * 64;
private String ip = "127.0.0.1";
private int port = 50001;
private SocketAddress address;
public UdpServer()
{
}
public UdpServer( String ip, int port )
{
this.ip = ip;
this.port = port;
}
public void serve()
{
try
{
this.address = new InetSocketAddress( ip, port );
ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(
new NioDatagramChannelFactory( Executors
.newCachedThreadPool() ) );
// Set up the event pipeline factory.
bootstrap.setPipelineFactory( new UdpServerPipelineFactory() );
bootstrap.setOption( "receiveBufferSizePredictorFactory", new
FixedReceiveBufferSizePredictorFactory( 1024 ) );
// Bind and start to accept incoming connections.
log.info( "Attempting to bind to address:{}", address );
bootstrap.bind( address );
}
catch ( Exception e )
{
log.error( "", e );
}
}
}
package com.example.netty.udp.server;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UdpServerBootstrap
{
private static final Logger log = LoggerFactory.getLogger(
UdpServerBootstrap.class );
public static void main( String[] args ) throws IOException
{
try
{
new UdpServer().serve();
while ( true )
{
Thread.sleep( 5000 );
}
}
catch ( Exception e )
{
log.error( "Unexpected error", e );
}
}
}
package com.example.netty.udp.server;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.netty.udp.model.DataRecord;
public class UdpServerHandler extends SimpleChannelHandler
{
private static final Logger log = LoggerFactory.getLogger(
UdpServerBootstrap.class );
private static int received = 0;
@Override
public void messageReceived( ChannelHandlerContext ctx, MessageEvent e )
{
if ( e.getMessage() instanceof DataRecord )
{
DataRecord dataRecord = (DataRecord) e.getMessage();
log.info( "received:{} sequence:{}", received++,
dataRecord.getSequence() );
}
else
log.error( "Received an unknown message type:{}",
e.getMessage().getClass().getName() );
}
@Override
public void channelBound( ChannelHandlerContext ctx, ChannelStateEvent e
) throws Exception
{
log.info( "Successfully bound" );
}
}
package com.example.netty.udp.server;
import static org.jboss.netty.channel.Channels.pipeline;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import com.example.netty.udp.model.DataRecordDecoder;
public class UdpServerPipelineFactory implements ChannelPipelineFactory
{
public ChannelPipeline getPipeline() throws Exception
{
ChannelPipeline pipeline = pipeline();
// Add the number codec first,
pipeline.addLast( "decoder", new DataRecordDecoder() );
// and then business logic.
pipeline.addLast( "handler", new UdpServerHandler() );
return pipeline;
}
}
///////////////////////
// Model/Codecs
///////////////////////
package com.example.netty.udp.model;
public class DataRecord
{
public static final int MessageHeaderSize = 20;
public static final int DataPayloadSize = 336;
public static final int MessageSize = 366;
private Byte[] header = new Byte[MessageHeaderSize];
@SuppressWarnings( "unused" )
private short count = 112;
private Byte[] data = new Byte[DataPayloadSize];
private int identifier = 0;
private int sequence = 0;
public DataRecord( int identifier, int sequence )
{
for ( int i = 0; i < MessageHeaderSize; i++ )
header[i] = (byte) i;
for ( int i = 0; i < DataPayloadSize; i++ )
data[i] = (byte) i;
this.identifier = identifier;
this.sequence = sequence;
}
public int getIdentifier()
{
return identifier;
}
public void setIdentifier( int identifier )
{
this.identifier = identifier;
}
public int getSequence()
{
return sequence;
}
public void setSequence( int sequence )
{
this.sequence = sequence;
}
public String toString()
{
StringBuilder builder = new StringBuilder();
builder.append( DataRecord.class.getSimpleName() );
builder.append( " identifier:" ).append( identifier );
builder.append( " sequence:" ).append( sequence );
return builder.toString();
}
}
package com.example.netty.udp.model;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
public class DataRecordDecoder extends OneToOneDecoder
{
private static final int MessageHeaderSize = 20;
// private static final Logger log = LoggerFactory.getLogger(
DataRecordDecoder.class );
@Override
protected Object decode( ChannelHandlerContext arg0, Channel arg1, Object
msg ) throws Exception
{
if ( !(msg instanceof ChannelBuffer) )
return msg;
ChannelBuffer buffer = (ChannelBuffer) msg;
int identifier = buffer.getInt( MessageHeaderSize );
int sequence = buffer.getInt( MessageHeaderSize + 4 );
DataRecord record = new DataRecord( identifier, sequence );
return record;
}
}
package com.example.netty.udp.model;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
public class DataRecordEncoder extends OneToOneEncoder
{
@Override
protected Object encode( ChannelHandlerContext ctx, Channel channel,
Object msg ) throws Exception
{
if ( !(msg instanceof DataRecord) )
return msg;
DataRecord record = (DataRecord) msg;
byte[] data = new byte[DataRecord.MessageSize];
for ( int i = 0; i < DataRecord.MessageSize; i++ )
data[i] = (byte) i;
// Construct a message.
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
buf.writeBytes( data ); // data
buf.setInt( DataRecord.MessageHeaderSize, record.getIdentifier() );
buf.setInt( DataRecord.MessageHeaderSize + 4, record.getSequence() );
return buf;
}
}
--
View this message in context: http://netty-forums-and-mailing-lists.685743.n2.nabble.com/UDP-Help-tp5699562p5699562.html
Sent from the Netty User Group mailing list archive at Nabble.com.
More information about the netty-users
mailing list