[hornetq-commits] JBoss hornetq SVN: r11743 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: remoting/impl/netty and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 22 16:37:07 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-11-22 16:37:07 -0500 (Tue, 22 Nov 2011)
New Revision: 11743

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
Log:
Avoid deadlocks in the STOMP close-session and exception-handling paths

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2011-11-22 21:35:33 UTC (rev 11742)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2011-11-22 21:37:07 UTC (rev 11743)
@@ -17,6 +17,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
@@ -66,6 +67,8 @@
 
    private final Object failLock = new Object();
    
+   private final Executor executor;
+   
    private volatile boolean dataReceived;
 
    public StompDecoder getDecoder()
@@ -73,7 +76,7 @@
       return decoder;
    }
 
-   StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, final StompProtocolManager manager)
+   StompConnection(final Acceptor acceptorUsed, final Connection transportConnection, final StompProtocolManager manager, final Executor executor)
    {
       this.transportConnection = transportConnection;
 
@@ -82,6 +85,8 @@
       this.creationTime = System.currentTimeMillis();
       
       this.acceptorUsed = acceptorUsed;
+      
+      this.executor = executor;
    }
 
    public void addFailureListener(final FailureListener listener)
@@ -322,7 +327,6 @@
    private void callFailureListeners(final HornetQException me)
    {
       final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
-
       for (final FailureListener listener : listenersClone)
       {
          try
@@ -343,20 +347,26 @@
    {
       final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners);
 
-      for (final CloseListener listener : listenersClone)
-      {
-         try
+      // avoiding a dead lock
+      executor.execute(new Runnable(){
+         public void run()
          {
-            listener.connectionClosed();
+            for (final CloseListener listener : listenersClone)
+            {
+               try
+               {
+                  listener.connectionClosed();
+               }
+               catch (final Throwable t)
+               {
+                  // Failure of one listener to execute shouldn't prevent others
+                  // from
+                  // executing
+                  log.error("Failed to execute failure listener", t);
+               }
+            }
          }
-         catch (final Throwable t)
-         {
-            // Failure of one listener to execute shouldn't prevent others
-            // from
-            // executing
-            log.error("Failed to execute failure listener", t);
-         }
-      }
+      });
    }
 
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-11-22 21:35:33 UTC (rev 11742)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-11-22 21:37:07 UTC (rev 11743)
@@ -120,7 +120,7 @@
 
    public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final Connection connection)
    {
-      StompConnection conn = new StompConnection(acceptorUsed, connection, this);
+      StompConnection conn = new StompConnection(acceptorUsed, connection, this, server.getExecutorFactory().getExecutor());
 
       // Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
       // will be timed out and closed!

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java	2011-11-22 21:35:33 UTC (rev 11742)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/HornetQChannelHandler.java	2011-11-22 21:37:07 UTC (rev 11743)
@@ -96,19 +96,20 @@
    @Override
    public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e) throws Exception
    {
-      synchronized (this)
+      if (!active)
       {
-         if (!active)
-         {
-            return;
-         }
+         return;
+      }
 
-         // We don't want to log this - since it is normal for this to happen during failover/reconnect
-         // and we don't want to spew out stack traces in that event
-         // The user has access to this exeception anyway via the HornetQException initial cause
+      // We don't want to log this - since it is normal for this to happen during failover/reconnect
+      // and we don't want to spew out stack traces in that event
+      // The user has access to this exception anyway via the HornetQException initial cause
 
-         HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR, "Netty exception");
-         me.initCause(e.getCause());
+      HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR, "Netty exception");
+      me.initCause(e.getCause());
+      
+      synchronized (listener)
+      {
          try
          {
             listener.connectionException(e.getChannel().getId(), me);



More information about the hornetq-commits mailing list