[jboss-cvs] JBoss Messaging SVN: r3239 - in trunk/src/main/org/jboss: messaging/core/impl/postoffice and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 25 08:37:55 EDT 2007
Author: timfox
Date: 2007-10-25 08:37:55 -0400 (Thu, 25 Oct 2007)
New Revision: 3239
Modified:
trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/src/main/org/jboss/messaging/util/ClearableSemaphore.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1116
Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-10-21 23:29:00 UTC (rev 3238)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2007-10-25 12:37:55 UTC (rev 3239)
@@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,8 +39,8 @@
import org.jboss.messaging.core.contract.ClusterNotification;
import org.jboss.messaging.core.contract.ClusterNotificationListener;
import org.jboss.messaging.core.contract.Replicator;
+import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.Util;
-import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.remoting.Client;
import org.jboss.remoting.ClientDisconnectedException;
import org.jboss.remoting.ConnectionListener;
@@ -70,7 +71,7 @@
private Map<String, String> remotingSessions;
// Set<ConnectionEndpoint>
- private Set activeConnectionEndpoints;
+ private Set<ConnectionEndpoint> activeConnectionEndpoints;
private Map</** CFUniqueName*/ String, ConnectionFactoryCallbackInformation> cfCallbackInfo;
@@ -80,14 +81,41 @@
public SimpleConnectionManager()
{
- jmsClients = new HashMap();
- remotingSessions = new HashMap();
- activeConnectionEndpoints = new HashSet();
+ jmsClients = new HashMap<String, Map<String, ConnectionEndpoint>>();
+ remotingSessions = new HashMap<String, String>();
+ activeConnectionEndpoints = new HashSet<ConnectionEndpoint>();
cfCallbackInfo = new ConcurrentHashMap<String, ConnectionFactoryCallbackInformation>();
}
// ConnectionManager implementation -------------------------------------------------------------
+   /**
+    * Writes the contents of the jmsClients map to the debug log: one line per
+    * JMS client VM id, followed by one line per remoting session id together
+    * with the identity hash code of the endpoint registered under it.
+    *
+    * Diagnostic aid only - nothing is modified. This method takes no lock of
+    * its own, so callers are presumably expected to guard jmsClients
+    * (TODO confirm for the node-left notification path).
+    */
+   private void dump()
+   {
+      //Skip the iteration and string building entirely when debug output is off
+      if (!log.isDebugEnabled())
+      {
+         return;
+      }
+
+      log.debug("***********Dumping conn map");
+
+      for (Map.Entry<String, Map<String, ConnectionEndpoint>> clientEntry : jmsClients.entrySet())
+      {
+         log.debug(clientEntry.getKey() + "----->");
+
+         for (Map.Entry<String, ConnectionEndpoint> sessionEntry : clientEntry.getValue().entrySet())
+         {
+            //identityHashCode distinguishes endpoint instances without relying on their toString()
+            log.debug(" " + sessionEntry.getKey() + "------>" + System.identityHashCode(sessionEntry.getValue()));
+         }
+      }
+
+      log.debug("*** Dumped conn map");
+   }
+
public synchronized void registerConnection(String jmsClientVMID,
String remotingClientSessionID,
ConnectionEndpoint endpoint)
@@ -96,7 +124,8 @@
if (endpoints == null)
{
- endpoints = new HashMap();
+ endpoints = new HashMap<String, ConnectionEndpoint>();
+
jmsClients.put(jmsClientVMID, endpoints);
}
@@ -108,6 +137,8 @@
log.debug("registered connection " + endpoint + " as " +
Util.guidToString(remotingClientSessionID));
+
+ dump();
}
public synchronized ConnectionEndpoint unregisterConnection(String jmsClientVMId,
@@ -143,7 +174,7 @@
public synchronized List getActiveConnections()
{
// I will make a copy to avoid ConcurrentModification
- ArrayList list = new ArrayList();
+ List<ConnectionEndpoint> list = new ArrayList<ConnectionEndpoint>();
list.addAll(activeConnectionEndpoints);
return list;
}
@@ -229,6 +260,8 @@
log.trace("SimpleConnectionManager was notified about node leaving from node " +
notification.nodeID);
+
+ dump();
try
{
//We remove any consumers with the same JVMID as the node that just failed
@@ -249,7 +282,7 @@
int failedNodeID = notification.nodeID;
String clientVMID = (String)ids.get(new Integer(failedNodeID));
-
+
if (clientVMID == null)
{
log.error("Cannot find ClientVMID for failed node " + failedNodeID);
@@ -342,9 +375,9 @@
if (endpoints != null)
{
- List<ConnectionEndpoint> sces = new ArrayList();
+ List<ConnectionEndpoint> sces = new ArrayList<ConnectionEndpoint>();
- for(Map.Entry<String, ConnectionEndpoint> entry: endpoints.entrySet())
+ for (Map.Entry<String, ConnectionEndpoint> entry: endpoints.entrySet())
{
ConnectionEndpoint sce = entry.getValue();
sces.add(sce);
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-21 23:29:00 UTC (rev 3238)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-10-25 12:37:55 UTC (rev 3239)
@@ -108,6 +108,8 @@
public static final String FAILOVER_COMPLETED_NOTIFICATION = "FAILOVER_COMPLETED";
+ private static final long SEMAPHORE_ACQUIRE_TIMEOUT = 10000;
+
//End only used in testing
// Static ---------------------------------------------------------------------------------------
@@ -351,8 +353,10 @@
//calculate the failover map
calculateFailoverMap();
+ String clientVMId = JMSClientVMIdentifier.instance;
+
//add our vm identifier to the replicator
- put(Replicator.JVM_ID_KEY, JMSClientVMIdentifier.instance);
+ put(Replicator.JVM_ID_KEY, clientVMId);
groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
@@ -619,8 +623,15 @@
if (reply)
{
- //replicateSemaphore.tryAcquire(250, TimeUnit.MILLISECONDS);
- replicateSemaphore.acquire();
+ //We timeout to avoid locking the system in event of failure
+ boolean ok = replicateSemaphore.tryAcquire(SEMAPHORE_ACQUIRE_TIMEOUT);
+
+ if (!ok)
+ {
+ log.warn("Timed out trying to acquire replication semaphore");
+
+ return;
+ }
}
try
@@ -886,7 +897,7 @@
public void nodesLeft(List addresses) throws Throwable
{
if (trace) { log.trace("Nodes left " + addresses.size()); }
-
+
checkStartReaper();
Map oldFailoverMap = new HashMap(this.failoverMap);
@@ -898,6 +909,13 @@
calculateFailoverMap();
if (trace) { log.trace("First node is now " + firstNode); }
+
+ if (firstNode)
+ {
+ //If we are now the first node in the cluster then any outstanding replication requests will not get responses
+ //so we must release these and we have no more need of a semaphore until another node joins
+ replicateSemaphore.disable();
+ }
Iterator iter = addresses.iterator();
@@ -1091,6 +1109,12 @@
calculateFailoverMap();
+ if (wasFirstNode)
+ {
+ //If we were the first node but now another node has joined - we need to re-enable the semaphore
+ replicateSemaphore.enable();
+ }
+
//Note - when a node joins, we DO NOT send it replicated data - this is because it won't have deployed its queues
//the data is requested by the new node when it deploys its queues
@@ -2780,8 +2804,6 @@
if (trace) { log.trace("Failover node has changed from " + oldFailoverNodeID + " to " + failoverNodeID); }
- replicateSemaphore.reset();
-
if (!firstNode)
{
//If the old node still exists we need to send a message to remove any replicated deliveries
Modified: trunk/src/main/org/jboss/messaging/util/ClearableSemaphore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ClearableSemaphore.java 2007-10-21 23:29:00 UTC (rev 3238)
+++ trunk/src/main/org/jboss/messaging/util/ClearableSemaphore.java 2007-10-25 12:37:55 UTC (rev 3239)
@@ -1,6 +1,7 @@
package org.jboss.messaging.util;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.jboss.logging.Logger;
@@ -42,25 +43,47 @@
}
}
+   /**
+    * Tries to obtain a single permit, waiting no longer than the given timeout.
+    *
+    * @param timeout maximum time to wait, in milliseconds
+    * @return true if a permit was obtained, or if no semaphore is currently
+    *         set (the field is null); false if the wait timed out
+    * @throws InterruptedException if the calling thread is interrupted while waiting
+    */
+   public boolean tryAcquire(long timeout) throws InterruptedException
+   {
+      //Take a single snapshot of the field so the null check and the call use the same instance
+      final Semaphore current = semaphore;
+
+      if (current == null)
+      {
+         //Nothing to wait on - report success immediately
+         return true;
+      }
+
+      return current.tryAcquire(timeout, TimeUnit.MILLISECONDS);
+   }
+
public void release()
{
Semaphore sem = semaphore;
if (sem != null)
{
- sem.release();
+ sem.release();
}
}
- public synchronized void reset()
+ public synchronized void disable()
{
if (semaphore != null)
{
Semaphore oldSem = semaphore;
- createSemaphore();
+ semaphore = null;
oldSem.release(permits);
}
}
+
+   /**
+    * Calls createSemaphore() when the semaphore field is currently null;
+    * does nothing if a semaphore is already present.
+    */
+   public synchronized void enable()
+   {
+      if (semaphore != null)
+      {
+         //Already enabled - keep the existing semaphore
+         return;
+      }
+
+      createSemaphore();
+   }
}
More information about the jboss-cvs-commits
mailing list