[jboss-cvs] JBoss Messaging SVN: r4845 - in trunk/src/main/org/jboss/messaging/core/remoting/impl: netty and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 20 02:44:33 EDT 2008


Author: trustin
Date: 2008-08-20 02:44:33 -0400 (Wed, 20 Aug 2008)
New Revision: 4845

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java
Modified:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
Log:
Fixed ClientCrashTest failures

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java	2008-08-20 01:19:54 UTC (rev 4844)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnection.java	2008-08-20 06:44:33 UTC (rev 4845)
@@ -23,6 +23,8 @@
 package org.jboss.messaging.core.remoting.impl.mina;
 
 import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.future.IoFutureListener;
+import org.apache.mina.core.service.IoConnector;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.ssl.SslFilter;
 import org.jboss.messaging.core.logging.Logger;
@@ -68,21 +70,35 @@
          return;
       }
 
-      session.close().awaitUninterruptibly();
+      SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
 
-      SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
-      if (sslFilter != null)
-      {
-         try
+      if (session.getService() instanceof IoConnector) {
+         if (sslFilter != null)
          {
-            sslFilter.stopSsl(session).awaitUninterruptibly();
+            try
+            {
+               sslFilter.stopSsl(session).awaitUninterruptibly();
+            }
+            catch (Throwable t)
+            {
+               // ignore
+            }
          }
-         catch (Throwable t)
+         session.close().awaitUninterruptibly();
+      } else {
+         if (sslFilter != null)
          {
-            // ignore
+            try
+            {
+               sslFilter.stopSsl(session).addListener(IoFutureListener.CLOSE);
+            }
+            catch (Throwable t)
+            {
+               // ignore
+            }
+         } else {
+            session.close();
          }
-
-
       }
 
       closed = true;

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java	2008-08-20 06:44:33 UTC (rev 4845)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.remoting.impl.netty;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.messaging.core.remoting.RemotingHandler;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Common handler implementation shared by the client-side and server-side handlers.
+ *
+ * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
+ * @version $Rev$, $Date$
+ */
+class MessagingChannelHandler extends SimpleChannelHandler
+{
+   private static final Logger log = Logger.getLogger(MessagingChannelHandler.class);
+
+   private final RemotingHandler handler;
+   private final ConnectionLifeCycleListener listener;
+   volatile boolean destroyed;
+
+   MessagingChannelHandler(RemotingHandler handler, ConnectionLifeCycleListener listener)
+   {
+      this.handler = handler;
+      this.listener = listener;
+   }
+
+   @Override
+   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+   {
+      ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
+      handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+   }
+
+   @Override
+   public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
+   {
+      synchronized (this)
+      {
+         if (!destroyed)
+         {
+            listener.connectionDestroyed(e.getChannel().getId());
+            destroyed = true;
+         }
+      }
+   }
+
+   @Override
+   public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+   {
+      log.error(
+            "caught exception " + e.getCause() + " for channel " +
+            e.getChannel(), e.getCause());
+
+      synchronized (this)
+      {
+         if (destroyed)
+         {
+            return;
+         }
+
+         MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
+         me.initCause(e.getCause());
+         try {
+            listener.connectionException(e.getChannel().getId(), me);
+            destroyed = true;
+         } catch (Exception ex) {
+            log.error("failed to notify the listener:", ex);
+         }
+      }
+   }
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/MessagingChannelHandler.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java	2008-08-20 01:19:54 UTC (rev 4844)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyAcceptor.java	2008-08-20 06:44:33 UTC (rev 4845)
@@ -22,7 +22,7 @@
 
 package org.jboss.messaging.core.remoting.impl.netty;
 
-import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.*;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
@@ -31,15 +31,12 @@
 import javax.net.ssl.SSLContext;
 
 import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
 import org.jboss.messaging.core.remoting.RemotingHandler;
 import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
 import org.jboss.messaging.core.remoting.spi.Acceptor;
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -47,9 +44,6 @@
 import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 /**
@@ -63,8 +57,6 @@
  */
 public class NettyAcceptor implements Acceptor
 {
-   private static final Logger log = Logger.getLogger(NettyAcceptor.class);
-
    private ExecutorService bossExecutor;
    private ExecutorService workerExecutor;
    private ChannelFactory channelFactory;
@@ -128,7 +120,7 @@
                ChannelPipelineSupport.addSSLFilter(pipeline, context, false);
             }
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
-            pipeline.addLast("handler", new NettyHandler());
+            pipeline.addLast("handler", new MessagingServerChannelHandler(handler, listener));
             return pipeline;
          }
       });
