[jboss-cvs] JBossCache/src/org/jboss/cache/buddyreplication ...
Manik Surtani
msurtani at jboss.com
Thu Dec 21 11:19:33 EST 2006
User: msurtani
Date: 06/12/21 11:19:33
Modified: src/org/jboss/cache/buddyreplication BuddyLocator.java
BuddyManager.java BuddyGroup.java
NextMemberBuddyLocator.java
Log:
refactored
Revision Changes Path
1.5 +9 -8 JBossCache/src/org/jboss/cache/buddyreplication/BuddyLocator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BuddyLocator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyLocator.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -b -r1.4 -r1.5
--- BuddyLocator.java 23 Oct 2006 05:46:39 -0000 1.4
+++ BuddyLocator.java 21 Dec 2006 16:19:33 -0000 1.5
@@ -8,7 +8,7 @@
import org.jboss.cache.config.BuddyReplicationConfig;
import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig;
-import org.jgroups.stack.IpAddress;
+import org.jgroups.Address;
import java.util.List;
import java.util.Map;
@@ -36,6 +36,7 @@
/**
* Initialize this <code>BuddyLocator</code>.
+ *
* @param config configuration for this <code>BuddyLocator</code>. May be
* <code>null</code>, in which case the implementation should
* use its default configuration.
@@ -46,17 +47,17 @@
* Choose a set of buddies for the given node. Invoked when a change in
* cluster membership is detected.
*
- * @param buddyPoolMap Map<IpAddress, String> mapping nodes in the cluster to
+ * @param buddyPoolMap Map<Address, String> mapping nodes in the cluster to
* the "buddy pool" they have identified themselves as
* belonging to. A BuddyLocator implementation can use
* this information to preferentially assign buddies from
* the same buddy pool as <code>dataOwner</code>. May be
* <code>null</code> if buddy pools aren't configured.
- * @param currentMembership List<IpAddress> of the current cluster members
- * @param dataOwner IpAddress of the node for which buddies should be selected
- * @return List<IpAddress> of the nodes that should serve as buddies for
+ * @param currentMembership List<Address> of the current cluster members
+ * @param dataOwner Address of the node for which buddies should be selected
+ * @return List<Address> of the nodes that should serve as buddies for
* <code>dataOwner</code>. Will not be <code>null</code>, may
* be empty.
*/
- public List<IpAddress> locateBuddies(Map<IpAddress, String> buddyPoolMap, List<IpAddress> currentMembership, IpAddress dataOwner);
+ public List<Address> locateBuddies(Map<Address, String> buddyPoolMap, List<Address> currentMembership, Address dataOwner);
}
1.52 +111 -57 JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BuddyManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -b -r1.51 -r1.52
--- BuddyManager.java 21 Dec 2006 15:14:16 -0000 1.51
+++ BuddyManager.java 21 Dec 2006 16:19:33 -0000 1.52
@@ -24,21 +24,22 @@
import org.jboss.util.stream.MarshalledValueInputStream;
import org.jgroups.Address;
import org.jgroups.View;
-import org.jgroups.stack.IpAddress;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Vector;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Class that manages buddy replication groups.
@@ -72,19 +73,24 @@
/**
* Map of buddy pools received from broadcasts
*/
- Map<IpAddress, String> buddyPool;
+ Map<Address, String> buddyPool = new ConcurrentHashMap<Address, String>();
+
+ /**
+ * The nullBuddyPool is a set of addresses that have not specified buddy pools.
+ */
+ final Set<Address> nullBuddyPool = new HashSet<Address>();
/**
* Map of buddy groups the current instance participates in as a backup node.
* Keyed on String group name, values are BuddyGroup objects.
* Needs to deal with concurrent access - concurrent assignTo/removeFrom buddy grp
*/
- Map<String, BuddyGroup> buddyGroupsIParticipateIn = new ConcurrentHashMap();
+ Map<String, BuddyGroup> buddyGroupsIParticipateIn = new ConcurrentHashMap<String, BuddyGroup>();
/**
* Queue to deal with queued up view change requests - which are handled asynchronously
*/
- private final BlockingQueue<MembershipChange> queue = new LinkedBlockingQueue();
+ private final BlockingQueue<MembershipChange> queue = new LinkedBlockingQueue<MembershipChange>();
/**
* Async thread that handles items on the view change queue
@@ -101,11 +107,17 @@
/**
* number of times to retry communicating with a selected buddy if the buddy has not been initialised.
*/
- private static int UNINIT_BUDDIES_RETRIES = 3;
+ private static int UNINIT_BUDDIES_RETRIES = 5;
/**
* wait time between retries
*/
- private static final long UNINIT_BUDDIES_RETRY_NAPTIME = 500;
+ private static final long[] UNINIT_BUDDIES_RETRY_NAPTIME = {500, 1000, 1500, 2000, 2500};
+
+ /**
+ * Lock to synchronise on to ensure buddy pool info is received before buddies are assigned to groups.
+ */
+ private final Object poolInfoNotifierLock = new Object();
+
/**
* Flag to prevent us receiving and processing remote calls before we've started
@@ -117,9 +129,6 @@
{
this.config = config;
- if (config.getBuddyPoolName() != null)
- buddyPool = new ConcurrentHashMap();
-
BuddyLocatorConfig blc = config.getBuddyLocatorConfig();
try
{
@@ -178,7 +187,7 @@
log.debug("Starting buddy manager");
this.cache = cache;
buddyGroup = new BuddyGroup();
- buddyGroup.setDataOwner((IpAddress) cache.getLocalAddress());
+ buddyGroup.setDataOwner((Address) cache.getLocalAddress());
buddyGroup.setGroupName(getGroupNameFromAddress(cache.getLocalAddress()));
if (config.getBuddyPoolName() != null)
@@ -186,20 +195,19 @@
buddyPool.put(buddyGroup.getDataOwner(), config.getBuddyPoolName());
}
+ broadcastBuddyPoolMembership();
+
// allow waiting threads to process.
initialised = true;
- broadcastBuddyPoolMembership();
-// initLatch.release();
// register a TreeCache Listener to reassign buddies as and when view changes occur
-
cache.getNotifier().addCacheListener(new AbstractCacheListener()
{
- private Vector oldMembers;
+ private Vector<Address> oldMembers;
public void viewChange(View newView)
{
- Vector newMembers = newView.getMembers();
+ Vector<Address> newMembers = newView.getMembers();
// the whole 'oldMembers' concept is only used for buddy pool announcements.
if (config.getBuddyPoolName() == null)
@@ -208,8 +216,8 @@
}
else
{
- enqueueViewChange(oldMembers == null ? null : new Vector(oldMembers), new Vector(newMembers));
- if (oldMembers == null) oldMembers = new Vector();
+ enqueueViewChange(oldMembers == null ? null : new Vector<Address>(oldMembers), new Vector<Address>(newMembers));
+ if (oldMembers == null) oldMembers = new Vector<Address>();
oldMembers.clear();
oldMembers.addAll(newMembers);
}
@@ -243,17 +251,19 @@
// -------------- methods to be called by the tree cache listener --------------------
- static class MembershipChange {
- List oldMembers;
- List newMembers;
- public MembershipChange(List oldMembers, List newMembers)
+ static class MembershipChange
+ {
+ List<Address> oldMembers;
+ List<Address> newMembers;
+
+ public MembershipChange(List<Address> oldMembers, List<Address> newMembers)
{
this.oldMembers = oldMembers;
this.newMembers = newMembers;
}
}
- private void enqueueViewChange(List oldMembers, List newMembers)
+ private void enqueueViewChange(List<Address> oldMembers, List<Address> newMembers)
{
// put this on a queue
try
@@ -273,7 +283,7 @@
* have been added. Makes use of the BuddyLocator and then
* makes RPC calls to remote nodes to assign/remove buddies.
*/
- private void reassignBuddies(List membership) throws Exception
+ private void reassignBuddies(List<Address> membership) throws Exception
{
if (log.isDebugEnabled())
{
@@ -281,24 +291,20 @@
log.debug("Entering updateGroup. Current group: " + buddyGroup + ". Current View membership: " + membership);
}
// some of my buddies have died!
- List newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
- List uninitialisedBuddies = new ArrayList();
- Iterator newBuddiesIt = newBuddies.iterator();
- while (newBuddiesIt.hasNext())
+ List<Address> newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
+ List<Address> uninitialisedBuddies = new ArrayList<Address>();
+ for (Address newBuddy : newBuddies)
{
- Object newBuddy = newBuddiesIt.next();
if (!buddyGroup.buddies.contains(newBuddy))
{
uninitialisedBuddies.add(newBuddy);
}
}
- List obsoleteBuddies = new ArrayList();
+ List<Address> obsoleteBuddies = new ArrayList<Address>();
// find obsolete buddies
- Iterator originalBuddies = buddyGroup.buddies.iterator();
- while (originalBuddies.hasNext())
+ for (Address origBuddy : buddyGroup.buddies)
{
- Object origBuddy = originalBuddies.next();
if (!newBuddies.contains(origBuddy))
{
obsoleteBuddies.add(origBuddy);
@@ -332,13 +338,28 @@
* Called by TreeCache._remoteAnnounceBuddyPoolName(Address address, String buddyPoolName)
* when a view change occurs and caches need to inform the cluster of which buddy pool it is in.
*/
- public void handlePoolNameBroadcast(IpAddress address, String poolName)
+ public void handlePoolNameBroadcast(Address address, String poolName)
{
if (log.isDebugEnabled())
{
- log.debug("Received announcement that cache instance " + address + " is in buddy pool " + poolName);
+ log.debug(buddyGroup.getDataOwner() + ": received announcement that cache instance " + address + " is in buddy pool " + poolName);
+ }
+ if (poolName != null)
+ buddyPool.put(address, poolName);
+ else
+ {
+ synchronized (nullBuddyPool)
+ {
+ if (!nullBuddyPool.contains(address)) nullBuddyPool.add(address);
+ }
+ }
+
+ // notify any waiting view change threads that buddy pool info has been received.
+ synchronized (poolInfoNotifierLock)
+ {
+ log.trace("Notifying any waiting view change threads that we have received buddy pool info.");
+ poolInfoNotifierLock.notifyAll();
}
- if (poolName != null) buddyPool.put(address, poolName);
}
/**
@@ -400,14 +421,14 @@
ClassLoader cl = (marshaller == null) ? null : marshaller.getClassLoader(fqnS);
Fqn integrationRoot = new Fqn(integrationBase, fqn);
- byte [] stateBuffer = (byte[]) entry.getValue();
+ byte[] stateBuffer = (byte[]) entry.getValue();
MarshalledValueInputStream in = null;
try
{
ByteArrayInputStream bais = new ByteArrayInputStream(stateBuffer);
in = new MarshalledValueInputStream(bais);
boolean hasState = in.readBoolean();
- if(hasState)
+ if (hasState)
{
stateMgr.setState(in, integrationRoot, cl);
}
@@ -427,7 +448,7 @@
public static Fqn getBackupFqn(Object buddyGroupName, Fqn origFqn)
{
- List elements = new ArrayList();
+ List<Object> elements = new ArrayList<Object>();
elements.add(BUDDY_BACKUP_SUBTREE);
elements.add(buddyGroupName);
elements.addAll(origFqn.peekElements());
@@ -442,7 +463,7 @@
return origFqn;
}
- List elements = new ArrayList();
+ List<Object> elements = new ArrayList<Object>();
elements.add(BUDDY_BACKUP_SUBTREE);
elements.add(buddyGroupRoot.get(1));
elements.addAll(origFqn.peekElements());
@@ -504,6 +525,7 @@
MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
int attemptsLeft = UNINIT_BUDDIES_RETRIES;
+ int currentAttempt = 0;
while (attemptsLeft-- > 0)
{
@@ -519,7 +541,7 @@
if (attemptsLeft > 0)
{
log.info("One of the buddies have not been initialised. Will retry after a short nap.");
- Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME);
+ Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
}
else
{
@@ -535,7 +557,7 @@
log.trace("removeFromGroup notification complete");
}
- private void addBuddies(List buddies) throws Exception
+ private void addBuddies(List<Address> buddies) throws Exception
{
// this check is redundant - if buddies is empty this method will not be called. - Manik
@@ -592,6 +614,7 @@
MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
int attemptsLeft = UNINIT_BUDDIES_RETRIES;
+ int currentAttempt = 0;
while (attemptsLeft-- > 0)
{
@@ -607,7 +630,8 @@
if (attemptsLeft > 0)
{
log.info("One of the buddies have not been initialised. Will retry after a short nap.");
- Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME);
+ Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
+
}
else
{
@@ -690,7 +714,7 @@
broadcastBuddyPoolMembership(null);
}
- private void broadcastBuddyPoolMembership(List recipients)
+ private void broadcastBuddyPoolMembership(List<Address> recipients)
{
// broadcast to other caches
if (log.isDebugEnabled())
@@ -849,24 +873,54 @@
private void handleEnqueuedViewChange() throws Exception
{
- log.trace("Handling queued view change");
+ log.trace("Waiting for enqueued view change events");
MembershipChange members = queue.take();
- // always refresh buddy list.
- reassignBuddies(members.newMembers);
-
- if (config.getBuddyPoolName() != null)
+ // make sure new buddies have broadcast their pool memberships.
+ while (!buddyPoolInfoAvailable(members.newMembers))
+ {
+ synchronized (poolInfoNotifierLock)
{
+ log.trace("Not received necessary buddy pool info for all new members yet; waiting on poolInfoNotifierLock.");
+ poolInfoNotifierLock.wait();
+ }
+ }
+
log.trace("Broadcasting pool membership details, triggered by view change.");
+ //if (members.oldMembers == null)
broadcastBuddyPoolMembership();
-// if (members[0] == null)
-// broadcastBuddyPoolMembership();
// else
// {
-// members[1].removeAll(members[0]);
-// broadcastBuddyPoolMembership(members[1]);
+// List<Address> delta = new ArrayList<Address>();
+// delta.addAll(members.newMembers);
+// delta.removeAll(members.oldMembers);
+// broadcastBuddyPoolMembership(delta);
// }
+
+ // always refresh buddy list.
+ reassignBuddies(members.newMembers);
}
+
+ private boolean buddyPoolInfoAvailable(List<Address> newMembers)
+ {
+ boolean infoReceived = true;
+ for (Address address : newMembers)
+ {
+ // make sure no one is concurrently writing to nullBuddyPool.
+ synchronized (nullBuddyPool)
+ {
+// log.trace("Testing on node " + buddyGroup.getDataOwner() + " for candidate " + address);
+// log.trace("Is me? " + address.equals(cache.getLocalAddress()));
+// log.trace("is in bP? " + buddyPool.keySet().contains(address));
+// log.trace("is in nBP? " + nullBuddyPool.contains(address));
+ infoReceived = infoReceived && (address.equals(cache.getLocalAddress()) || buddyPool.keySet().contains(address) || nullBuddyPool.contains(address));
+ }
+ }
+
+ if (log.isTraceEnabled())
+ log.trace(buddyGroup.getDataOwner() + " received buddy pool info for new members " + newMembers + "? " + infoReceived);
+
+ return infoReceived;
}
}
}
\ No newline at end of file
1.9 +7 -8 JBossCache/src/org/jboss/cache/buddyreplication/BuddyGroup.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BuddyGroup.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyGroup.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -b -r1.8 -r1.9
--- BuddyGroup.java 11 Dec 2006 21:14:34 -0000 1.8
+++ BuddyGroup.java 21 Dec 2006 16:19:33 -0000 1.9
@@ -6,12 +6,11 @@
*/
package org.jboss.cache.buddyreplication;
-import org.jgroups.stack.IpAddress;
+import org.jgroups.Address;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Collections;
+import java.util.Vector;
/**
* Value object that represents a buddy group
@@ -27,12 +26,12 @@
private String groupName;
- private IpAddress dataOwner;
+ private Address dataOwner;
/**
* Vector<Address> - a list of JGroups addresses
*/
- List buddies = Collections.synchronizedList(new ArrayList());
+ List<Address> buddies = new Vector<Address>();//Collections.synchronizedList(new ArrayList());
// List buddies = new ArrayList();
@@ -46,12 +45,12 @@
this.groupName = groupName;
}
- public IpAddress getDataOwner()
+ public Address getDataOwner()
{
return dataOwner;
}
- public void setDataOwner(IpAddress dataOwner)
+ public void setDataOwner(Address dataOwner)
{
this.dataOwner = dataOwner;
}
@@ -61,7 +60,7 @@
return buddies;
}
- public void setBuddies(List buddies)
+ public void setBuddies(List<Address> buddies)
{
this.buddies = buddies;
}
1.8 +9 -8 JBossCache/src/org/jboss/cache/buddyreplication/NextMemberBuddyLocator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: NextMemberBuddyLocator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/NextMemberBuddyLocator.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -b -r1.7 -r1.8
--- NextMemberBuddyLocator.java 4 Nov 2006 22:03:24 -0000 1.7
+++ NextMemberBuddyLocator.java 21 Dec 2006 16:19:33 -0000 1.8
@@ -9,6 +9,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig;
+import org.jgroups.Address;
import org.jgroups.stack.IpAddress;
import java.net.InetAddress;
@@ -61,10 +62,10 @@
}
}
- public List<IpAddress> locateBuddies(Map<IpAddress, String> buddyPoolMap, List<IpAddress> currentMembership, IpAddress dataOwner)
+ public List<Address> locateBuddies(Map<Address, String> buddyPoolMap, List<Address> currentMembership, Address dataOwner)
{
int numBuddiesToFind = Math.min(config.getNumBuddies(), currentMembership.size());
- List<IpAddress> buddies = new ArrayList<IpAddress>(numBuddiesToFind);
+ List<Address> buddies = new ArrayList<Address>(numBuddiesToFind);
// find where we are in the list.
int dataOwnerSubscript = currentMembership.indexOf(dataOwner);
@@ -110,7 +111,7 @@
break;
}
- IpAddress candidate = currentMembership.get(subscript);
+ Address candidate = currentMembership.get(subscript);
if (
!candidate.equals(dataOwner) && // ignore self from selection as buddy
!buddies.contains(candidate) && // havent already considered this candidate
@@ -136,11 +137,11 @@
return ownerPoolName.equals(candidatePoolName);
}
- private boolean isColocated(IpAddress candidate, IpAddress dataOwner)
+ private boolean isColocated(Address candidate, Address dataOwner)
{
// assume they're both IpAddresses??
- InetAddress inetC = candidate.getIpAddress();
- InetAddress inetD = dataOwner.getIpAddress();
+ InetAddress inetC = ((IpAddress) candidate).getIpAddress();
+ InetAddress inetD = ((IpAddress) dataOwner).getIpAddress();
if (inetC.equals(inetD)) return true;
More information about the jboss-cvs-commits
mailing list