[hornetq-commits] JBoss hornetq SVN: r10360 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core/server: cluster/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 24 14:00:09 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-24 14:00:09 -0400 (Thu, 24 Mar 2011)
New Revision: 10360

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-6153

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-03-24 18:00:09 UTC (rev 10360)
@@ -166,4 +166,12 @@
    boolean isDirectDeliver();
 
    SimpleString getAddress();
+   
+   /**
+    * We can't send stuff to DLQ on queues used on clustered-bridge-communication
+    * @return
+    */
+   boolean isInternalQueue();
+   
+   void setInternalQueue(boolean internalQueue);
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-03-24 18:00:09 UTC (rev 10360)
@@ -107,6 +107,9 @@
       this.managementNotificationAddress = managementNotificationAddress;
       this.flowRecord = flowRecord;
       this.connector = connector;
+      
+      // we need to disable DLQ check on the clustered bridges
+      queue.setInternalQueue(true);
    }
 
    @Override

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-24 18:00:09 UTC (rev 10360)
@@ -715,19 +715,45 @@
             HornetQServerImpl.log.debug("Waiting for " + task);
          }
 
+         if (memoryManager != null)
+         {
+            memoryManager.stop();
+         }
+         
          threadPool.shutdown();
+         
+         scheduledPool.shutdown();
 
-         scheduledPool = null;
+         try
+         {
+            if (!threadPool.awaitTermination(10, TimeUnit.SECONDS))
+            {
+               HornetQServerImpl.log.warn("Timed out waiting for pool to terminate");
+            }
+         }
+         catch (InterruptedException e)
+         {
+            // Ignore
+         }
+         threadPool = null;
 
-         if (memoryManager != null)
+         
+         try
          {
-            memoryManager.stop();
+            if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS))
+            {
+               HornetQServerImpl.log.warn("Timed out waiting for scheduled pool to terminate");
+            }
          }
+         catch (InterruptedException e)
+         {
+            // Ignore
+         }
 
-         addressSettingsRepository.clear();
+         threadPool = null;
+         
+         scheduledPool = null;
 
-         securityRepository.clear();
-
          pagingManager = null;
          securityStore = null;
          resourceManager = null;
@@ -764,19 +790,7 @@
          Logger.reset();
       }
 
-      try
-      {
-         if (!threadPool.awaitTermination(5000, TimeUnit.MILLISECONDS))
-         {
-            HornetQServerImpl.log.warn("Timed out waiting for pool to terminate");
-         }
-      }
-      catch (InterruptedException e)
-      {
-         // Ignore
-      }
-      threadPool = null;
-   }
+    }
 
    // HornetQServer implementation
    // -----------------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-03-24 18:00:09 UTC (rev 10360)
@@ -32,7 +32,6 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.persistence.StorageManager;
@@ -153,6 +152,8 @@
    private volatile int consumerWithFilterCount;
 
    private final Runnable concurrentPoller = new ConcurrentPoller();
+   
+   private boolean internalQueue;
 
    private volatile boolean checkDirect;
 
@@ -1308,6 +1309,22 @@
       return directDeliver;
    }
 
+   /**
+    * @return the internalQueue
+    */
+   public boolean isInternalQueue()
+   {
+      return internalQueue;
+   }
+
+   /**
+    * @param internalQueue the internalQueue to set
+    */
+   public void setInternalQueue(boolean internalQueue)
+   {
+      this.internalQueue = internalQueue;
+   }
+
    // Public
    // -----------------------------------------------------------------------------
 
@@ -1582,9 +1599,15 @@
    {
       ServerMessage message = reference.getMessage();
 
+      if (internalQueue)
+      {
+         // no DLQ check on internal queues
+         return true;
+      }
+
       // TODO: DeliveryCount on paging
       
-      if (message.isDurable() && durable && !reference.isPaged())
+      if (!internalQueue && message.isDurable() && durable && !reference.isPaged())
       {
          storageManager.updateDeliveryCount(reference);
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-03-24 15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-03-24 18:00:09 UTC (rev 10360)
@@ -265,7 +265,7 @@
             // the updateDeliveryCount would still be updated after c
             if (strictUpdateDeliveryCount)
             {
-               if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue())
                {
                   storageManager.updateDeliveryCount(ref);
                }



More information about the hornetq-commits mailing list