[jboss-cvs] JBossCache/src/org/jboss/cache/buddyreplication ...
Manik Surtani
msurtani at jboss.com
Sat Dec 9 10:14:46 EST 2006
User: msurtani
Date: 06/12/09 10:14:46
Modified: src/org/jboss/cache/buddyreplication Tag:
Branch_JBossCache_1_4_0 BuddyManager.java
Log:
Refactored for more correct behaviour
Revision Changes Path
No revision
No revision
1.33.2.8 +848 -814 JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BuddyManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java,v
retrieving revision 1.33.2.7
retrieving revision 1.33.2.8
diff -u -b -r1.33.2.7 -r1.33.2.8
--- BuddyManager.java 10 Nov 2006 20:30:24 -0000 1.33.2.7
+++ BuddyManager.java 9 Dec 2006 15:14:46 -0000 1.33.2.8
@@ -32,10 +32,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.Vector;
/**
@@ -72,6 +74,10 @@
*/
Map buddyPool;
/**
+ * The nullBuddyPool is a set of addresses that have not specified buddy pools.
+ */
+ Set nullBuddyPool;
+ /**
* Name of the buddy pool for current instance. May be null if buddy pooling is not used.
*/
String buddyPoolName;
@@ -86,7 +92,6 @@
* Needs to deal with concurrent access - concurrent assignTo/removeFrom buddy grp
*/
Map buddyGroupsIParticipateIn = new ConcurrentReaderHashMap();
-// Map buddyGroupsIParticipateIn = new HashMap();
/**
* Queue to deal with queued up view change requests - which are handled asynchronously
@@ -119,7 +124,11 @@
* Flag to prevent us receiving and processing remote calls before we've started
*/
private boolean initialised = false;
-// private Latch initLatch = new Latch();
+
+ /**
+ * Lock to synchronise on to ensure buddy pool info is received before buddies are assigned to groups.
+ */
+ private final Object poolInfoNotifierLock = new Object();
public BuddyManager(Element element)
{
@@ -138,16 +147,17 @@
}
finally
{
- if (log.isDebugEnabled()) log.debug("Using buddy communication timeout of " + buddyCommunicationTimeout + " millis");
+ if (log.isDebugEnabled())
+ log.debug("Using buddy communication timeout of " + buddyCommunicationTimeout + " millis");
}
buddyPoolName = XmlHelper.readStringContents(element, "buddyPoolName");
if (buddyPoolName != null && buddyPoolName.equals("")) buddyPoolName = null;
// initialise buddy pool map
- if (buddyPoolName != null) buddyPool = new ConcurrentReaderHashMap();
+ buddyPool = new ConcurrentReaderHashMap();
+ nullBuddyPool = new HashSet();
// now read the buddy locator details and create accordingly.
-
String buddyLocatorClass = null;
Properties buddyLocatorProperties = null;
try
@@ -206,21 +216,23 @@
public void init(TreeCache cache) throws Exception
{
- log.debug("Starting buddy manager");
this.cache = cache;
+ final Object localAddress = cache.getLocalAddress();
buddyGroup = new BuddyGroup();
- buddyGroup.setDataOwner((IpAddress) cache.getLocalAddress());
- buddyGroup.setGroupName(getGroupNameFromAddress(cache.getLocalAddress()));
+ buddyGroup.setDataOwner((IpAddress) localAddress);
+ buddyGroup.setGroupName(getGroupNameFromAddress(localAddress));
+ log.debug("Starting buddy manager for data owner " + buddyGroup.getDataOwner());
+
if (buddyPoolName != null)
{
buddyPool.put(buddyGroup.getDataOwner(), buddyPoolName);
}
+ broadcastBuddyPoolMembership();
+
// allow waiting threads to process.
initialised = true;
- broadcastBuddyPoolMembership();
-// initLatch.release();
// register a TreeCache Listener to reassign buddies as and when view changes occur
@@ -264,19 +276,11 @@
{
Vector newMembers = newView.getMembers();
- // the whole 'oldMembers' concept is only used for buddy pool announcements.
- if (buddyPoolName == null)
- {
- enqueueViewChange(null, newMembers);
- }
- else
- {
enqueueViewChange(oldMembers == null ? null : new Vector(oldMembers), new Vector(newMembers));
if (oldMembers == null) oldMembers = new Vector();
oldMembers.clear();
oldMembers.addAll(newMembers);
}
- }
});
// assign buddies based on what we know now
@@ -389,9 +393,19 @@
{
if (log.isDebugEnabled())
{
- log.debug("Received announcement that cache instance " + address + " is in buddy pool " + poolName);
+ log.debug(buddyGroup.getDataOwner() + ": received announcement that cache instance " + address + " is in buddy pool " + poolName);
+ }
+ if (poolName != null)
+ buddyPool.put(address, poolName);
+ else
+ nullBuddyPool.add(address);
+
+ // notify any waiting view change threads that buddy pool info has been received.
+ synchronized (poolInfoNotifierLock)
+ {
+ log.trace("Notifying any waiting view change threads that we have received buddy pool info.");
+ poolInfoNotifierLock.notifyAll();
}
- if (poolName != null) buddyPool.put(address, poolName);
}
/**
@@ -400,7 +414,6 @@
*/
public void handleRemoveFromBuddyGroup(String groupName) throws BuddyNotInitException
{
- //waitForInit();
if (!initialised) throw new BuddyNotInitException("Not yet initialised");
if (log.isInfoEnabled()) log.info("Removing self from buddy group " + groupName);
buddyGroupsIParticipateIn.remove(groupName);
@@ -427,6 +440,8 @@
*/
public void handleAssignToBuddyGroup(BuddyGroup newGroup, Map state) throws Exception
{
+ if (log.isTraceEnabled())
+ log.trace("Handling assign to buddy grp. Sender: " + newGroup.getGroupName() + "; My instance: " + buddyGroup.getDataOwner());
// if we haven't initialised, throw an exception.
if (!initialised) throw new BuddyNotInitException("Not yet initialised");
@@ -442,7 +457,7 @@
marshaller = cache.getMarshaller();
}
- for (Iterator it = state.entrySet().iterator(); it.hasNext(); )
+ for (Iterator it = state.entrySet().iterator(); it.hasNext();)
{
Map.Entry entry = (Map.Entry) it.next();
Fqn fqn = (Fqn) entry.getKey();
@@ -510,14 +525,12 @@
{
return transformFqns(call, call.getMethodId() != MethodDeclarations.dataGravitationCleanupMethod_id);
}
+
public JBCMethodCall transformFqns(JBCMethodCall call, boolean transformForCurrentCall)
{
if (call != null && call.getArgs() != null)
{
JBCMethodCall call2 = new JBCMethodCall(call.getMethod(), (Object[]) call.getArgs().clone(), call.getMethodId());
-// call2.setId(call.getId());
-// call2.setMethod(call.getMethod());
-// call2.setArgs((Object[]) call.getArgs().clone());
handleArgs(call2.getArgs(), transformForCurrentCall);
return call2;
}
@@ -547,7 +560,7 @@
{
try
{
- makeRemoteCall(buddies, replicateCall);
+ makeRemoteCall(buddies, replicateCall, true);
break;
}
catch (Exception e)
@@ -576,12 +589,6 @@
private void addBuddies(List buddies) throws Exception
{
- // this check is redundant - if buddies is empty this method will not be called. - Manik
-
-// if (buddies.size() == 0)
-// return;
-
-
if (log.isInfoEnabled())
{
log.info("Assigning new buddies to buddy group [" + buddyGroup.getGroupName() + "]. New buddies are " + buddies);
@@ -638,7 +645,7 @@
{
try
{
- makeRemoteCall(buddies, replicateCall);
+ makeRemoteCall(buddies, replicateCall, true);
break;
}
catch (Exception e)
@@ -663,7 +670,8 @@
}
- log.trace("addToGroup notification complete");
+ if (log.isTraceEnabled())
+ log.trace("addToGroup notification complete (data owner " + buddyGroup.getDataOwner() + ")");
}
private byte[] acquireState(Fqn fqn) throws Exception
@@ -746,7 +754,7 @@
try
{
- makeRemoteCall(recipients, replicateCall);
+ makeRemoteCall(recipients, replicateCall, true);
}
catch (Exception e)
{
@@ -754,7 +762,7 @@
}
}
- private void makeRemoteCall(List recipients, MethodCall call) throws Exception
+ private void makeRemoteCall(List recipients, MethodCall call, boolean sync) throws Exception
{
// remove non-members from dest list
if (recipients != null)
@@ -771,7 +779,7 @@
}
}
- cache.callRemoteMethods(recipients, call, true, true, buddyCommunicationTimeout);
+ cache.callRemoteMethods(recipients, call, sync, true, buddyCommunicationTimeout);
}
@@ -812,6 +820,7 @@
/**
* Assumes the backup Fqn if the current instance is the data owner
+ *
* @param originalFqn
* @return backup fqn
*/
@@ -892,24 +901,49 @@
private void handleEnqueuedViewChange() throws Exception
{
- log.trace("Handling queued view change");
+ log.trace("Waiting for enqueued view change events");
List[] members = (List[]) queue.take(); // 2 element array - 0 - oldMembers, 1 - newMembers
+ // make sure new buddies have broadcast their pool memberships.
+ while (!buddyPoolInfoAvailable(members[1]))
+ {
+ synchronized (poolInfoNotifierLock)
+ {
+ log.trace("Not received necessary buddy pool info for all new members yet; waiting on poolInfoNotifierLock.");
+ poolInfoNotifierLock.wait();
+ }
+ }
+
+ log.trace("Broadcasting pool membership details, triggered by view change.");
+ if (members[0] == null)
+ broadcastBuddyPoolMembership();
+ else
+ {
+ List delta = new ArrayList();
+ delta.addAll(members[1]);
+ delta.removeAll(members[0]);
+ broadcastBuddyPoolMembership(delta);
+ }
+
// always refresh buddy list.
reassignBuddies(members[1]);
+ }
- if (buddyPoolName != null)
+ private boolean buddyPoolInfoAvailable(List newMembers)
{
- log.trace("Broadcasting pool membership details, triggered by view change.");
- broadcastBuddyPoolMembership();
-// if (members[0] == null)
-// broadcastBuddyPoolMembership();
-// else
-// {
-// members[1].removeAll(members[0]);
-// broadcastBuddyPoolMembership(members[1]);
-// }
+ boolean infoReceived = true;
+ Iterator i = newMembers.iterator();
+ while (i.hasNext())
+ {
+ Object address = i.next();
+ infoReceived = infoReceived && (address.equals(cache.getLocalAddress()) || buddyPool.keySet().contains(address) || nullBuddyPool.contains(address));
}
+
+ log.trace(buddyGroup.getDataOwner() + " received buddy pool info for new members " + newMembers + "? " + infoReceived);
+
+ return infoReceived;
}
+
}
}
+
More information about the jboss-cvs-commits
mailing list