[jboss-cvs] JBossCache/src/org/jboss/cache/buddyreplication ...
Manik Surtani
manik at jboss.org
Sun May 27 11:21:55 EDT 2007
User: msurtani
Date: 07/05/27 11:21:55
Modified: src/org/jboss/cache/buddyreplication BuddyManager.java
Log:
Fixed more BR brittleness
Revision Changes Path
1.76 +94 -93 JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: BuddyManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java,v
retrieving revision 1.75
retrieving revision 1.76
diff -u -b -r1.75 -r1.76
--- BuddyManager.java 25 May 2007 16:34:51 -0000 1.75
+++ BuddyManager.java 27 May 2007 15:21:55 -0000 1.76
@@ -25,6 +25,7 @@
import org.jboss.util.stream.MarshalledValueInputStream;
import org.jboss.util.stream.MarshalledValueOutputStream;
import org.jgroups.Address;
+import org.jgroups.Channel;
import org.jgroups.View;
import org.jgroups.util.Util;
@@ -34,6 +35,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -54,10 +56,6 @@
private static Log log = LogFactory.getLog(BuddyManager.class);
/**
- * A place to drain the view change queue.
- */
- private Collection<MembershipChange> sink = new NullCollection<MembershipChange>();
- /**
* Configuration object.
*/
final BuddyReplicationConfig config;
@@ -124,8 +122,10 @@
*/
private final Object poolInfoNotifierLock = new Object();
-
private CountDownLatch initialisationLatch = new CountDownLatch(1);
+ // a dummy MembershipChange - a poison-pill to be placed on the membership change queue to notify async handler
+ // threads to exit gracefully when the BuddyManager has been stopped.
+ private static final MembershipChange STOP_NOTIFIER = new MembershipChange(null, null);
public BuddyManager(BuddyReplicationConfig config)
{
@@ -186,6 +186,22 @@
return s.replace(':', '_');
}
+ /**
+ * Stops the buddy manager and the related async thread.
+ */
+ public void stop()
+ {
+ try
+ {
+ queue.clear();
+ queue.put(STOP_NOTIFIER);
+ }
+ catch (InterruptedException ie)
+ {
+ // do nothing - we're stopping anyway
+ }
+ }
+
public void init(CacheImpl cache) throws CacheException
{
log.debug("Starting buddy manager");
@@ -265,6 +281,11 @@
this.oldMembers = oldMembers;
this.newMembers = newMembers;
}
+
+ public String toString()
+ {
+ return "MembershipChange: Old members = " + oldMembers + " New members = " + newMembers;
+ }
}
private synchronized void enqueueViewChange(List<Address> oldMembers, List<Address> newMembers)
@@ -272,9 +293,14 @@
// put this on a queue
try
{
+ if (queue.peek() != STOP_NOTIFIER)
+ {
//first empty the queue. All queued up view changes that have not been processed yet are now obsolete.
- queue.drainTo(sink);
- queue.put(new MembershipChange(oldMembers, newMembers));
+ queue.clear();
+ MembershipChange mc = new MembershipChange(oldMembers, newMembers);
+ if (log.isTraceEnabled()) log.trace("Enqueueing " + mc + " for async processing");
+ queue.put(mc);
+ }
}
catch (InterruptedException e)
{
@@ -289,8 +315,10 @@
* have been added. Makes use of the BuddyLocator and then
* makes RPC calls to remote nodes to assign/remove buddies.
*/
- private void reassignBuddies(List<Address> membership) throws CacheException
+ private void reassignBuddies(List<Address> members) throws CacheException
{
+ List<Address> membership = new ArrayList<Address>(members); // defensive copy
+
if (log.isDebugEnabled())
{
log.debug("Data owner address " + cache.getLocalAddress());
@@ -298,6 +326,13 @@
}
// some of my buddies have died!
List<Address> newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
+ List<Address> unreachableBuddies;
+ if (!(unreachableBuddies = checkBuddyStatus(newBuddies)).isEmpty())
+ {
+ // some of the new buddies are unreachable. Ditch them, try the algo again.
+ membership.removeAll(unreachableBuddies);
+ newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
+ }
List<Address> uninitialisedBuddies = new ArrayList<Address>();
List<Address> originalBuddies = buddyGroup.getBuddies();
@@ -350,6 +385,21 @@
}
+ /**
+ * Tests whether all members in the list are valid JGroups members.
+ *
+ * @param members
+ * @return
+ */
+ private List<Address> checkBuddyStatus(List<Address> members)
+ {
+ Channel ch = cache.getConfiguration().getRuntimeConfig().getChannel();
+ View currentView = ch.getView();
+ List<Address> deadBuddies = new LinkedList<Address>();
+ for (Address a : members) if (!currentView.containsMember(a)) deadBuddies.add(a);
+ return deadBuddies;
+ }
+
// -------------- methods to be called by the tree cache --------------------
/**
@@ -402,24 +452,23 @@
buddyGroupsIParticipateIn.remove(groupName);
// remove backup data for this group
- // TODO: should we REALLY be doing this? How do we know that this node didn't die and by removing this data we're deleting bkups?
-// if (log.isInfoEnabled()) log.info("Removing backup data for group " + groupName);
-//
-// try
-// {
-// // should be a LOCAL call.
-// // cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
-//
-// // cache.remove(new Fqn(BUDDY_BACKUP_SUBTREE_FQN, groupName));
-// }
-// catch (CacheException e)
-// {
-// log.error("Unable to remove backup data for group " + groupName, e);
-// }
-// finally
-// {
-// cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
-// }
+ if (log.isInfoEnabled()) log.info("Removing backup data for group " + groupName);
+
+ try
+ {
+ // should be a LOCAL call.
+ cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+
+ cache.remove(new Fqn(BUDDY_BACKUP_SUBTREE_FQN, groupName));
+ }
+ catch (CacheException e)
+ {
+ log.error("Unable to remove backup data for group " + groupName, e);
+ }
+ finally
+ {
+ cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
+ }
}
/**
@@ -943,6 +992,7 @@
private class AsyncViewChangeHandlerThread implements Runnable
{
private Thread t;
+ private boolean isRunning = true;
public void start()
{
@@ -966,7 +1016,7 @@
{
log.debug("Caught InterruptedException", e);
}
- while (!Thread.interrupted())
+ while (!Thread.interrupted() && isRunning)
{
try
{
@@ -989,6 +1039,20 @@
{
log.trace("Waiting for enqueued view change events");
MembershipChange members = queue.take();
+ if (members == STOP_NOTIFIER)
+ {
+ // time to go home
+ isRunning = false;
+ return;
+ }
+
+ // there is a strange case where JGroups issues view changes and just includes self in new views, and then
+ // quickly corrects it. Happens intermittently on some unit tests. If this is such a case, please ignore.
+ if (members.newMembers.size() == 1 && members.newMembers.get(0).equals(cache.getLocalAddress()))
+ {
+ log.info("Ignoring membership change event since it only contains self.");
+ return;
+ }
broadcastPoolMembership(members);
@@ -1050,73 +1114,10 @@
return infoReceived;
}
- }
-
- private class NullCollection<E> implements Collection<E>
- {
-
- public int size()
- {
- return 0; //Does nothing
- }
-
- public boolean isEmpty()
- {
- return true;
- }
-
- public boolean contains(Object o)
- {
- return false;
- }
-
- public Iterator<E> iterator()
- {
- return null;
- }
-
- public Object[] toArray()
- {
- return new Object[0];
- }
-
- public <T> T[] toArray(T[] a)
- {
- return null;
- }
-
- public boolean add(E o)
- {
- return true;
- }
-
- public boolean remove(Object o)
- {
- return true;
- }
-
- public boolean containsAll(Collection<?> c)
- {
- return false;
- }
-
- public boolean addAll(Collection<? extends E> c)
- {
- return true;
- }
-
- public boolean removeAll(Collection<?> c)
- {
- return true;
- }
-
- public boolean retainAll(Collection<?> c)
- {
- return true;
- }
- public void clear()
+ public void stop()
{
+ if (t != null) t.interrupt();
}
}
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list