[jboss-cvs] JBoss Messaging SVN: r8348 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 16 14:12:20 EDT 2011
Author: jbertram
Date: 2011-06-16 14:12:20 -0400 (Thu, 16 Jun 2011)
New Revision: 8348
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/
branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
JBPAPP-6588
Property changes on: branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864
___________________________________________________________________
Modified: svn:mergeinfo
- /branches/Branch_1_4:8010,8013,8238,8245
+ /branches/Branch_1_4:8010,8013,8238,8245,8305
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-06-16 12:14:35 UTC (rev 8347)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP09_JBMESSAGING-1801_JBMESSAGING-1805_JBMESSAGING-1851_JBMESSAGING-1864/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-06-16 18:12:20 UTC (rev 8348)
@@ -238,6 +238,9 @@
private boolean failoverOnNodeLeave;
+ //https://issues.jboss.org/browse/JBMESSAGING-1864
+ private Object failoverLock = new Object();
+
// Constructors ---------------------------------------------------------------------------------
public boolean isFailoverOnNodeLeave()
@@ -462,27 +465,30 @@
public boolean addBinding(Binding binding, boolean allNodes) throws Exception
{
- if (allNodes && !binding.queue.isClustered())
- {
- throw new IllegalArgumentException("Cannot bind a non clustered queue on all nodes");
- }
-
- boolean added = internalAddBinding(binding, allNodes, true);
-
- if (added && allNodes && clustered && binding.queue.isClustered())
- {
- //Now we must wait for all the bindings to appear in state
- //This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
-
- waitForBindUnbind(binding.queue.getName(), true);
- }
-
- if (added)
- {
- requestDeliveries(binding.queue);
- }
-
- return added;
+ synchronized (failoverLock)
+ {
+ if (allNodes && !binding.queue.isClustered())
+ {
+ throw new IllegalArgumentException("Cannot bind a non clustered queue on all nodes");
+ }
+
+ boolean added = internalAddBinding(binding, allNodes, true);
+
+ if (added && allNodes && clustered && binding.queue.isClustered())
+ {
+ // Now we must wait for all the bindings to appear in state
+ // This is necessary since the second bind in an all bind is sent asynchronously to avoid deadlock
+
+ waitForBindUnbind(binding.queue.getName(), true);
+ }
+
+ if (added)
+ {
+ requestDeliveries(binding.queue);
+ }
+
+ return added;
+ }
}
public Binding removeBinding(String queueName, boolean allNodes) throws Throwable
@@ -3114,133 +3120,141 @@
pm.mergeTransactions(failedNodeID.intValue(), thisNodeID);
- // Need to lock
- boolean intr = Thread.interrupted();
- for (;;)
+ synchronized (failoverLock)
{
- try
+
+ // Need to lock
+ boolean intr = Thread.interrupted();
+ for (;;)
{
- lock.writeLock().acquire();
- break;
+ try
+ {
+ lock.writeLock().acquire();
+ break;
+ }
+ catch (InterruptedException ex)
+ {
+ intr = true;
+ }
}
- catch (InterruptedException ex)
+
+ try
{
- intr = true;
- }
- }
+ Map nameMap = (Map)nameMaps.get(failedNodeID);
- try
- {
- Map nameMap = (Map)nameMaps.get(failedNodeID);
-
- List toRemove = new ArrayList();
-
- if (nameMap != null)
- {
- Iterator iter = nameMap.values().iterator();
-
- while (iter.hasNext())
- {
- Binding binding = (Binding)iter.next();
-
- Queue queue = binding.queue;
-
- if (queue.isRecoverable() && queue.getNodeID() == failedNodeID.intValue())
- {
- toRemove.add(binding);
- }
- }
- }
-
- Iterator iter = toRemove.iterator();
+ List toRemove = new ArrayList();
- while (iter.hasNext())
- {
- Binding binding = (Binding)iter.next();
-
- Condition condition = binding.condition;
-
- Queue queue = binding.queue;
-
- // Sanity check
- if (!queue.isRecoverable())
+ if (nameMap != null)
{
- throw new IllegalStateException("Found non recoverable queue " +
- queue.getName() + " in map, these should have been removed!");
+ Iterator iter = nameMap.values().iterator();
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ Queue queue = binding.queue;
+
+ if (queue.isRecoverable() && queue.getNodeID() == failedNodeID.intValue())
+ {
+ toRemove.add(binding);
+ }
+ }
}
- // Sanity check
- if (!queue.isClustered())
+ Iterator iter = toRemove.iterator();
+
+ while (iter.hasNext())
{
- throw new IllegalStateException("Queue " + queue.getName() + " is not clustered!");
+ Binding binding = (Binding)iter.next();
+
+ Condition condition = binding.condition;
+
+ Queue queue = binding.queue;
+
+ // Sanity check
+ if (!queue.isRecoverable())
+ {
+ throw new IllegalStateException("Found non recoverable queue " + queue.getName() +
+ " in map, these should have been removed!");
+ }
+
+ // Sanity check
+ if (!queue.isClustered())
+ {
+ throw new IllegalStateException("Queue " + queue.getName() + " is not clustered!");
+ }
+
+ // Remove from the in-memory map - no need to broadcast anything - they will get removed from other nodes
+ // in memory
+ // maps when the other nodes detect failure
+ removeBindingInMemory(binding.queue.getNodeID(), binding.queue.getName());
+
+ // Find if there is a local queue with the same name
+
+ Queue localQueue = null;
+
+ if (localNameMap != null)
+ {
+ Binding b = (Binding)localNameMap.get(queue.getName());
+ if (b != null)
+ {
+ localQueue = b.queue;
+ }
+ }
+
+ if (localQueue != null)
+ {
+ // need to merge the queues
+
+ log.debug(this + " has already a queue: " + queue.getName() + " queue so merging queues");
+
+ localQueue.mergeIn(queue.getChannelID(), failedNodeID.intValue());
+
+ log.debug("Merged queue");
+
+ // Delete from storage
+
+ // Note we must do this *after* we have done any merge.
+ // This is because if we did it first, then the merge failed, we'd be left with the old channel
+ // deleted
+ // but the messages would have still be in the old channel
+ // meaning they would have disappeared from the users point of view and it would involve manual
+ // database intervention to correct it
+ // See http://jira.jboss.com/jira/browse/JBMESSAGING-1113
+
+ deleteBindingFromStorage(queue);
+
+ log.debug(this + " deleted binding for " + queue.getName());
+ }
+ else
+ {
+ // Cannot failover if there is no queue deployed.
+
+ log.warn("Cannot failover " + queue.getName() +
+ " since it does not exist on this node. " +
+ "You must deploy your clustered destinations on ALL nodes of the cluster");
+ }
+
+ // Note we do not need to send an unbind request across the cluster - this is because
+ // when the node crashes a view change will hit the other nodes and that will cause
+ // all binding data for that node to be removed anyway.
}
-
- //Remove from the in-memory map - no need to broadcast anything - they will get removed from other nodes in memory
- //maps when the other nodes detect failure
- removeBindingInMemory(binding.queue.getNodeID(), binding.queue.getName());
-
- //Find if there is a local queue with the same name
-
- Queue localQueue = null;
-
- if (localNameMap != null)
- {
- Binding b = (Binding)localNameMap.get(queue.getName());
- if (b != null)
- {
- localQueue = b.queue;
- }
- }
-
- if (localQueue != null)
- {
- //need to merge the queues
-
- log.debug(this + " has already a queue: " + queue.getName() + " queue so merging queues");
-
- localQueue.mergeIn(queue.getChannelID(), failedNodeID.intValue());
-
- log.debug("Merged queue");
-
- //Delete from storage
-
- //Note we must do this *after* we have done any merge.
- //This is because if we did it first, then the merge failed, we'd be left with the old channel deleted
- //but the messages would have still be in the old channel
- //meaning they would have disappeared from the users point of view and it would involve manual
- //database intervention to correct it
- //See http://jira.jboss.com/jira/browse/JBMESSAGING-1113
-
- deleteBindingFromStorage(queue);
-
- log.debug(this + " deleted binding for " + queue.getName());
- }
- else
- {
- //Cannot failover if there is no queue deployed.
-
- log.warn("Cannot failover " + queue.getName() + " since it does not exist on this node. " +
- "You must deploy your clustered destinations on ALL nodes of the cluster");
- }
- // Note we do not need to send an unbind request across the cluster - this is because
- // when the node crashes a view change will hit the other nodes and that will cause
- // all binding data for that node to be removed anyway.
+ log.debug(this + ": server side fail over is now complete");
}
+ finally
+ {
+ lock.writeLock().release();
+ if (intr)
+ Thread.currentThread().interrupt();
+ }
- log.debug(this + ": server side fail over is now complete");
+ // Now clean the data for the failed node
+
+ // TODO - does this need to be inside the lock above?
+ cleanDataForNode(failedNodeID);
}
- finally
- {
- lock.writeLock().release();
- if (intr) Thread.currentThread().interrupt();
- }
-
- //Now clean the data for the failed node
-
- //TODO - does this need to be inside the lock above?
- cleanDataForNode(failedNodeID);
-
+
log.debug(this + " announcing that failover procedure is complete");
notification = new ClusterNotification(ClusterNotification.TYPE_FAILOVER_END, failedNodeID.intValue(), null);
More information about the jboss-cvs-commits
mailing list