[hornetq-commits] JBoss hornetq SVN: r9083 - in trunk: src/main/org/hornetq/core/protocol/core and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Apr 9 08:06:24 EDT 2010


Author: timfox
Date: 2010-04-09 08:06:23 -0400 (Fri, 09 Apr 2010)
New Revision: 9083

Modified:
   trunk/examples/jms/reattach-node/server0/hornetq-jms.xml
   trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-336

Modified: trunk/examples/jms/reattach-node/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/reattach-node/server0/hornetq-jms.xml	2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/examples/jms/reattach-node/server0/hornetq-jms.xml	2010-04-09 12:06:23 UTC (rev 9083)
@@ -27,6 +27,10 @@
       to try to reconnect -->
       <failover-on-server-shutdown>true</failover-on-server-shutdown>
       
+      <!-- We need to specify a confirmation-window-size to enable re-attachment, default is -1 which
+      means no re-attachment -->
+      <confirmation-window-size>1048576</confirmation-window-size>
+      
    </connection-factory>
    
    <!-- This is used by the example to send the management operations, it's not central to the example -->

Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-04-09 12:06:23 UTC (rev 9083)
@@ -147,8 +147,6 @@
    {
       log.warn("Client connection failed, clearing up resources for session " + session.getName());
 
-      session.runConnectionFailureRunners();
-
       try
       {
          session.close();
@@ -177,7 +175,6 @@
 
    public void connectionClosed()
    {
-      session.runConnectionFailureRunners();
    }
 
    private void addConnectionListeners()

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-04-09 12:06:23 UTC (rev 9083)
@@ -567,8 +567,7 @@
             {
                session.getSession().rollback(true);
                session.getSession().close();
-               session.getSession().runConnectionFailureRunners();
-            }
+             }
             catch (Exception e)
             {
                log.warn(e.getMessage(), e);
@@ -587,7 +586,6 @@
                {
                   serverSession.rollback(true);
                   serverSession.close();
-                  serverSession.runConnectionFailureRunners();
                }
                catch (Exception e)
                {

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-04-09 12:06:23 UTC (rev 9083)
@@ -31,8 +31,10 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
 import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
+import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.management.ManagementService;
@@ -365,16 +367,32 @@
 
       if (conn != null)
       {
-         // if the connection has no failure listeners it means the sesssions etc were already closed so this is a clean
-         // shutdown, therefore we can destroy the connection
-         // otherwise client might have crashed/exited without closing connections so we leave them for connection TTL
+         // Bit of a hack - find a better way to do this
 
-         if (conn.connection.getFailureListeners().isEmpty())
+         List<FailureListener> failureListeners = conn.connection.getFailureListeners();
+
+         boolean empty = true;
+
+         for (FailureListener listener : failureListeners)
          {
+            if (listener instanceof ServerSessionPacketHandler)
+            {
+               empty = false;
+
+               break;
+            }
+         }
+
+         // We only destroy the connection if the connection has no sessions attached to it
+         // Otherwise it means the connection has died without the sessions being closed first
+         // so we need to keep them for ttl, in case re-attachment occurs
+         if (empty)
+         {
             connections.remove(connectionID);
 
             conn.connection.destroy();
          }
+
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-04-09 12:06:23 UTC (rev 9083)
@@ -110,6 +110,4 @@
    void close() throws Exception;
 
    void setTransferring(boolean transferring);
-   
-   void runConnectionFailureRunners();
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-04-09 11:42:08 UTC (rev 9082)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-04-09 12:06:23 UTC (rev 9083)
@@ -73,7 +73,7 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  */
-public class ServerSessionImpl implements ServerSession, FailureListener, CloseListener
+public class ServerSessionImpl implements ServerSession, FailureListener
 {
    // Constants -----------------------------------------------------------------------------
 
@@ -102,7 +102,7 @@
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
 
    private Transaction tx;
-   
+
    private final boolean xa;
 
    private final StorageManager storageManager;
@@ -117,8 +117,10 @@
 
    private volatile boolean started = false;
 
-   private final Map<SimpleString, Runnable> failureRunners = new HashMap<SimpleString, Runnable>();
+   // private final Map<SimpleString, Runnable> failureRunners = new HashMap<SimpleString, Runnable>();
 
+   private final Map<SimpleString, TempQueueCleanerUpper> tempQueueCleannerUppers = new HashMap<SimpleString, TempQueueCleanerUpper>();
+
    private final String name;
 
    private final HornetQServer server;
@@ -131,7 +133,7 @@
    private final RoutingContext routingContext = new RoutingContextImpl(null);
 
    private final SessionCallback callback;
-   
+
    private volatile SimpleString defaultAddress;
 
    // Constructors ---------------------------------------------------------------------------------
@@ -182,7 +184,7 @@
       {
          tx = new TransactionImpl(storageManager);
       }
-      
+
       this.xa = xa;
 
       this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
@@ -196,12 +198,10 @@
       this.managementAddress = managementAddress;
 
       this.callback = callback;
-      
+
       this.defaultAddress = defaultAddress;
 
       remotingConnection.addFailureListener(this);
-
-      remotingConnection.addCloseListener(this);
    }
 
    // ServerSession implementation ----------------------------------------------------------------------------
@@ -272,7 +272,7 @@
       }
 
       remotingConnection.removeFailureListener(this);
-
+      
       callback.closed();
    }
 
@@ -359,9 +359,9 @@
          // session is closed.
          // It is up to the user to delete the queue when finished with it
 
-         failureRunners.put(name, new Runnable()
+         CloseListener closeListener = new CloseListener()
          {
-            public void run()
+            public void connectionClosed()
             {
                try
                {
@@ -375,10 +375,57 @@
                   ServerSessionImpl.log.error("Failed to remove temporary queue " + name);
                }
             }
-         });
+         };
+
+         TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name);
+
+         remotingConnection.addCloseListener(cleaner);
+         remotingConnection.addFailureListener(cleaner);
+
+         tempQueueCleannerUppers.put(name, cleaner);
       }
    }
 
