[jboss-cvs] JBoss Messaging SVN: r3226 - in trunk/src/main/org/jboss: messaging/core/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Oct 20 13:35:44 EDT 2007
Author: timfox
Date: 2007-10-20 13:35:44 -0400 (Sat, 20 Oct 2007)
New Revision: 3226
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Make the semaphore clearable
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-20 16:13:52 UTC (rev 3225)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-20 17:35:44 UTC (rev 3226)
@@ -1033,9 +1033,12 @@
{
if (!delivered)
{
- //We have to wait for another response to arrive first
-
- throw new IllegalStateException("Responses have come back our of order");
+ // Resonpse has come back out of order - this can happen when the failover node is being changed
+ // E.g. failover node changes, replicates start getting sent to the new failover node,
+ // then the new node requests to collect the deliveries from this node, at which point we deliver
+ // all waiting deliveries. Then the responses to the original ones come back.
+ // So we can ignore them
+ break;
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-10-20 16:13:52 UTC (rev 3225)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-10-20 17:35:44 UTC (rev 3226)
@@ -466,13 +466,7 @@
if (trace) { log.trace("Adding all from recovery area for node " + nodeID +" set " + ids); }
Integer nid = new Integer(nodeID);
-
- //Sanity check
- if (recoveryArea.get(nid) != null)
- {
- throw new IllegalStateException("There are already message ids for node " + nodeID);
- }
-
+
if (!(ids instanceof ConcurrentHashMap))
{
ids = new ConcurrentHashMap(ids);
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-20 16:13:52 UTC (rev 3225)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-20 17:35:44 UTC (rev 3226)
@@ -75,9 +75,9 @@
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TransactionRepository;
import org.jboss.messaging.core.impl.tx.TxCallback;
+import org.jboss.messaging.util.ClearableSemaphore;
import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.StreamUtils;
-import org.jboss.messaging.util.Throttle;
import org.jgroups.Address;
import org.jgroups.View;
@@ -224,12 +224,10 @@
//We keep use a semaphore to limit the number of concurrent replication requests to avoid
//overwhelming JGroups
- //private Semaphore replicateSemaphore;
+ private ClearableSemaphore replicateSemaphore;
- //private int maxConcurrentReplications;
+ private int maxConcurrentReplications;
- private Throttle throttle = new Throttle(5000, 5);
-
// Constructors ---------------------------------------------------------------------------------
/*
@@ -311,9 +309,7 @@
nbSupport = new NotificationBroadcasterSupport();
- // this.maxConcurrentReplications = maxConcurrentReplications;
-
- //replicateSemaphore = new Semaphore(maxConcurrentReplications, true);
+ replicateSemaphore = new ClearableSemaphore(maxConcurrentReplications);
}
// MessagingComponent overrides -----------------------------------------------------------------
@@ -459,9 +455,7 @@
return added;
}
-
-
-
+
public Binding removeBinding(String queueName, boolean allNodes) throws Throwable
{
Binding binding = internalRemoveBinding(queueName, allNodes, true);
@@ -628,7 +622,8 @@
if (reply)
{
- //replicateSemaphore.acquire();
+ //replicateSemaphore.tryAcquire(250, TimeUnit.MILLISECONDS);
+ replicateSemaphore.acquire();
}
try
@@ -656,8 +651,6 @@
if (address != null)
{
- throttle.ping();
-
groupMember.unicastData(request, address);
}
}
@@ -665,7 +658,7 @@
{
if (reply)
{
- // replicateSemaphore.release();
+ replicateSemaphore.release();
}
throw e;
@@ -683,8 +676,6 @@
if (address != null)
{
- throttle.ping();
-
groupMember.unicastData(request, address);
}
}
@@ -1290,7 +1281,8 @@
if (binding == null)
{
- throw new IllegalStateException("Cannot find queue with name " + queueName +" has it been deployed?");
+ //This is ok - maybe new failover node but queue is not yet deployed
+ return;
}
Queue queue = binding.queue;
@@ -1306,7 +1298,7 @@
//TODO - this does not belong here
ServerSessionEndpoint session = serverPeer.getSession(sessionID);
- // replicateSemaphore.release();
+ replicateSemaphore.release();
if (session == null)
{
@@ -2788,6 +2780,8 @@
//The failover node has changed - we need to move our replicated deliveries
if (trace) { log.trace("Failover node has changed from " + oldFailoverNodeID + " to " + failoverNodeID); }
+
+ replicateSemaphore.disable();
if (!firstNode)
{
@@ -2836,9 +2830,7 @@
session.deliverAnyWaitingDeliveries(null);
- session.collectDeliveries(deliveries, firstNode, null);
-
- // releaseAndReplaceSemaphore();
+ session.collectDeliveries(deliveries, firstNode, null);
}
if (!firstNode)
@@ -2856,26 +2848,12 @@
if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
}
+
+ replicateSemaphore.enable();
}
}
- }
-
- //Now we replace the semaphore since some of the acks may not come back from the old failover node
- // releaseAndReplaceSemaphore();
+ }
}
-
-// private void releaseAndReplaceSemaphore()
-// {
-// if (replicateSemaphore != null)
-// {
-// Semaphore oldSem = replicateSemaphore;
-//
-// replicateSemaphore = new Semaphore(maxConcurrentReplications);
-//
-// oldSem.release(maxConcurrentReplications);
-// }
-// }
-//
/**
* This method fails over all the queues from node <failedNodeId> onto this node. It is triggered
@@ -3066,9 +3044,9 @@
if (session.collectDeliveries(dels, firstNode, queueName))
{
gotSome = true;
- }
-
- // releaseAndReplaceSemaphore();
+ }
+ //Release them all
+ replicateSemaphore.enable();
}
if (gotSome)
More information about the jboss-cvs-commits
mailing list