[jboss-cvs] JBoss Messaging SVN: r6017 - trunk/src/main/org/jboss/messaging/integration/transports/netty.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 6 08:18:00 EST 2009


Author: trustin
Date: 2009-03-06 08:18:00 -0500 (Fri, 06 Mar 2009)
New Revision: 6017

Modified:
   trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
Log:
* Fixed a problem where not all connections were closed before the ChannelFactory was shut down
* Used ChannelFactory.releaseExternalResources() to simplify the shutdown code
* Changed NettyAcceptor, NettyConnector, and MessagingChannelHandler to maintain a list of all active connections using a ChannelGroup

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java	2009-03-06 10:40:09 UTC (rev 6016)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java	2009-03-06 13:18:00 UTC (rev 6017)
@@ -32,6 +32,7 @@
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
 
 /**
  * Common handler implementation for client and server side handler.
@@ -43,19 +44,29 @@
 {
    private static final Logger log = Logger.getLogger(MessagingChannelHandler.class);
 
+   private final ChannelGroup group;
+   
    private final BufferHandler handler;
 
    private final ConnectionLifeCycleListener listener;
 
    volatile boolean active;
 
-   MessagingChannelHandler(BufferHandler handler, ConnectionLifeCycleListener listener)
+   MessagingChannelHandler(ChannelGroup group, BufferHandler handler, ConnectionLifeCycleListener listener)
    {
+      this.group = group;
       this.handler = handler;
       this.listener = listener;
    }
 
    @Override
+   public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
+   {
+      group.add(e.getChannel());
+      ctx.sendUpstream(e);
+   }
+
+   @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
    {
       ChannelBuffer buffer = (ChannelBuffer)e.getMessage();

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-03-06 10:40:09 UTC (rev 6016)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-03-06 13:18:00 UTC (rev 6017)
@@ -43,6 +43,8 @@
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
 import static org.jboss.netty.channel.Channels.pipeline;
+
+import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
 import org.jboss.netty.channel.local.LocalAddress;
@@ -82,7 +84,7 @@
 
    private ChannelFactory channelFactory;
 
-   private DefaultChannelGroup serverChannelGroup;
+   volatile ChannelGroup channelGroup;
 
    private ServerBootstrap bootstrap;
 
@@ -100,7 +102,7 @@
 
    private final boolean useNio;
 
-    private final boolean useInvm;
+   private final boolean useInvm;
 
    private final String host;
 
@@ -269,7 +271,7 @@
             }
 
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
-            pipeline.addLast("handler", new MessagingServerChannelHandler(handler, listener));
+            pipeline.addLast("handler", new MessagingServerChannelHandler(channelGroup, handler, listener));
             return pipeline;
          }
       };
@@ -289,7 +291,7 @@
       bootstrap.setOption("child.reuseAddress", true);
       bootstrap.setOption("child.keepAlive", true);
 
-      serverChannelGroup = new DefaultChannelGroup("jbm");
+      channelGroup = new DefaultChannelGroup("jbm-acceptor");
 
       String[] hosts = TransportConfiguration.splitHosts(host);
       for (String h : hosts)
@@ -305,7 +307,7 @@
             address = new InetSocketAddress(h, port);
          }
          Channel serverChannel = bootstrap.bind(address);
-         serverChannelGroup.add(serverChannel);
+         channelGroup.add(serverChannel);
       }
    }
 
@@ -323,46 +325,8 @@
 
          httpKeepAliveTimer.cancel();
       }
-      serverChannelGroup.close().awaitUninterruptibly();
-      bossExecutor.shutdownNow();
-      workerExecutor.shutdownNow();
-      
-      if (bossExecutor != null)
-      {
-         for (; ;)
-         {
-            try
-            {
-               if (bossExecutor.awaitTermination(1, TimeUnit.SECONDS))
-               {
-                  break;
-               }
-            }
-            catch (InterruptedException e)
-            {
-               // Ignore
-            }
-         }
-      }
-      
-      if (workerExecutor != null)
-      {
-         for (; ;)
-         {
-            try
-            {
-               if (workerExecutor.awaitTermination(1, TimeUnit.SECONDS))
-               {
-                  break;
-               }
-            }
-            catch (InterruptedException e)
-            {
-               // Ignore
-            }
-         }
-      }
-      
+      channelGroup.close().awaitUninterruptibly();
+      channelFactory.releaseExternalResources();
       channelFactory = null;
 
       for (Connection connection : connections.values())
@@ -383,9 +347,9 @@
    @ChannelPipelineCoverage("one")
    private final class MessagingServerChannelHandler extends MessagingChannelHandler
    {
-      MessagingServerChannelHandler(BufferHandler handler, ConnectionLifeCycleListener listener)
+      MessagingServerChannelHandler(ChannelGroup group, BufferHandler handler, ConnectionLifeCycleListener listener)
       {
-         super(handler, listener);
+         super(group, handler, listener);
       }
 
       @Override

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-03-06 10:40:09 UTC (rev 6016)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-03-06 13:18:00 UTC (rev 6017)
@@ -63,6 +63,8 @@
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.UpstreamMessageEvent;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.http.HttpTunnelAddress;
 import org.jboss.netty.channel.socket.http.HttpTunnelingClientSocketChannelFactory;
@@ -102,6 +104,8 @@
    private ChannelFactory channelFactory;
 
    private ClientBootstrap bootstrap;
+   
+   ChannelGroup channelGroup;
 
    private final BufferHandler handler;
 
@@ -279,6 +283,8 @@
       bootstrap.setOption("keepAlive", true);
       bootstrap.setOption("reuseAddress", true);
 
+      channelGroup = new DefaultChannelGroup("jbm-connector");
+
       final SSLContext context;
       if (sslEnabled)
       {
@@ -315,7 +321,7 @@
                pipeline.addLast("httphandler", new HttpHandler());
             }
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
-            pipeline.addLast("handler", new MessagingClientChannelHandler(handler, listener));
+            pipeline.addLast("handler", new MessagingClientChannelHandler(channelGroup, handler, listener));
             return pipeline;
          }
       });
@@ -329,49 +335,10 @@
       }
       
       bootstrap = null;
+      channelGroup.close().awaitUninterruptibly();
+      channelFactory.releaseExternalResources();
       channelFactory = null;
-      if (bossExecutor != null)
-      {
-         bossExecutor.shutdownNow();
-      }
-      workerExecutor.shutdownNow();  
       
-      if (bossExecutor != null)
-      {
-         for (; ;)
-         {
-            try
-            {
-               if (bossExecutor.awaitTermination(1, TimeUnit.SECONDS))
-               {
-                  break;
-               }
-            }
-            catch (InterruptedException e)
-            {
-               // Ignore
-            }
-         }
-      }
-      
-      if (workerExecutor != null)
-      {
-         for (; ;)
-         {
-            try
-            {
-               if (workerExecutor.awaitTermination(1, TimeUnit.SECONDS))
-               {
-                  break;
-               }
-            }
-            catch (InterruptedException e)
-            {
-               // Ignore
-            }
-         }
-      }
-      
       for (Connection connection : connections.values())
       {
          listener.connectionDestroyed(connection.getID());
@@ -466,9 +433,9 @@
    @ChannelPipelineCoverage("one")
    private final class MessagingClientChannelHandler extends MessagingChannelHandler
    {
-      MessagingClientChannelHandler(BufferHandler handler, ConnectionLifeCycleListener listener)
+      MessagingClientChannelHandler(ChannelGroup group, BufferHandler handler, ConnectionLifeCycleListener listener)
       {
-         super(handler, listener);
+         super(group, handler, listener);
       }
    }
 




More information about the jboss-cvs-commits mailing list