+   private static class TempQueueCleanerUpper implements CloseListener, FailureListener
+   {
+      private final PostOffice postOffice;
+
+      private final SimpleString bindingName;
+
+      TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName)
+      {
+         this.postOffice = postOffice;
+
+         this.bindingName = bindingName;
+      }
+
+      private void run()
+      {
+         try
+         {
+            if (postOffice.getBinding(bindingName) != null)
+            {
+               postOffice.removeBinding(bindingName);
+            }
+         }
+         catch (Exception e)
+         {
+            ServerSessionImpl.log.error("Failed to remove temporary queue " + bindingName);
+         }
+      }
+
+      public void connectionFailed(HornetQException exception)
+      {
+         run();
+      }
+
+      public void connectionClosed()
+      {
+         run();
+      }
+
+   }
+
    public void deleteQueue(final SimpleString name) throws Exception
    {
       Binding binding = postOffice.getBinding(name);
@@ -390,7 +437,14 @@
 
       server.destroyQueue(name, this);
 
-      failureRunners.remove(name);
+      TempQueueCleanerUpper cleaner = this.tempQueueCleannerUppers.remove(name);
+      
+      if (cleaner != null)
+      {
+         remotingConnection.removeCloseListener(cleaner);
+         
+         remotingConnection.removeFailureListener(cleaner);
+      }
    }
 
    public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception
@@ -465,7 +519,7 @@
    public void acknowledge(final long consumerID, final long messageID) throws Exception
    {
       ServerConsumer consumer = consumers.get(consumerID);
-      
+
       if (this.xa && tx == null)
       {
          throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
@@ -473,11 +527,11 @@
 
       consumer.acknowledge(autoCommitAcks, tx, messageID);
    }
-   
+
    public void individualAcknowledge(final long consumerID, final long messageID) throws Exception
    {
       ServerConsumer consumer = consumers.get(consumerID);
-      
+
       if (this.xa && tx == null)
       {
          throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
@@ -921,27 +975,27 @@
 
       currentLargeMessage = msg;
    }
-   
+
    public void send(final ServerMessage message) throws Exception
    {
       long id = storageManager.generateUniqueID();
 
       SimpleString address = message.getAddress();
-      
+
       message.setMessageID(id);
       message.encodeMessageIDToBuffer();
-          
+
       if (address == null)
       {
          if (message.isDurable())
          {
-            //We need to force a re-encode when the message gets persisted or when it gets reloaded
-            //it will have no address
+            // We need to force a re-encode when the message gets persisted or when it gets reloaded
+            // it will have no address
             message.setAddress(defaultAddress);
          }
          else
          {
-            //We don't want to force a re-encode when the message gets sent to the consumer
+            // We don't want to force a re-encode when the message gets sent to the consumer
             message.setAddressTransient(defaultAddress);
          }
       }
@@ -955,8 +1009,8 @@
       else
       {
          doSend(message);
-      }      
-      
+      }
+
       if (defaultAddress == null)
       {
          defaultAddress = address;
@@ -986,9 +1040,9 @@
    }
 
    public void requestProducerCredits(final SimpleString address, final int credits) throws Exception
-   { 
+   {
       PagingStore store = postOffice.getPagingManager().getPageStore(address);
-      
+
       store.executeRunnableWhenMemoryAvailable(new Runnable()
       {
          public void run()
@@ -1008,21 +1062,6 @@
       }
    }
 
-   public void runConnectionFailureRunners()
-   {
-      for (Runnable runner : failureRunners.values())
-      {
-         try
-         {
-            runner.run();
-         }
-         catch (Throwable t)
-         {
-            ServerSessionImpl.log.error("Failed to execute failure runner", t);
-         }
-      }
-   }
-
    // FailureListener implementation
    // --------------------------------------------------------------------
 
@@ -1032,18 +1071,6 @@
       {
          ServerSessionImpl.log.warn("Client connection failed, clearing up resources for session " + name);
 
-         for (Runnable runner : failureRunners.values())
-         {
-            try
-            {
-               runner.run();
-            }
-            catch (Throwable t)
-            {
-               ServerSessionImpl.log.error("Failed to execute failure runner", t);
-            }
-         }
-
          close();
 
          ServerSessionImpl.log.warn("Cleared up resources for session " + name);
@@ -1054,29 +1081,7 @@
       }
    }
 
-   public void connectionClosed()
-   {
-      try
-      {
-         for (Runnable runner : failureRunners.values())
-         {
-            try
-            {
-               runner.run();
-            }
-            catch (Throwable t)
-            {
-               ServerSessionImpl.log.error("Failed to execute failure runner", t);
-            }
-         }
-      }
-      catch (Throwable t)
-      {
-         ServerSessionImpl.log.error("Failed to fire listeners " + this);
-      }
 
-   }
-
    // Public
    // ----------------------------------------------------------------------------
 
@@ -1174,7 +1179,7 @@
       {
          throw new HornetQXAException(XAException.XAER_PROTO, "Invalid transaction state");
       }
-      
+
       if (tx == null || autoCommitSends)
       {
       }



More information about the hornetq-commits mailing list