@@ -169,49 +161,18 @@
    // Inner classes -----------------------------------------------------------------------------
 
    @ChannelPipelineCoverage("one")
-   private final class NettyHandler extends SimpleChannelHandler
+   private final class MessagingServerChannelHandler extends MessagingChannelHandler
    {
+      MessagingServerChannelHandler(RemotingHandler handler, ConnectionLifeCycleListener listener)
+      {
+         super(handler, listener);
+      }
+
       @Override
       public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
       {
          Connection tc = new NettyConnection(e.getChannel());
          listener.connectionCreated(tc);
       }
-
-      @Override
-      public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
-      {
-         boolean notify;
-         synchronized (NettyAcceptor.this)
-         {
-            notify = channelFactory == null;
-         }
-
-         if (notify) {
-            listener.connectionDestroyed(e.getChannel().getId());
-         }
-      }
-
-      @Override
-      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
-      {
-         log.error(
-               "caught exception " + e.getCause() + " for channel " +
-               e.getChannel(), e.getCause());
-         MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
-         me.initCause(e.getCause());
-         try {
-            listener.connectionException(e.getChannel().getId(), me);
-         } catch (Exception ex) {
-            log.error("failed to notify the listener:", ex);
-         }
-      }
-
-      @Override
-      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
-      {
-         ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
-         handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
-      }
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java	2008-08-20 01:19:54 UTC (rev 4844)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnection.java	2008-08-20 06:44:33 UTC (rev 4845)
@@ -80,6 +80,12 @@
          channel.close();
       }
 
+      // TODO Do not spin - wait for a signal instead.
+      MessagingChannelHandler handler = (MessagingChannelHandler) channel.getPipeline().get("handler");
+      while (!handler.destroyed) {
+         Thread.yield();
+      }
+
       closed = true;
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java	2008-08-20 01:19:54 UTC (rev 4844)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/netty/NettyConnector.java	2008-08-20 06:44:33 UTC (rev 4845)
@@ -21,7 +21,7 @@
  */
 package org.jboss.messaging.core.remoting.impl.netty;
 
-import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.*;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
@@ -31,7 +31,6 @@
 
 import org.jboss.messaging.core.client.ConnectionParams;
 import org.jboss.messaging.core.client.Location;
-import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.ConnectionLifeCycleListener;
 import org.jboss.messaging.core.remoting.RemotingHandler;
@@ -39,7 +38,6 @@
 import org.jboss.messaging.core.remoting.spi.Connection;
 import org.jboss.messaging.core.remoting.spi.Connector;
 import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -47,9 +45,6 @@
 import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.handler.ssl.SslHandler;
 
@@ -166,7 +161,7 @@
                   ChannelPipelineSupport.addSSLFilter(pipeline, context, true);
             }
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
-            pipeline.addLast("handler", new NettyHandler());
+            pipeline.addLast("handler", new MessagingClientChannelHandler(handler, listener));
             return pipeline;
          }
       });
@@ -212,13 +207,11 @@
    // Inner classes -------------------------------------------------
 
    @ChannelPipelineCoverage("one")
-   private final class NettyHandler extends SimpleChannelHandler
+   private final class MessagingClientChannelHandler extends MessagingChannelHandler
    {
-      @Override
-      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+      MessagingClientChannelHandler(RemotingHandler handler, ConnectionLifeCycleListener listener)
       {
-         ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
-         handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+         super(handler, listener);
       }
 
       @Override
@@ -230,27 +223,5 @@
             sslHandler.handshake(e.getChannel());
          }
       }
-
-      @Override
-      public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
-      {
-         listener.connectionDestroyed(e.getChannel().getId());
-      }
-
-      @Override
-      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
-      {
-         log.error(
-               "caught exception " + e.getCause() + " for channel " +
-               e.getChannel(), e.getCause());
-
-         MessagingException me = new MessagingException(MessagingException.INTERNAL_ERROR, "Netty exception");
-         me.initCause(e.getCause());
-         try {
-            listener.connectionException(e.getChannel().getId(), me);
-         } catch (Exception ex) {
-            log.error("failed to notify the listener:", ex);
-         }
-      }
    }
 }




More information about the jboss-cvs-commits mailing list