thread blocked on channel.close()
vibeofboston
djvibe_alb at yahoo.com
Sun Apr 11 13:32:57 EDT 2010
I am in a process of writing a small high frequency order receiving server. i
am writing the skeleton part of the code with netty to just receive simple
StringEncoder/StringDecoder [messages [delimited by \r\n].
the server closes the client connection when it receives the message
"LOGOUT" from the client.
everything seems to work fine if the client sends messages, say 10, thru a
single channel.write [11 being LOGOUT]
but if the client sends say around 1000, the server gets stuck on closing
the client channel [1001 being the LOGOUT]. even worse, if you run the
client again, no response from the server.
I am using YourKit to monitor [attached image] and you will see the blocked
thread.
so what am i missing?..code is pretty straight forward...
Server
public void start() throws FixServerException {
final ChannelFactory factory= new
NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ServerBootstrap bootstrapper= new ServerBootstrap(factory);
ChannelPipeline pipeline= bootstrapper.getPipeline();
MessageQueue queue= new MessageQueue(new
ArrayBlockingQueue<FixMessage>(100));
final MessageQueueProcessor queueProcessor= new
MessageQueueProcessor(queue);
ExecutorService executorQueueProcessor = Executors.newFixedThreadPool(1);
FixRawHandler handler= new FixRawHandler(queue);
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192,
Delimiters.lineDelimiter()));
pipeline.addLast("encoder", new StringEncoder(Charset.defaultCharset()));
pipeline.addLast("decoder", new StringDecoder(Charset.defaultCharset()));
pipeline.addLast("executor", new ExecutionHandler(new
OrderedMemoryAwareThreadPoolExecutor(10, 1024, 1024)));
pipeline.addLast("handler", handler);
bootstrapper.setOption("child.tcpNoDelay", true);
bootstrapper.setOption("child.keepAlive", true);
bootstrapper.setOption("child.reuseAddress", true);
System.out.println(Thread.currentThread().getName());
Future<Boolean> queueFuture=
executorQueueProcessor.submit(queueProcessor); /launches the consumer queue
listener
try
{
Channel channel= bootstrapper.bind(new
InetSocketAddress(InetAddress.getLocalHost(), 8080));
} catch(Exception ex)
{
throw new FixServerException(ex);
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
try
{
factory.releaseExternalResources();
queueProcessor.interrupt();
System.out.println("released all resources");
} catch(Exception ex)
{
throw new FixServerException(ex);
}
}
}));
System.out.println("Running...");
}
Server Handler
@ChannelPipelineCoverage("all")
public class FixRawHandler extends SimpleChannelUpstreamHandler {
private MessageQueue queue;
private static int counter=0;
public FixRawHandler(MessageQueue queue) {
this.queue= queue;
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
FixServer.ALL_CHANNELS.add(e.getChannel());
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent
e)
throws Exception {
System.out.println("FIX:Connected");
//TODO: send connection message. check FIX connection established MESSAGE
Channel ch = e.getChannel();
e.getChannel().write("FIXSESSION: ACCEPTED\r\n");
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
System.out.println("FIX:DisConnected");
FixServer.ALL_CHANNELS.remove(e.getChannel());
//TODO: send disconnected message. check FIX connection disconnected
MESSAGE
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
String msg= (String) e.getMessage();
if(msg.indexOf("LOGOUT")>0)
{
System.out.println("Logout received =>"+ msg); //it prints this
e.getChannel().write("FIXSESSION: LOGOUT\r\n");
e.getChannel().close(); //thread gets stuck here
} else {
e.getChannel().write("FIXACK: "+ ++counter + "\r\n");
queue.getMessages().put(new FixMessage(msg)); //this is a blocking queue.
another thread is consuming this.
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
}
}
Client
@ChannelPipelineCoverage("all")
public class Client extends SimpleChannelUpstreamHandler {
@Override
public void handleUpstream(ChannelHandlerContext arg0, ChannelEvent arg1)
throws Exception {
super.handleUpstream(arg0, arg1);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
String m = (String) e.getMessage();
System.out.println(m);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
e.getCause().printStackTrace();
e.getChannel().close();
}
private static int i=0;
public static void main(String[] args) throws Exception {
ChannelFactory factory= new
NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ClientBootstrap bootstrap= new ClientBootstrap(factory);
bootstrap.setPipelineFactory(new ClientPipeLineFactory());
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
ChannelFuture future = bootstrap.connect(new
InetSocketAddress(InetAddress.getLocalHost(), 8080));
//wait till it connects
final Channel writer= future.awaitUninterruptibly().getChannel();
if(!future.isSuccess())
{
future.getCause().printStackTrace();
bootstrap.releaseExternalResources();
return;
}
for(int index=0; index < 1000; index++) //10 works but 1000 it doesn't
{
writer.write("FIX=4.1;SEQ="+ index + ";SYM=MSFT\r\n");
}
//server does print LOGOUT but never writtens back the logout mesg back to
client
ChannelFuture lastWriteFuture= writer.write("FIX=4.1;OP=LOGOUT;\r\n");
writer.getCloseFuture().awaitUninterruptibly();
if(lastWriteFuture!=null)
lastWriteFuture.awaitUninterruptibly();
//is this duplicatate of writer.getCloseFuture().awaitUninterruptibly();
writer.close().awaitUninterruptibly();
bootstrap.releaseExternalResources();
factory.releaseExternalResources();
}
}
Client pipeline
public class ClientPipeLineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline= Channels.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192,
Delimiters.lineDelimiter()));
pipeline.addLast("encoder", new StringEncoder(Charset.defaultCharset()));
pipeline.addLast("decoder", new StringDecoder(Charset.defaultCharset()));
pipeline.addLast("handler", new Client());
return pipeline;
}
}
http://n2.nabble.com/file/n4886333/yourkit-threadview.jpg
--
View this message in context: http://n2.nabble.com/thread-blocked-on-channel-close-tp4886333p4886333.html
Sent from the Netty User Group mailing list archive at Nabble.com.
More information about the netty-users
mailing list