Author: mircea.markus
Date: 2009-09-23 11:28:49 -0400 (Wed, 23 Sep 2009)
New Revision: 8236
Modified:
core/trunk/pom.xml
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
Log:
fix NBST tests and integration with jgroups2.6.13
Modified: core/trunk/pom.xml
===================================================================
--- core/trunk/pom.xml 2009-09-17 15:16:06 UTC (rev 8235)
+++ core/trunk/pom.xml 2009-09-23 15:28:49 UTC (rev 8236)
@@ -28,7 +28,7 @@
<groupId>jgroups</groupId>
<artifactId>jgroups</artifactId>
<!--<version>2.8.0.Beta2</version>-->
- <version>2.6.9.GA</version>
+ <version>2.6.13.CR2</version>
</dependency>
<!--
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java 2009-09-17
15:16:06 UTC (rev 8235)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java 2009-09-23
15:28:49 UTC (rev 8236)
@@ -321,9 +321,9 @@
n = helper.wrapNodeForWriting(ctx, fqn, true, true, true, false, false);
n.setDataLoaded(false);
}
- if (nodeData != null && n != null)
+ if (nodeData != null)
{
- nodeData = setNodeState(ctx, fqn, n, nodeData);
+ setNodeState(ctx, fqn, n, nodeData);
}
}
}
@@ -460,7 +460,7 @@
* If it doesn't exist on disk but in memory, clears the
* uninitialized flag, otherwise returns null.
*/
- private Map setNodeState(InvocationContext ctx, Fqn fqn, NodeSPI n, Map nodeData)
throws Exception
+ private void setNodeState(InvocationContext ctx, Fqn fqn, NodeSPI n, Map nodeData)
throws Exception
{
if (trace) log.trace("setNodeState node is " + n);
if (nodeData != null)
@@ -498,7 +498,6 @@
if (trace) log.trace("Setting dataLoaded to true");
n.setDataLoaded(true);
}
- return nodeData;
}
private Map loadData(InvocationContext ctx, Fqn fqn) throws Exception
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-09-17
15:16:06 UTC (rev 8235)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-09-23
15:28:49 UTC (rev 8236)
@@ -349,9 +349,11 @@
{
prepareContextOptions();
NodeSPI targetNode = cache.getNode(target);
+ if (trace) log.trace("Target node has following children: " +
targetNode.getChildrenNames());
for (Object childname : targetNode.getChildrenNames())
{
prepareContextOptions();
+ if (trace) log.trace("Removing child: " + childname);
targetNode.removeChild(childname);
}
@@ -397,6 +399,7 @@
// read marker off stack
// cache.getMarshaller().objectFromObjectStream(in);
}
+ if (trace) log.trace("Finished integrating transient state");
}
@SuppressWarnings("unchecked")
@@ -412,7 +415,9 @@
throw new CacheException(cause);
}
if (obj instanceof NodeDataMarker) return null;
+ if (trace) log.trace("Data readed from stream is: " + obj);
+
return (List<NodeData>) obj;
}
@@ -427,6 +432,7 @@
while (nd != null && !nd.isMarker())
{
fqn = nd.getFqn();
+ if (trace) log.trace("Integrating state for children: " + fqn);
// If we need to integrate into the buddy backup subtree,
// change the Fqn to fit under it
if (offset > 0)
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2009-09-17
15:16:06 UTC (rev 8235)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2009-09-23
15:28:49 UTC (rev 8236)
@@ -147,6 +147,7 @@
NodeSPI target = cache.getNode(targetRoot);
if (target == null)
{
+ if(trace) log.trace("Target node not found, creating it");
// Create the integration root, but do not replicate
cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
@@ -158,6 +159,7 @@
}
Object o = marshaller.objectFromObjectStream(in);
Boolean hasState = (Boolean) o;
+ if (trace) log.trace("Do we have state to integrate? " + hasState);
if (hasState)
{
setState(in, target);
Modified:
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java 2009-09-17
15:16:06 UTC (rev 8235)
+++
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java 2009-09-23
15:28:49 UTC (rev 8236)
@@ -17,6 +17,7 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.factories.UnitTestConfigurationFactory;
+import org.jboss.cache.util.CachePrinter;
import org.jboss.cache.util.TestingUtil;
import static org.testng.AssertJUnit.assertEquals;
import org.testng.annotations.Test;
@@ -82,7 +83,9 @@
private final Cache<Object,Object> cache;
private final boolean tx;
private volatile boolean stop;
- private volatile int result;
+ private volatile int end;
+ private volatile int start = 0;
+ private int cleanupRange = 1000;
WritingRunner(Cache<Object, Object> cache, boolean tx)
{
@@ -90,29 +93,48 @@
this.tx = tx;
}
- public int result()
+ WritingRunner(Cache<Object, Object> cache, boolean tx, int cleanupRange)
{
- return result;
+ this.cache = cache;
+ this.tx = tx;
+ this.cleanupRange = cleanupRange;
}
+ public synchronized int end()
+ {
+ return end;
+ }
+
+ public synchronized int start()
+ {
+ return start;
+ }
+
public void run()
{
- int c = 0;
+ end = 0;
while (!stop)
{
try
{
- if (c == 1000)
+ if (end == cleanupRange)
{
startTxIfNeeded();
- for (int i=0; i<1000; i++) cache.removeNode("/test" + i);
- commitTxIfNeeded();
- c = 0;
+ for (int i=0; i<cleanupRange; i++) {
+ start = i + 1;
+ cache.removeNode("/test" + i);
+ commitTxIfNeeded();
+ }
+ synchronized (this)
+ {
+ start = 0;
+ end = 0;
+ }
}
else
{
startTxIfNeeded();
- cache.put("/test" + c, "test", c++);
+ cache.put("/test" + start, "test", start++);
commitTxIfNeeded();
}
}
@@ -127,7 +149,6 @@
stop = true;
}
}
- result = c;
}
private void startTxIfNeeded() throws Exception
@@ -327,9 +348,12 @@
try
{
cache1 = createCache(name);
+ writeInitialData(cache1);
+
cache3 = createCache(name);
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache3 }, 10000);
+ verifyInitialData(cache3);
- writeInitialData(cache1);
// Delay the transient copy, so that we get a more thorough log test
cache1.put("/delay", "delay", new DelayTransfer());
@@ -350,9 +374,12 @@
verifyInitialData(cache2);
- int count = writer.result();
+ int end = writer.end();
+ int start = writer.start();
- for (int c = 0; c < count; c++)
+ log.trace("Cache content is: " +
CachePrinter.printCacheDetails(cache2));
+
+ for (int c = start; c < end; c++)
assertEquals(c, cache2.get("/test" + c, "test"));
}
finally
@@ -389,7 +416,7 @@
// Delay the transient copy, so that we get a more thorough log test
cache1.put("/delay", "delay", new DelayTransfer());
- WritingRunner writer = new WritingRunner(cache1, tx);
+ WritingRunner writer = new WritingRunner(cache1, tx, 500);
Thread writerThread = new Thread(writer);
writerThread.setDaemon(true);
writerThread.start();
@@ -405,9 +432,10 @@
verifyInitialData(cache2);
- int count = writer.result();
+ int start = writer.start();
+ int end = writer.end();
- for (int c = 0; c < count; c++)
+ for (int c = start; c < end; c++)
assertEquals(c, cache2.get("/test" + c, "test"));
}
finally