Author: clebert.suconic(a)jboss.com
Date: 2011-03-24 14:00:09 -0400 (Thu, 24 Mar 2011)
New Revision: 10360
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-6153
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-03-24
15:37:58 UTC (rev 10359)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-03-24
18:00:09 UTC (rev 10360)
@@ -166,4 +166,12 @@
boolean isDirectDeliver();
SimpleString getAddress();
+
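+ /**
+ * Internal queues, such as those used for clustered-bridge communication, must never send messages to the DLQ.
+ * @return true if this queue is internal and therefore exempt from the DLQ check
+ */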
+ boolean isInternalQueue();
+
+ void setInternalQueue(boolean internalQueue);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-03-24
15:37:58 UTC (rev 10359)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-03-24
18:00:09 UTC (rev 10360)
@@ -107,6 +107,9 @@
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
this.connector = connector;
+
+ // we need to disable DLQ check on the clustered bridges
+ queue.setInternalQueue(true);
}
@Override
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-24
15:37:58 UTC (rev 10359)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-24
18:00:09 UTC (rev 10360)
@@ -715,19 +715,45 @@
HornetQServerImpl.log.debug("Waiting for " + task);
}
+ if (memoryManager != null)
+ {
+ memoryManager.stop();
+ }
+
threadPool.shutdown();
+
+ scheduledPool.shutdown();
- scheduledPool = null;
+ try
+ {
+ if (!threadPool.awaitTermination(10, TimeUnit.SECONDS))
+ {
+ HornetQServerImpl.log.warn("Timed out waiting for pool to
terminate");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
+ threadPool = null;
- if (memoryManager != null)
+
+ try
{
- memoryManager.stop();
+ if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS))
+ {
+ HornetQServerImpl.log.warn("Timed out waiting for scheduled pool to
terminate");
+ }
}
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
- addressSettingsRepository.clear();
+ threadPool = null;
+
+ scheduledPool = null;
- securityRepository.clear();
-
pagingManager = null;
securityStore = null;
resourceManager = null;
@@ -764,19 +790,7 @@
Logger.reset();
}
- try
- {
- if (!threadPool.awaitTermination(5000, TimeUnit.MILLISECONDS))
- {
- HornetQServerImpl.log.warn("Timed out waiting for pool to
terminate");
- }
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
- threadPool = null;
- }
+ }
// HornetQServer implementation
// -----------------------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-03-24
15:37:58 UTC (rev 10359)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-03-24
18:00:09 UTC (rev 10360)
@@ -32,7 +32,6 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.cursor.PageSubscription;
import org.hornetq.core.paging.cursor.PagedReference;
import org.hornetq.core.persistence.StorageManager;
@@ -153,6 +152,8 @@
private volatile int consumerWithFilterCount;
private final Runnable concurrentPoller = new ConcurrentPoller();
+
+ private boolean internalQueue;
private volatile boolean checkDirect;
@@ -1308,6 +1309,22 @@
return directDeliver;
}
+ /**
+ * @return true if this queue is internal, in which case the DLQ check is skipped
+ */
+ public boolean isInternalQueue()
+ {
+ return internalQueue;
+ }
+
+ /**
+ * @param internalQueue true to mark this queue as internal (no DLQ check is performed on it)
+ */
+ public void setInternalQueue(boolean internalQueue)
+ {
+ this.internalQueue = internalQueue;
+ }
+
// Public
// -----------------------------------------------------------------------------
@@ -1582,9 +1599,15 @@
{
ServerMessage message = reference.getMessage();
+ if (internalQueue)
+ {
+ // no DLQ check on internal queues
+ return true;
+ }
+
// TODO: DeliveryCount on paging
- if (message.isDurable() && durable && !reference.isPaged())
+ if (!internalQueue && message.isDurable() && durable &&
!reference.isPaged())
{
storageManager.updateDeliveryCount(reference);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-03-24
15:37:58 UTC (rev 10359)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-03-24
18:00:09 UTC (rev 10360)
@@ -265,7 +265,7 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount)
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable()
&& !ref.getQueue().isInternalQueue())
{
storageManager.updateDeliveryCount(ref);
}