Author: manik.surtani(a)jboss.com
Date: 2008-04-07 17:16:42 -0400 (Mon, 07 Apr 2008)
New Revision: 5514
Modified:
core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
core/trunk/src/main/java/org/jboss/cache/Fqn.java
core/trunk/src/main/java/org/jboss/cache/RPCManager.java
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/invocation/RemoteCacheInvocationDelegate.java
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
core/trunk/src/test/java/org/jboss/cache/factories/UnitTestCacheConfigurationFactory.java
core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
Log:
JBCACHE-1320: When a view change is received and a buddy knows that a data owner has
died, it should mark the region as defunct
Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2008-04-07 14:46:32 UTC (rev
5513)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2008-04-07 21:16:42 UTC (rev
5514)
@@ -45,15 +45,7 @@
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
/**
* The default implementation class of {@link org.jboss.cache.Cache} and {@link
org.jboss.cache.CacheSPI}. This class
@@ -1714,7 +1706,7 @@
ctx.setOriginLocal(false);
// use a get() call into the cache to make sure cache loading takes place.
- // no need to cache the original skipDataGravitation setting here - it will
always be false of we got here!!
+ // no need to cache the original skipDataGravitation setting here - it will
always be false if we got here!!
ctx.getOptionOverrides().setSkipDataGravitation(true);
Node actualNode = spi.getNode(fqn);
ctx.getOptionOverrides().setSkipDataGravitation(false);
@@ -1722,6 +1714,9 @@
if (trace) log.trace("In local tree, this is " + actualNode);
Fqn backupNodeFqn = null;
+
+ // TODO: Refactor this muck.
+
if (actualNode == null && searchSubtrees)
{
log.trace("Looking at backup trees.");
@@ -1736,14 +1731,41 @@
{
// childName is the name of a buddy group since all child names in
this
// collection are direct children of BUDDY_BACKUP_SUBTREE_FQN
- backupNodeFqn = BuddyManager.getBackupFqn(childName.toString(),
fqn);
+ Fqn backupRoot = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN,
childName);
+ if (BuddyManager.isDeadBackupRoot(backupRoot))
+ {
+ //actualNode = searchDeadRoot(backupRoot, fqn);
+ Set<Integer> deadChildNames = new
TreeSet<Integer>(spi.getChildrenNames(backupRoot));
+ Integer[] elems = deadChildNames.toArray(new Integer[]{});
- // use a get() call into the cache to make sure cache loading takes
place.
- ctx.getOptionOverrides().setSkipDataGravitation(true);
- actualNode = spi.getNode(backupNodeFqn);
- ctx.getOptionOverrides().setSkipDataGravitation(false);
+ // these are integers. we need to start with the highest/most
recent.
+ for (int i = elems.length - 1; i > -1; i--)
+ {
+ Integer versionOfDefunctData = elems[i];
+ backupNodeFqn = new Fqn(new Fqn(backupRoot,
versionOfDefunctData), fqn);
+
+ // use a get() call into the cache to make sure cache loading
takes place.
+ ctx.getOptionOverrides().setSkipDataGravitation(true);
+ actualNode = spi.peek(backupNodeFqn, false);
+ ctx.getOptionOverrides().setSkipDataGravitation(false);
+
+ // break out of the inner loop searching through the dead
node's various backups
+ if (actualNode != null) break;
+ }
+ }
+ else
+ {
+ backupNodeFqn = new Fqn(backupRoot, fqn);
+ // use a get() call into the cache to make sure cache loading
takes place.
+ ctx.getOptionOverrides().setSkipDataGravitation(true);
+ actualNode = spi.getNode(backupNodeFqn);
+ ctx.getOptionOverrides().setSkipDataGravitation(false);
+ }
+
if (trace)
log.trace("Looking for " + backupNodeFqn + ".
Search result: " + actualNode);
+
+ // break out of outer loop searching through all available backups.
if (actualNode != null) break;
}
}
@@ -1769,6 +1791,11 @@
return GravitateResult.subtreeResult(list, backupNodeFqn);
}
+ catch (RuntimeException re)
+ {
+ if (trace) log.trace("Caught throwable", re);
+ throw re;
+ }
finally
{
ctx.setOriginLocal(true);
Modified: core/trunk/src/main/java/org/jboss/cache/Fqn.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Fqn.java 2008-04-07 14:46:32 UTC (rev 5513)
+++ core/trunk/src/main/java/org/jboss/cache/Fqn.java 2008-04-07 21:16:42 UTC (rev 5514)
@@ -518,7 +518,11 @@
}
else
{
- return String.valueOf(getLastElement());
+ Object last = getLastElement();
+ if (last instanceof String)
+ return (String) last;
+ else
+ return String.valueOf(getLastElement());
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2008-04-07 14:46:32 UTC (rev
5513)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2008-04-07 21:16:42 UTC (rev
5514)
@@ -2,6 +2,7 @@
import org.jboss.cache.marshall.MethodCall;
import org.jgroups.Address;
+import org.jgroups.Channel;
import org.jgroups.blocks.RspFilter;
import java.util.List;
@@ -124,5 +125,10 @@
*/
void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
-
+ /**
+ * Retrieves the Channel
+ *
+ * @return a channel
+ */
+ Channel getChannel();
}
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-04-07 14:46:32 UTC
(rev 5513)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-04-07 21:16:42 UTC
(rev 5514)
@@ -484,6 +484,11 @@
fetchPartialState(sources, encodedStateId);
}
+ public Channel getChannel()
+ {
+ return channel;
+ }
+
public void fetchPartialState(List<Address> sources, Fqn subtree) throws
Exception
{
if (subtree == null)
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-04-07
14:46:32 UTC (rev 5513)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-04-07
21:16:42 UTC (rev 5514)
@@ -92,11 +92,11 @@
final Set<Address> nullBuddyPool = new ConcurrentHashSet<Address>();
/**
- * Map of bddy groups the current instance participates in as a backup node.
+ * Map of buddy groups the current instance participates in as a backup node.
* Keyed on String group name, values are BuddyGroup objects.
* Needs to deal with concurrent access - concurrent assignTo/removeFrom buddy grp
*/
- Map<String, BuddyGroup> buddyGroupsIParticipateIn = new
ConcurrentHashMap<String, BuddyGroup>();
+ Map<Address, BuddyGroup> buddyGroupsIParticipateIn = new
ConcurrentHashMap<Address, BuddyGroup>();
/**
* Queue to deal with queued up view change requests - which are handled
asynchronously
@@ -213,7 +213,7 @@
return config.getBuddyPoolName();
}
- public static String getGroupNameFromAddress(Object address)
+ public static String getGroupNameFromAddress(Address address)
{
return address.toString().replace(':', '_');
}
@@ -326,7 +326,7 @@
}
}
- private synchronized void enqueueViewChange(List<Address> oldMembers,
List<Address> newMembers)
+ private synchronized void enqueueViewChange(MembershipChange mc)
{
// put this on a queue
try
@@ -335,7 +335,6 @@
{
//first empty the queue. All queued up view changes that have not been
processed yet are now obsolete.
queue.clear();
- MembershipChange mc = new MembershipChange(oldMembers, newMembers);
if (log.isTraceEnabled()) log.trace("Enqueueing " + mc + " for
async processing");
queue.put(mc);
}
@@ -536,7 +535,7 @@
}
if (log.isInfoEnabled()) log.info("Assigning self to buddy group " +
newGroup);
- buddyGroupsIParticipateIn.put(newGroup.getGroupName(), newGroup);
+ buddyGroupsIParticipateIn.put(newGroup.getDataOwner(), newGroup);
// Integrate state transfer from the data owner of the buddy group
Fqn integrationBase = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN,
@@ -654,6 +653,32 @@
}
/**
+ * @param dataOwner owner of a data set
+ * @return a backup root for a given data owner
+ */
+ public static Fqn<String> getBackupRoot(Address dataOwner)
+ {
+ return new Fqn<String>(BUDDY_BACKUP_SUBTREE_FQN,
getGroupNameFromAddress(dataOwner));
+ }
+
+ /**
+ * Returns the backup root of a dead data owner
+ *
+ * @param dataOwner owner of data
+ * @return Fqn of dead data owner's root
+ */
+ public static Fqn<String> getDeadBackupRoot(Address dataOwner)
+ {
+ return new Fqn<String>(BUDDY_BACKUP_SUBTREE_FQN,
getGroupNameFromAddress(dataOwner) + ":DEAD");
+ }
+
+ public static boolean isDeadBackupRoot(Fqn f)
+ {
+ return f.getParent().equals(BUDDY_BACKUP_SUBTREE_FQN) &&
f.getLastElementAsString().endsWith(":DEAD");
+ }
+
+
+ /**
* Utility method that retrieves a buddy backup Fqn given the actual Fqn of some data
and the backup subtree for the
* buddy group in question
*
@@ -679,6 +704,21 @@
return name != null && name.hasElement(BuddyManager.BUDDY_BACKUP_SUBTREE);
}
+ public static boolean isDeadBackupFqn(Fqn name)
+ {
+ if (name == null) return false;
+ Object elem1 = name.get(1);
+ if (elem1 instanceof String)
+ {
+ String strElem1 = (String) elem1;
+ return name.hasElement(BuddyManager.BUDDY_BACKUP_SUBTREE) &&
strElem1.endsWith(":DEAD");
+ }
+ else
+ {
+ return false;
+ }
+ }
+
// -------------- methods to be called by the BaseRPCINterceptor --------------------
/**
@@ -1064,10 +1104,46 @@
public static Fqn getActualFqn(Fqn fqn)
{
if (!isBackupFqn(fqn)) return fqn;
- // remove the first 2 elements
- return fqn.getSubFqn(2, fqn.size());
+ // remove the first 2 (or 3 in the case of a dead backup region) elements
+ return fqn.getSubFqn(isDeadBackupFqn(fqn) ? 3 : 2, fqn.size());
}
+ private void migrateDefunctData(Node backupRoot, Address dataOwner)
+ {
+ Fqn defunctBackupRootFqn = getDefunctBackupRootFqn(dataOwner);
+
+ for (Object child : backupRoot.getChildren())
+ {
+ Fqn childFqn = ((Node) child).getFqn();
+ cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache.move(childFqn, defunctBackupRootFqn);
+ }
+
+ cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ backupRoot.getParent().removeChild(backupRoot.getFqn().getLastElement());
+ }
+
+ private Fqn getDefunctBackupRootFqn(Address dataOwner)
+ {
+ // the defunct Fqn should be: /_BUDDY_BACKUP_/dataOwnerAddess:DEAD/N
+ // where N is a number.
+ Fqn<String> defunctRoot = getDeadBackupRoot(dataOwner);
+ cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ Node defunctRootNode = cache.getRoot().addChild(defunctRoot);
+ SortedSet childrenNames = new TreeSet(defunctRootNode.getChildrenNames()); // will
be naturally sorted.
+ Integer childName = 1;
+
+ if (!childrenNames.isEmpty())
+ {
+ Integer lastChild = (Integer) childrenNames.last();
+ childName = lastChild + 1;
+ }
+
+ cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ defunctRootNode.addChild(new Fqn<Integer>(childName));
+ return new Fqn(defunctRoot, childName);
+ }
+
/**
* Asynchronous thread that deals with handling view changes placed on a queue
*/
@@ -1160,6 +1236,26 @@
// always refresh buddy list.
reassignBuddies(members.newMembers);
+
+ // look for missing data owners.
+ // if the view change involves the removal of a data owner of a group in which
we participate in, we should
+ // rename the backup the region accordingly, and remove the group from the list
in which the current instance participates.
+ if (!members.newMembers.containsAll(buddyGroupsIParticipateIn.keySet()))
+ {
+ Set<Address> toRemove = new HashSet<Address>();
+
+ for (Address a : buddyGroupsIParticipateIn.keySet())
+ {
+ if (!members.newMembers.contains(a)) toRemove.add(a);
+ }
+
+ for (Address a : toRemove)
+ {
+ BuddyGroup bg = buddyGroupsIParticipateIn.remove(a);
+ Node backupRoot = cache.getNode(getBackupRoot(bg.getDataOwner()));
+ migrateDefunctData(backupRoot, bg.getDataOwner());
+ }
+ }
}
private void broadcastPoolMembership(MembershipChange members)
@@ -1224,11 +1320,12 @@
// the whole 'oldMembers' concept is only used for buddy pool
announcements.
if (config.getBuddyPoolName() == null)
{
- enqueueViewChange(null, newMembers);
+ enqueueViewChange(new MembershipChange(null, new
Vector<Address>(newMembers)));
}
else
{
- enqueueViewChange(oldMembers == null ? null : new
Vector<Address>(oldMembers), new Vector<Address>(newMembers));
+ MembershipChange mc = new MembershipChange(oldMembers == null ? null : new
Vector<Address>(oldMembers), new Vector<Address>(newMembers));
+ enqueueViewChange(mc);
if (oldMembers == null) oldMembers = new Vector<Address>();
oldMembers.clear();
oldMembers.addAll(newMembers);
Modified:
core/trunk/src/main/java/org/jboss/cache/invocation/RemoteCacheInvocationDelegate.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/invocation/RemoteCacheInvocationDelegate.java 2008-04-07
14:46:32 UTC (rev 5513)
+++
core/trunk/src/main/java/org/jboss/cache/invocation/RemoteCacheInvocationDelegate.java 2008-04-07
21:16:42 UTC (rev 5514)
@@ -2,6 +2,7 @@
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.Fqn;
+import org.jboss.cache.NodeSPI;
import org.jboss.cache.buddyreplication.BuddyGroup;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.buddyreplication.BuddyNotInitException;
@@ -68,7 +69,6 @@
public void dataGravitationCleanup(Fqn primary, Fqn backup) throws Exception
{
-// MethodCall primaryDataCleanup, backupDataCleanup;
if (buddyManager.isDataGravitationRemoveOnFind())
{
if (log.isTraceEnabled())
@@ -79,16 +79,34 @@
{
// only attempt to clean up the backup if the primary did not exist - a waste
of a call otherwise.
getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
- removeNode(backup);
+ if (removeNode(backup))
+ {
+ // if this is a DIRECT child of a DEAD buddy backup region, then remove
the empty dead region structural node.
+ if (BuddyManager.isDeadBackupFqn(backup) &&
BuddyManager.isDeadBackupRoot(backup.getParent().getParent()))
+ {
+ NodeSPI deadBackupRoot = peek(backup.getParent(), false);
+ if (deadBackupRoot.getChildrenMapDirect().isEmpty())
+ {
+ if (log.isTraceEnabled()) log.trace("Removing dead backup
region " + deadBackupRoot.getFqn());
+ removeNode(deadBackupRoot.getFqn());
+
+ // now check the grand parent and see if we are free of versions
+ deadBackupRoot = peek(deadBackupRoot.getFqn().getParent(), false);
+ if (deadBackupRoot.getChildrenMapDirect().isEmpty())
+ {
+ if (log.isTraceEnabled()) log.trace("Removing dead backup
region " + deadBackupRoot.getFqn());
+ removeNode(deadBackupRoot.getFqn());
+ }
+ }
+ }
+ }
}
}
else
{
if (log.isTraceEnabled())
log.trace("DataGravitationCleanup: Evicting primary (" + primary +
") and backup (" + backup + ")");
- //primaryDataCleanup =
MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, primary);
evict(primary, true);
- //backupDataCleanup =
MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, backup);
evict(backup, true);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-04-07
14:46:32 UTC (rev 5513)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-04-07
21:16:42 UTC (rev 5514)
@@ -40,6 +40,7 @@
private static final Log log = LogFactory.getLog(VersionAwareMarshaller.class);
private static final int VERSION_200 = 20;
private static final int VERSION_210 = 21;
+ private static final int VERSION_220 = 22;
private static final int CUSTOM_MARSHALLER = 999;
private ComponentRegistry componentRegistry;
@@ -319,6 +320,7 @@
marshallers.put(VERSION_200, marshaller);
}
break;
+ case VERSION_220:
case VERSION_210:
knownVersion = true;
default:
Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java 2008-04-07
14:46:32 UTC (rev 5513)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java 2008-04-07
21:16:42 UTC (rev 5514)
@@ -271,7 +271,7 @@
lock.release(owner);
if (log.isTraceEnabled())
{
- log.trace("releasing lock for " + ((IdentityLock) lock).getFqn()
+ " (" + lock + ")");
+ log.trace("releasing lock for " + lock.getFqn() + " ("
+ lock + ")");
}
}
}
@@ -438,6 +438,6 @@
*/
public boolean existModifications()
{
- return !modification_list.isEmpty() || !cl_mod_list.isEmpty();
+ return !modification_list.isEmpty() || !cl_mod_list.isEmpty();
}
}
Modified:
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java 2008-04-07
14:46:32 UTC (rev 5513)
+++
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java 2008-04-07
21:16:42 UTC (rev 5514)
@@ -9,13 +9,17 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.misc.TestingUtil;
+import org.jgroups.JChannel;
+import org.jgroups.protocols.DISCARD;
import static org.testng.AssertJUnit.*;
+import org.testng.annotations.Test;
/**
* Tests behaviour when data owners fail - essentially this tests data gravitation
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
*/
+@Test(groups = "functional")
public class BuddyReplicationFailoverTest extends BuddyReplicationTestsBase
{
protected boolean optimisticLocks = false;
@@ -60,15 +64,21 @@
if (killOwner)
{
System.out.println("***** About to kill original data owner (" +
caches.get(0).getLocalAddress() + "). *****");
- // make sure the JGroups channel shutdown is forced.
-// System.setProperty("org.jboss.cache.shutdown.force",
"true");
- caches.get(0).stop();
- caches.set(0, null);
- // wait up to 5 mins? :S
- TestingUtil.blockUntilViewsReceived(5 * 60 * 1000, false, caches.get(1),
caches.get(2));
+ // forcefully kill data owner.
+ killChannel(caches.get(0), caches.get(1), 2);
-// TestingUtil.sleepThread(600000);
-// TestingUtil.sleepThread(1000);
+ System.out.println("Killed. Testing backup roots.");
+ dumpCacheContents(caches);
+ // assert that the remaining caches have picked new buddies. Cache 1 should
have cache 2's backup data.
+ assert
caches.get(1).peek(BuddyManager.getBackupRoot(caches.get(2).getLocalAddress()), false) !=
null : "Should have new buddy's backup root.";
+ assert
caches.get(1).peek(BuddyManager.getBackupRoot(caches.get(1).getLocalAddress()), false) ==
null : "Should not have self as a backup root.";
+ assert
caches.get(1).peek(BuddyManager.getBackupRoot(caches.get(0).getLocalAddress()), false) ==
null : "Should not have dead node as a backup root.";
+ assert caches.get(1).peek(new
Fqn(BuddyManager.getDeadBackupRoot(caches.get(0).getLocalAddress()), new Integer(1)),
false) != null : "Should have dead node as a defunct backup root.";
+
+ assert
caches.get(2).peek(BuddyManager.getBackupRoot(caches.get(2).getLocalAddress()), false) ==
null : "Should not have self as a backup root.";
+ assert
caches.get(2).peek(BuddyManager.getBackupRoot(caches.get(1).getLocalAddress()), false) !=
null : "Should have new buddy's backup root.";
+ assert
caches.get(2).peek(BuddyManager.getBackupRoot(caches.get(0).getLocalAddress()), false) ==
null : "Should not have dead node as a backup root.";
+ assert caches.get(2).peek(new
Fqn(BuddyManager.getDeadBackupRoot(caches.get(0).getLocalAddress()), new Integer(1)),
false) == null : "Should not have dead node as a defunct backup root.";
}
System.out.println("***** Killed original data owner, about to call a get on a
different cache instance. *****");
@@ -84,7 +94,23 @@
Fqn newBackupFqn = BuddyManager.getBackupFqn(caches.get(2).getLocalAddress(),
fqn);
// use exists instead of get() to prevent going up the interceptor stack
- if (!killOwner) assertTrue("Should be false",
!caches.get(0).exists(fqn));
+ if (!killOwner)
+ {
+ assertTrue("Should be false", !caches.get(0).exists(fqn));
+ }
+ else
+ {
+ assert
caches.get(1).peek(BuddyManager.getBackupRoot(caches.get(2).getLocalAddress()), false) !=
null : "Should have new buddy's backup root.";
+ assert
caches.get(1).peek(BuddyManager.getBackupRoot(caches.get(1).getLocalAddress()), false) ==
null : "Should not have self as a backup root.";
+ assert
caches.get(1).peek(BuddyManager.getBackupRoot(caches.get(0).getLocalAddress()), false) ==
null : "Should not have dead node as a backup root.";
+ assert caches.get(1).peek(new
Fqn(BuddyManager.getDeadBackupRoot(caches.get(0).getLocalAddress()), new Integer(1)),
false) == null : "Should not have dead node as a defunct backup root.";
+ assert
caches.get(1).peek(BuddyManager.getDeadBackupRoot(caches.get(0).getLocalAddress()), false)
== null : "Should not have dead node as a defunct backup root.";
+
+ assert
caches.get(2).peek(BuddyManager.getBackupRoot(caches.get(2).getLocalAddress()), false) ==
null : "Should not have self as a backup root.";
+ assert
caches.get(2).peek(BuddyManager.getBackupRoot(caches.get(1).getLocalAddress()), false) !=
null : "Should have new buddy's backup root.";
+ assert
caches.get(2).peek(BuddyManager.getBackupRoot(caches.get(0).getLocalAddress()), false) ==
null : "Should not have dead node as a backup root.";
+ assert caches.get(2).peek(new
Fqn(BuddyManager.getDeadBackupRoot(caches.get(0).getLocalAddress()), new Integer(1)),
false) == null : "Should not have dead node as a defunct backup root.";
+ }
assertTrue("Should be false", !caches.get(1).exists(fqn));
// the old backup should no longer exist
@@ -105,6 +131,19 @@
assertFalse("Should be null", caches.get(2).exists(newBackupFqn));
}
+ private void killChannel(CacheSPI cacheToKill, CacheSPI anotherCache, int
finalExpectedClusterSize)
+ {
+ JChannel channel = (JChannel) cacheToKill.getRPCManager().getChannel();
+ DISCARD discard = (DISCARD)
channel.getProtocolStack().findProtocol(DISCARD.class);
+ if (discard != null)
+ {
+ discard.setDiscardAll(true);
+ TestingUtil.blockUntilViewReceived(anotherCache, finalExpectedClusterSize,
10000, false);
+ // an extra second, for some reason we need this.
+ TestingUtil.sleepThread(1000);
+ }
+ }
+
public void testDataReplicationSuppression() throws Exception
{
caches = createCaches(3, false, false, optimisticLocks);
Modified:
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java 2008-04-07
14:46:32 UTC (rev 5513)
+++
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java 2008-04-07
21:16:42 UTC (rev 5514)
@@ -127,11 +127,11 @@
protected CacheSPI<Object, Object> createCache(boolean optimisticLocks, int
numBuddies, String buddyPoolName, boolean useDataGravitation, boolean removeOnFind,
boolean start) throws Exception
{
- CacheSPI<Object, Object> c = (CacheSPI<Object, Object>) new
DefaultCacheFactory<Object,
Object>().createCache(UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC),
false);
+ CacheSPI<Object, Object> c = (CacheSPI<Object, Object>) new
DefaultCacheFactory<Object,
Object>().createCache(UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC,
false, false, true), false);
c.getConfiguration().setClusterName("BuddyReplicationTest");
// basic config
String xmlString =
"<config><buddyReplicationEnabled>true</buddyReplicationEnabled>\n"
+
-
"<buddyCommunicationTimeout>500</buddyCommunicationTimeout>\n" +
+
"<buddyCommunicationTimeout>500000</buddyCommunicationTimeout>\n" +
"
<buddyLocatorClass>org.jboss.cache.buddyreplication.NextMemberBuddyLocator</buddyLocatorClass>\n"
+
" <autoDataGravitation>" + useDataGravitation +
"</autoDataGravitation>\n" +
" <dataGravitationRemoveOnFind>" + removeOnFind +
"</dataGravitationRemoveOnFind>\n" +
@@ -311,11 +311,11 @@
assertTrue(buddyLocalAddress + " should be a buddy to " +
dataOwnerLocalAddress,
dataOwnerBuddyManager.getBuddyAddresses().contains(buddyLocalAddress));
// and now on the buddy end
- BuddyGroup group =
buddyBuddyManager.buddyGroupsIParticipateIn.get(BuddyManager.getGroupNameFromAddress(dataOwnerLocalAddress));
+ BuddyGroup group =
buddyBuddyManager.buddyGroupsIParticipateIn.get(dataOwnerLocalAddress);
System.out.println("*** Groups I participate in: " +
buddyBuddyManager.buddyGroupsIParticipateIn);
System.out.println("*** Buddy's version of dataOwner's group " +
group);
- assertTrue("buddy's list of groups it participates in should contain data
owner's group name",
buddyBuddyManager.buddyGroupsIParticipateIn.containsKey(BuddyManager.getGroupNameFromAddress(dataOwnerLocalAddress)));
+ assertTrue("buddy's list of groups it participates in should contain data
owner's group name",
buddyBuddyManager.buddyGroupsIParticipateIn.containsKey(dataOwnerLocalAddress));
if (onlyBuddy) assertEquals(1, group.getBuddies().size());
assertTrue(buddyLocalAddress + " should be a buddy to " +
group.getGroupName(), group.getBuddies().contains(buddyLocalAddress));
}
Modified:
core/trunk/src/test/java/org/jboss/cache/factories/UnitTestCacheConfigurationFactory.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/factories/UnitTestCacheConfigurationFactory.java 2008-04-07
14:46:32 UTC (rev 5513)
+++
core/trunk/src/test/java/org/jboss/cache/factories/UnitTestCacheConfigurationFactory.java 2008-04-07
21:16:42 UTC (rev 5514)
@@ -11,9 +11,9 @@
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.config.ConfigurationException;
import org.jboss.cache.config.EvictionRegionConfig;
+import org.jboss.cache.eviction.LRUConfiguration;
import org.jboss.cache.transaction.TransactionSetup;
import org.jboss.cache.xml.XmlHelper;
-import org.jboss.cache.eviction.LRUConfiguration;
import org.jgroups.conf.XmlConfigurator;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
@@ -47,6 +47,11 @@
public static Configuration createConfiguration(CacheMode mode, boolean useEviction,
boolean usePassivation) throws ConfigurationException
{
+ return createConfiguration(mode, useEviction, usePassivation, false);
+ }
+
+ public static Configuration createConfiguration(CacheMode mode, boolean useEviction,
boolean usePassivation, boolean killable) throws ConfigurationException
+ {
UnitTestXmlConfigurationParser parser = new UnitTestXmlConfigurationParser();
Configuration c = parser.parseFile(DEFAULT_CONFIGURATION_FILE, mode);
@@ -62,6 +67,12 @@
c.setTransactionManagerLookupClass(TransactionSetup.getManagerLookup());
+ if (mode != CacheMode.LOCAL && killable)
+ {
+ String clusterConfig = c.getClusterConfig();
+ c.setClusterConfig(injectDiscard(clusterConfig, 0, 0));
+ }
+
return c;
}
@@ -78,17 +89,17 @@
protected static CacheLoaderConfig getSingleCacheLoaderConfig(boolean passivation,
String preload, String cacheloaderClass, String properties, boolean async, boolean
fetchPersistentState, boolean shared, boolean purgeOnStartup) throws Exception
{
String xml = "<config>\n" +
- "<passivation>" + passivation +
"</passivation>\n" +
- "<preload>" + preload + "</preload>\n"
+
- "<cacheloader>\n" +
- "<class>" + cacheloaderClass +
"</class>\n" +
- "<properties>" + properties +
"</properties>\n" +
- "<async>" + async + "</async>\n" +
- "<shared>" + shared + "</shared>\n" +
- "<fetchPersistentState>" + fetchPersistentState +
"</fetchPersistentState>\n" +
- "<purgeOnStartup>" + purgeOnStartup +
"</purgeOnStartup>\n" +
- "</cacheloader>\n" +
- "</config>";
+ "<passivation>" + passivation +
"</passivation>\n" +
+ "<preload>" + preload + "</preload>\n" +
+ "<cacheloader>\n" +
+ "<class>" + cacheloaderClass + "</class>\n"
+
+ "<properties>" + properties +
"</properties>\n" +
+ "<async>" + async + "</async>\n" +
+ "<shared>" + shared + "</shared>\n" +
+ "<fetchPersistentState>" + fetchPersistentState +
"</fetchPersistentState>\n" +
+ "<purgeOnStartup>" + purgeOnStartup +
"</purgeOnStartup>\n" +
+ "</cacheloader>\n" +
+ "</config>";
Element element = XmlHelper.stringToElement(xml);
return XmlConfigurationParser.parseCacheLoaderConfig(element);
}
@@ -97,6 +108,7 @@
* Helper method that takes a <b>JGroups</b> configuration file and
creates an old-style JGroups config {@link String} that can be used
* in {@link org.jboss.cache.config.Configuration#setClusterConfig(String)}. Note
that expressions
* in the file - such as <tt>${jgroups.udp.mcast_port:45588}</tt> are
expanded out accordingly.
+ *
* @param url url to the cfg file
* @return a String
*/
@@ -123,17 +135,33 @@
/**
* Takes a JGroups configuration "old-style" String and injects the
"DELAY" protcol.
+ *
* @param jgroupsConfigString JGroups config string
- * @param incomingDelay incoming delay
- * @param outgoingDelay outgoing delay
+ * @param incomingDelay incoming delay
+ * @param outgoingDelay outgoing delay
* @return new string
*/
public static String injectDelay(String jgroupsConfigString, int incomingDelay, int
outgoingDelay)
{
- String delay = ":DELAY(in_delay=" +incomingDelay+
";out_delay="+outgoingDelay+")";
+ String delay = ":DELAY(in_delay=" + incomingDelay +
";out_delay=" + outgoingDelay + ")";
return jgroupsConfigString.substring(0, jgroupsConfigString.indexOf(":"))
+ delay + jgroupsConfigString.substring(jgroupsConfigString.indexOf(":"));
}
+ /**
+ * Takes a JGroups configuration "old-style" String and injects the
"DISCARD" protcol.
+ *
+ * @param jgroupsConfigString JGroups config string
+ * @param up factor of incoming messages to discard. 0 is none, 1 is
all.
+ * @param down factor of outgoing messages to discard. 0 is none, 1 is
all.
+ * @return new string
+ */
+ public static String injectDiscard(String jgroupsConfigString, double up, double
down)
+ {
+ String delay = ":DISCARD(up=" + up + ";down=" + down +
")";
+ return jgroupsConfigString.substring(0, jgroupsConfigString.indexOf(":"))
+ delay + jgroupsConfigString.substring(jgroupsConfigString.indexOf(":"));
+ }
+
+
public static EvictionRegionConfig buildLruEvictionRegionConfig(String regionNaame,
int maxNodes, int timeToLive)
{
EvictionRegionConfig erc = new EvictionRegionConfig();
@@ -141,7 +169,7 @@
LRUConfiguration lruConfig = new LRUConfiguration();
lruConfig.setEvictionPolicyClass("org.jboss.cache.eviction.LRUPolicy");
if (maxNodes >= 0) lruConfig.setMaxNodes(maxNodes);
- if (timeToLive >= 0) lruConfig.setTimeToLiveSeconds(timeToLive);
+ if (timeToLive >= 0) lruConfig.setTimeToLiveSeconds(timeToLive);
erc.setEvictionPolicyConfig(lruConfig);
return erc;
}
Modified: core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2008-04-07 14:46:32 UTC
(rev 5513)
+++ core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2008-04-07 21:16:42 UTC
(rev 5514)
@@ -188,12 +188,17 @@
*/
public static void blockUntilViewReceived(CacheSPI cache, int groupSize, long
timeout)
{
+ blockUntilViewReceived(cache, groupSize, timeout, true);
+ }
+
+ public static void blockUntilViewReceived(CacheSPI cache, int groupSize, long timeout,
boolean barfIfTooManyMembersInView)
+ {
long failTime = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < failTime)
{
sleepThread(100);
- if (isCacheViewComplete(cache, groupSize))
+ if (isCacheViewComplete(cache, groupSize, barfIfTooManyMembersInView))
{
return;
}
@@ -260,7 +265,7 @@
/**
* Checks each cache to see if the number of elements in the array
- * returned by {@link CacheImpl#getMembers()} matches the size of
+ * returned by {@link org.jboss.cache.RPCManager#getMembers()} matches the size of
* the <code>caches</code> parameter.
*
* @param caches caches that should form a View
@@ -495,13 +500,12 @@
/**
* For testing only - introspects a cache and extracts the ComponentRegistry
*
- * @param cache cache to introspect
+ * @param ci cache to introspect
* @return component registry
*/
public static ComponentRegistry extractComponentRegistry(CacheImpl ci)
{
- ComponentRegistry cr = (ComponentRegistry) extractField(ci,
"componentRegistry");
- return cr;
+ return (ComponentRegistry) extractField(ci, "componentRegistry");
}