[jboss-cvs] JBoss Messaging SVN: r6017 - trunk/src/main/org/jboss/messaging/integration/transports/netty.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 6 08:18:00 EST 2009
Author: trustin
Date: 2009-03-06 08:18:00 -0500 (Fri, 06 Mar 2009)
New Revision: 6017
Modified:
trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
Log:
* Fixed a problem where not all connections were closed before the ChannelFactory was shut down
* Used ChannelFactory.releaseExternalResources() to simplify the shutdown code
* Changed NettyAcceptor, NettyConnector, and MessagingChannelHandler to track all active connections in a ChannelGroup
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java 2009-03-06 10:40:09 UTC (rev 6016)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java 2009-03-06 13:18:00 UTC (rev 6017)
@@ -32,6 +32,7 @@
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
/**
* Common handler implementation for client and server side handler.
@@ -43,19 +44,29 @@
{
private static final Logger log = Logger.getLogger(MessagingChannelHandler.class);
+ private final ChannelGroup group;
+
private final BufferHandler handler;
private final ConnectionLifeCycleListener listener;
volatile boolean active;
- MessagingChannelHandler(BufferHandler handler, ConnectionLifeCycleListener listener)
+ MessagingChannelHandler(ChannelGroup group, BufferHandler handler, ConnectionLifeCycleListener listener)
{
+ this.group = group;
this.handler = handler;
this.listener = listener;
}
@Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
+ {
+ group.add(e.getChannel());
+ ctx.sendUpstream(e);
+ }
+
+ @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
{
ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-03-06 10:40:09 UTC (rev 6016)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2009-03-06 13:18:00 UTC (rev 6017)
@@ -43,6 +43,8 @@
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import static org.jboss.netty.channel.Channels.pipeline;
+
+import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
import org.jboss.netty.channel.local.LocalAddress;
@@ -82,7 +84,7 @@
private ChannelFactory channelFactory;
- private DefaultChannelGroup serverChannelGroup;
+ volatile ChannelGroup channelGroup;
private ServerBootstrap bootstrap;
@@ -100,7 +102,7 @@
private final boolean useNio;
- private final boolean useInvm;
+ private final boolean useInvm;
private final String host;
@@ -269,7 +271,7 @@
}
ChannelPipelineSupport.addCodecFilter(pipeline, handler);
- pipeline.addLast("handler", new MessagingServerChannelHandler(handler, listener));
+ pipeline.addLast("handler", new MessagingServerChannelHandler(channelGroup, handler, listener));
return pipeline;
}
};
@@ -289,7 +291,7 @@
bootstrap.setOption("child.reuseAddress", true);
bootstrap.setOption("child.keepAlive", true);
- serverChannelGroup = new DefaultChannelGroup("jbm");
+ channelGroup = new DefaultChannelGroup("jbm-acceptor");
String[] hosts = TransportConfiguration.splitHosts(host);
for (String h : hosts)
@@ -305,7 +307,7 @@
address = new InetSocketAddress(h, port);
}
Channel serverChannel = bootstrap.bind(address);
- serverChannelGroup.add(serverChannel);
+ channelGroup.add(serverChannel);
}
}
@@ -323,46 +325,8 @@
httpKeepAliveTimer.cancel();
}
- serverChannelGroup.close().awaitUninterruptibly();
- bossExecutor.shutdownNow();
- workerExecutor.shutdownNow();
-
- if (bossExecutor != null)
- {
- for (; ;)
- {
- try
- {
- if (bossExecutor.awaitTermination(1, TimeUnit.SECONDS))
- {
- break;
- }
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
- }
- }
-
- if (workerExecutor != null)
- {
- for (; ;)
- {
- try
- {
- if (workerExecutor.awaitTermination(1, TimeUnit.SECONDS))
- {
- break;
- }
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
- }
- }
-
+ channelGroup.close().awaitUninterruptibly();
+ channelFactory.releaseExternalResources();
channelFactory = null;
for (Connection connection : connections.values())
@@ -383,9 +347,9 @@
@ChannelPipelineCoverage("one")
private final class MessagingServerChannelHandler extends MessagingChannelHandler
{
- MessagingServerChannelHandler(BufferHandler handler, ConnectionLifeCycleListener listener)
+ MessagingServerChannelHandler(ChannelGroup group, BufferHandler handler, ConnectionLifeCycleListener listener)
{
- super(handler, listener);
+ super(group, handler, listener);
}
@Override
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-03-06 10:40:09 UTC (rev 6016)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2009-03-06 13:18:00 UTC (rev 6017)
@@ -63,6 +63,8 @@
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.http.HttpTunnelAddress;
import org.jboss.netty.channel.socket.http.HttpTunnelingClientSocketChannelFactory;
@@ -102,6 +104,8 @@
private ChannelFactory channelFactory;
private ClientBootstrap bootstrap;
+
+ ChannelGroup channelGroup;
private final BufferHandler handler;
@@ -279,6 +283,8 @@
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("reuseAddress", true);
+ channelGroup = new DefaultChannelGroup("jbm-connector");
+
final SSLContext context;
if (sslEnabled)
{
@@ -315,7 +321,7 @@
pipeline.addLast("httphandler", new HttpHandler());
}
ChannelPipelineSupport.addCodecFilter(pipeline, handler);
- pipeline.addLast("handler", new MessagingClientChannelHandler(handler, listener));
+ pipeline.addLast("handler", new MessagingClientChannelHandler(channelGroup, handler, listener));
return pipeline;
}
});
@@ -329,49 +335,10 @@
}
bootstrap = null;
+ channelGroup.close().awaitUninterruptibly();
+ channelFactory.releaseExternalResources();
channelFactory = null;
- if (bossExecutor != null)
- {
- bossExecutor.shutdownNow();
- }
- workerExecutor.shutdownNow();
- if (bossExecutor != null)
- {
- for (; ;)
- {
- try
- {
- if (bossExecutor.awaitTermination(1, TimeUnit.SECONDS))
- {
- break;
- }
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
- }
- }
-
- if (workerExecutor != null)
- {
- for (; ;)
- {
- try
- {
- if (workerExecutor.awaitTermination(1, TimeUnit.SECONDS))
- {
- break;
- }
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
- }
- }
-
for (Connection connection : connections.values())
{
listener.connectionDestroyed(connection.getID());
@@ -466,9 +433,9 @@
@ChannelPipelineCoverage("one")
private final class MessagingClientChannelHandler extends MessagingChannelHandler
{
- MessagingClientChannelHandler(BufferHandler handler, ConnectionLifeCycleListener listener)
+ MessagingClientChannelHandler(ChannelGroup group, BufferHandler handler, ConnectionLifeCycleListener listener)
{
- super(handler, listener);
+ super(group, handler, listener);
}
}
More information about the jboss-cvs-commits
mailing list