thread blocked on channel.close()

vibeofboston djvibe_alb at yahoo.com
Sun Apr 11 13:32:57 EDT 2010


I am in a process of writing a small high frequency order receiving server. i
am writing the skeleton part of the code with netty to just receive simple
StringEncoder/StringDecoder [messages [delimited by \r\n]. 
the server closes the client connection when it receives the message
"LOGOUT" from the client.
everything seems to work fine if the client sends messages, say 10, thru a
single channel.write [11 being LOGOUT]
but if the client sends say around 1000, the server gets stuck on closing
the client channel [1001 being the LOGOUT]. even worse, if you run the
client again, no response from the server. 
I am using YourKit to monitor [attached image] and you will see the blocked
thread.
so what am i missing?..code is pretty straight forward...

Server

public void start() throws FixServerException {
		final ChannelFactory factory= new
NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
		ServerBootstrap bootstrapper= new ServerBootstrap(factory);
		
		ChannelPipeline pipeline= bootstrapper.getPipeline();
		
		MessageQueue queue= new MessageQueue(new
ArrayBlockingQueue<FixMessage>(100));
		final MessageQueueProcessor queueProcessor= new
MessageQueueProcessor(queue);
		ExecutorService executorQueueProcessor = Executors.newFixedThreadPool(1);
		
		FixRawHandler handler= new FixRawHandler(queue);
		pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192,
Delimiters.lineDelimiter()));
		pipeline.addLast("encoder", new StringEncoder(Charset.defaultCharset()));
		pipeline.addLast("decoder", new StringDecoder(Charset.defaultCharset()));
		pipeline.addLast("executor", new ExecutionHandler(new
OrderedMemoryAwareThreadPoolExecutor(10, 1024, 1024)));
		pipeline.addLast("handler", handler);
		
		bootstrapper.setOption("child.tcpNoDelay", true);
		bootstrapper.setOption("child.keepAlive", true);
		bootstrapper.setOption("child.reuseAddress", true);
		
		System.out.println(Thread.currentThread().getName());
		Future<Boolean> queueFuture=
executorQueueProcessor.submit(queueProcessor); /launches the consumer queue
listener
		try
		{
			Channel channel= bootstrapper.bind(new
InetSocketAddress(InetAddress.getLocalHost(), 8080));			
		} catch(Exception ex)
		{
			throw new FixServerException(ex);		
		}
		
		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 
			@Override
			public void run() {			
				try
				{
					factory.releaseExternalResources();
					queueProcessor.interrupt();
					System.out.println("released all resources");
				} catch(Exception ex)
				{
					throw new FixServerException(ex);
				}
			}
		}));
		
		System.out.println("Running...");
		
	}



Server Handler

@ChannelPipelineCoverage("all")
public class FixRawHandler extends SimpleChannelUpstreamHandler {
	
	private MessageQueue queue;
	private static int counter=0;

	public FixRawHandler(MessageQueue queue) {
		this.queue= queue;
	}
	
	@Override
	public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		FixServer.ALL_CHANNELS.add(e.getChannel());
	}
	
	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent
e)
			throws Exception {
		System.out.println("FIX:Connected");

		//TODO: send connection message. check FIX connection established MESSAGE
		Channel ch = e.getChannel();
		e.getChannel().write("FIXSESSION: ACCEPTED\r\n");
	}

	@Override
	public void channelDisconnected(ChannelHandlerContext ctx,
			ChannelStateEvent e) throws Exception {
		System.out.println("FIX:DisConnected");
		FixServer.ALL_CHANNELS.remove(e.getChannel());
		
		//TODO: send disconnected message. check FIX connection disconnected
MESSAGE
	}

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		String msg= (String) e.getMessage();		
		if(msg.indexOf("LOGOUT")>0)
		{
			System.out.println("Logout received =>"+ msg); //it prints this
			e.getChannel().write("FIXSESSION: LOGOUT\r\n");		
			e.getChannel().close(); //thread gets stuck here 
		} else {
			e.getChannel().write("FIXACK: "+ ++counter + "\r\n");
			queue.getMessages().put(new FixMessage(msg)); //this is a blocking queue.
another thread is consuming this. 
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		e.getCause().printStackTrace();
		Channel ch = e.getChannel();
		ch.close();
	}
}


Client

@ChannelPipelineCoverage("all")
public class Client extends SimpleChannelUpstreamHandler {
	
	@Override
	public void handleUpstream(ChannelHandlerContext arg0, ChannelEvent arg1)
			throws Exception {
		super.handleUpstream(arg0, arg1);
	}
	
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		  String m = (String) e.getMessage();
		  System.out.println(m);		  

	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		e.getCause().printStackTrace();
		e.getChannel().close();
	}
	
	private static int i=0;
	
	public static void main(String[] args) throws Exception {
		ChannelFactory factory= new
NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
		
		ClientBootstrap bootstrap= new ClientBootstrap(factory);
		bootstrap.setPipelineFactory(new ClientPipeLineFactory());
		
		bootstrap.setOption("tcpNoDelay", true);
		bootstrap.setOption("keepAlive", true);
		
		ChannelFuture future = bootstrap.connect(new
InetSocketAddress(InetAddress.getLocalHost(), 8080));
		//wait till it connects
		final Channel writer= future.awaitUninterruptibly().getChannel();
		
		if(!future.isSuccess())
		{
			future.getCause().printStackTrace();
			bootstrap.releaseExternalResources();
			return;
		}
		
		for(int index=0; index < 1000; index++) //10 works but 1000 it doesn't
		{
			writer.write("FIX=4.1;SEQ="+ index + ";SYM=MSFT\r\n");			
		}		
		//server does print LOGOUT but never writtens back the logout mesg back to
client
		ChannelFuture lastWriteFuture= writer.write("FIX=4.1;OP=LOGOUT;\r\n");
		
		writer.getCloseFuture().awaitUninterruptibly();
		
		if(lastWriteFuture!=null)
			lastWriteFuture.awaitUninterruptibly();
		//is this duplicatate of writer.getCloseFuture().awaitUninterruptibly();
		writer.close().awaitUninterruptibly();
		bootstrap.releaseExternalResources();
		factory.releaseExternalResources();		
	}
}


Client pipeline 

public class ClientPipeLineFactory implements ChannelPipelineFactory {

	@Override
	public ChannelPipeline getPipeline() throws Exception {
		ChannelPipeline pipeline= Channels.pipeline();
		pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192,
Delimiters.lineDelimiter()));
		pipeline.addLast("encoder", new StringEncoder(Charset.defaultCharset()));
		pipeline.addLast("decoder", new StringDecoder(Charset.defaultCharset()));
		pipeline.addLast("handler", new Client());
		return pipeline;
	}
}


http://n2.nabble.com/file/n4886333/yourkit-threadview.jpg 
-- 
View this message in context: http://n2.nabble.com/thread-blocked-on-channel-close-tp4886333p4886333.html
Sent from the Netty User Group mailing list archive at Nabble.com.


More information about the netty-users mailing list