Author: manik.surtani(a)jboss.com
Date: 2008-07-30 13:13:20 -0400 (Wed, 30 Jul 2008)
New Revision: 6446
Modified:
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java
Log:
Fixed funky state transfer brittleness
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-07-30
16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -51,6 +51,7 @@
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -297,7 +298,10 @@
{
// need to get the root DIRECTLY. cache.getRoot() will pass a call up the
interceptor chain and we will
// have a problem with the cache not being started.
- dataContainer.getRoot().addChildDirect(BUDDY_BACKUP_SUBTREE_FQN);
+
+
cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
+ cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache.put(BUDDY_BACKUP_SUBTREE_FQN, (Map) Collections.emptyMap());
}
// allow waiting threads to process.
Modified: core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java 2008-07-30
16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -85,6 +85,7 @@
if (objectFromStream instanceof NodeDataMarker)
{
// no persistent state sent across; return?
+ if (trace) log.trace("Empty persistent stream?");
return;
}
if (objectFromStream instanceof NodeDataExceptionMarker)
@@ -113,7 +114,11 @@
for (Object aNodeData : nodeData)
{
NodeData nd = (NodeData) aNodeData;
- if (nd.isMarker()) break;
+ if (nd.isMarker())
+ {
+ if (trace) log.trace("Reached delimiter; exiting loop");
+ break;
+ }
Fqn fqn;
if (moveToBuddy)
{
@@ -123,7 +128,7 @@
{
fqn = nd.getFqn();
}
-
+ if (trace) log.trace("Storing state in Fqn " + fqn);
if (nd.getAttributes() != null)
{
this.put(fqn, nd.getAttributes(), true);// creates a node with 0 or more
attributes
Modified: core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java 2008-07-30
16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -451,6 +451,7 @@
{
throw new CacheException("Unable to start cache loaders", e);
}
+ fetchPersistentState |= loader.getConfig().isFetchPersistentState();
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java 2008-07-30
16:06:38 UTC (rev 6445)
+++
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -35,7 +35,8 @@
*/
protected volatile Exception setStateException;
private final Object stateLock = new Object();
- private Log log = LogFactory.getLog(ChannelMessageListener.class);
+ private static final Log log = LogFactory.getLog(ChannelMessageListener.class);
+ private static final boolean trace = log.isTraceEnabled();
private StateTransferManager stateTransferManager;
private Configuration configuration;
/**
@@ -43,6 +44,7 @@
*/
private volatile boolean isStateSet = false;
+
@Inject
private void injectDependencies(StateTransferManager stateTransferManager,
Configuration configuration)
{
@@ -183,6 +185,7 @@
public byte[] getState(String state_id)
{
+ if (trace) log.trace("Getting state for state id " + state_id);
MarshalledValueOutputStream out = null;
String sourceRoot = state_id;
byte[] result;
@@ -233,6 +236,7 @@
public void getState(String state_id, OutputStream ostream)
{
+ if (trace) log.trace("Getting state for state id " + state_id);
String sourceRoot = state_id;
MarshalledValueOutputStream out = null;
boolean hasDifferentSourceAndIntegrationRoots =
state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
@@ -286,6 +290,7 @@
public void setState(String state_id, byte[] state)
{
+ if (trace) log.trace("Receiving state for " + state_id);
if (state == null)
{
log.debug("partial transferred state is null");
@@ -333,7 +338,7 @@
public void setState(String stateId, InputStream istream)
{
- if (log.isTraceEnabled()) log.trace("**** Receiving state for " +
stateId);
+ if (trace) log.trace("Receiving state for " + stateId);
String targetRoot = stateId;
MarshalledValueInputStream in = null;
boolean hasDifferentSourceAndIntegrationRoots =
stateId.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2008-07-30
16:06:38 UTC (rev 6445)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -43,7 +43,7 @@
this.cache = cache;
}
- @Start(priority = 18)
+ @Start(priority = 14)
private void start()
{
this.internalFqns = cache.getInternalFqns();
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2008-07-30
16:06:38 UTC (rev 6445)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -39,7 +39,8 @@
public class DefaultStateTransferIntegrator implements StateTransferIntegrator
{
- protected Log log = LogFactory.getLog(getClass().getName());
+ private static final Log log =
LogFactory.getLog(DefaultStateTransferIntegrator.class);
+ private static final boolean trace = log.isTraceEnabled();
private CacheSPI cache;
@@ -47,6 +48,7 @@
private Set<Fqn> internalFqns;
+
@Inject
public void inject(CacheSPI<?, ?> cache, NodeFactory nodefactory)
{
@@ -60,14 +62,18 @@
this.internalFqns = cache.getInternalFqns();
}
- public void integrateState(ObjectInputStream ois, Object target, Fqn targetRoot)
throws Exception
+ public void integrateState(ObjectInputStream ois, Object target, Fqn targetRoot,
boolean integratePersistentState) throws Exception
{
// pop version from the stream first!
short version = (Short) cache.getMarshaller().objectFromObjectStream(ois);
log.info("Using version " + version);
integrateTransientState(ois, (InternalNode) target);
- integrateAssociatedState(ois);
- integratePersistentState(ois, targetRoot);
+ if (trace) log.trace("Reading marker for nonexistent associated state");
+ cache.getMarshaller().objectFromObjectStream(ois);
+ if (integratePersistentState)
+ {
+ integratePersistentState(ois, targetRoot);
+ }
}
protected void integrateTransientState(ObjectInputStream in, InternalNode target)
throws Exception
@@ -75,7 +81,7 @@
boolean transientSet = false;
try
{
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("integrating transient state for " + target);
}
@@ -84,7 +90,7 @@
transientSet = true;
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("transient state successfully integrated");
}
@@ -105,26 +111,13 @@
}
}
- /**
- * Provided for subclasses that deal with associated state.
- *
- * @throws Exception
- */
- protected void integrateAssociatedState(ObjectInputStream in) throws Exception
- {
- // no-op in this base class
- // just read marker
- cache.getMarshaller().objectFromObjectStream(in);
- }
-
protected void integratePersistentState(ObjectInputStream in, Fqn targetFqn) throws
Exception
{
-
CacheLoaderManager loaderManager = cache.getCacheLoaderManager();
CacheLoader loader = loaderManager == null ? null :
loaderManager.getCacheLoader();
if (loader == null)
{
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("cache loader is null, will not attempt to integrate
persistent state");
}
@@ -132,7 +125,7 @@
else
{
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("integrating persistent state using " +
loader.getClass().getName());
}
@@ -165,7 +158,7 @@
}
else
{
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("persistent state integrated successfully");
}
@@ -234,7 +227,7 @@
}
// read marker off stack
- cache.getMarshaller().objectFromObjectStream(in);
+// cache.getMarshaller().objectFromObjectStream(in);
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2008-07-30
16:06:38 UTC (rev 6445)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -12,7 +12,6 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InternalNode;
-import org.jboss.cache.NodeFactory;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.RegionEmptyException;
import org.jboss.cache.RegionManager;
@@ -34,6 +33,7 @@
public class DefaultStateTransferManager implements StateTransferManager
{
protected final static Log log =
LogFactory.getLog(DefaultStateTransferManager.class);
+ protected static final boolean trace = log.isTraceEnabled();
public static final NodeData STREAMING_DELIMITER_NODE = new NodeDataMarker();
@@ -47,25 +47,23 @@
boolean fetchTransientState;
boolean fetchPersistentState;
protected long stateRetrievalTimeout;
- private NodeFactory nodeFactory;
protected StateTransferIntegrator integrator;
protected StateTransferGenerator generator;
@Inject
- public void injectDependencies(CacheSPI cache, Marshaller marshaller, RegionManager
regionManager, Configuration configuration, CacheLoaderManager cacheLoaderManager,
NodeFactory nodeFactory, StateTransferIntegrator integrator, StateTransferGenerator
generator)
+ public void injectDependencies(CacheSPI cache, Marshaller marshaller, RegionManager
regionManager, Configuration configuration, CacheLoaderManager cacheLoaderManager,
StateTransferIntegrator integrator, StateTransferGenerator generator)
{
this.cache = cache;
this.regionManager = regionManager;
this.marshaller = marshaller;
this.configuration = configuration;
this.cacheLoaderManager = cacheLoaderManager;
- this.nodeFactory = nodeFactory;
this.integrator = integrator;
this.generator = generator;
}
- @Start(priority = 19)
+ @Start(priority = 14)
public void start()
{
fetchTransientState = configuration.isFetchInMemoryState();
@@ -77,19 +75,18 @@
{
// can't give state for regions currently being activated/inactivated
boolean canProvideState = (!regionManager.isInactive(fqn) &&
cache.peek(fqn, false) != null);
-
+ if (trace) log.trace("Can provide state? " + canProvideState);
if (canProvideState && (fetchPersistentState || fetchTransientState))
{
marshaller.objectToObjectStream(true, out);
long startTime = System.currentTimeMillis();
- InternalNode rootNode = cache.getRoot().getDelegationTarget();
+ InternalNode subtreeRoot = fqn.isRoot() ? cache.getRoot().getDelegationTarget()
: cache.getNode(fqn).getDelegationTarget();
// we don't need READ locks for MVCC based state transfer!
-
if (log.isDebugEnabled())
- log.debug("locking the " + fqn + " subtree to return the
in-memory (transient) state");
+ log.debug("Generating in-memory (transient) state for subtree " +
fqn);
- generator.generateState(out, rootNode, fetchTransientState,
fetchPersistentState, suppressErrors);
+ generator.generateState(out, subtreeRoot, fetchTransientState,
fetchPersistentState, suppressErrors);
if (log.isDebugEnabled())
log.debug("Successfully generated state in " +
(System.currentTimeMillis() - startTime) + " msec");
@@ -175,8 +172,9 @@
* to be directly stored into a tree since we bypass interceptor chain.
*
*/
- if (log.isDebugEnabled()) log.debug("starting state integration at node "
+ targetRoot);
- integrator.integrateState(state, targetRoot.getDelegationTarget(),
targetRoot.getFqn());
+ if (log.isDebugEnabled())
+ log.debug("starting state integration at node " + targetRoot + ".
Fetch Persistent State = " + fetchPersistentState);
+ integrator.integrateState(state, targetRoot.getDelegationTarget(),
targetRoot.getFqn(), fetchPersistentState);
if (log.isDebugEnabled())
log.debug("successfully integrated state in " +
(System.currentTimeMillis() - startTime) + " msec");
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java 2008-07-30
16:06:38 UTC (rev 6445)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -63,7 +63,7 @@
this.internalFqns = cache.getInternalFqns();
}
- public void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn) throws
Exception
+ public void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn,
boolean integratePersistentState) throws Exception
{
// pop version from the stream first!
short version = (Short) cache.getMarshaller().objectFromObjectStream(ois);
@@ -72,7 +72,7 @@
// read another marker for the dummy associated state
if (trace) log.trace("Reading marker for nonexistent associated state");
cache.getMarshaller().objectFromObjectStream(ois);
- integratePersistentState(ois, targetFqn);
+ if (integratePersistentState) integratePersistentState(ois, targetFqn);
}
protected void integrateTransientState(ObjectInputStream in, NodeSPI target) throws
Exception
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java 2008-07-30
16:06:38 UTC (rev 6445)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -121,7 +121,7 @@
{
log.debug("starting state integration at node " + targetRoot);
}
- integrator.integrateState(state, targetRoot, targetRoot.getFqn());
+ integrator.integrateState(state, targetRoot, targetRoot.getFqn(),
fetchPersistentState);
if (log.isDebugEnabled())
{
log.debug("successfully integrated state in " +
(System.currentTimeMillis() - startTime) + " msec");
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java 2008-07-30
16:06:38 UTC (rev 6445)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -16,5 +16,5 @@
public interface StateTransferIntegrator
{
// TODO: target is an Object to support both InternalNodes and NodeSPIs.
- void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn) throws
Exception;
+ void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn, boolean
integratePersistentState) throws Exception;
}
\ No newline at end of file
Modified:
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java 2008-07-30
16:06:38 UTC (rev 6445)
+++
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -11,12 +11,12 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
-import org.jboss.cache.util.TestingUtil;
import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.config.BuddyReplicationConfig;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.marshall.MarshalledValue;
+import org.jboss.cache.util.TestingUtil;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.Test;
@@ -407,7 +407,7 @@
public void testStalePersistentState() throws Exception
{
- CacheSPI c1 = createCache("1", true, false, true, false);
+ CacheSPI c1 = createCache("1", true, false, true, true);
c1.put(A, "K", "V");
assert c1.get(A, "K").equals("V");
@@ -423,10 +423,12 @@
assert l1.exists(A);
assert l1.get(A).get("K").equals("V");
- Cache c2 = createCache("2", true, false, true, false);
+ Cache c2 = createCache("2", true, false, true, true);
c2.put(B, "K", "V");
+ assert c1.getConfiguration().isFetchInMemoryState();
+ assert c1.getConfiguration().getCacheLoaderConfig().isFetchPersistentState();
c1.start();
assert c1.get(B, "K").equals("V");
Modified:
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java 2008-07-30
16:06:38 UTC (rev 6445)
+++
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java 2008-07-30
17:13:20 UTC (rev 6446)
@@ -269,17 +269,12 @@
cache1.put(A_C, "name", BOB);
cache1.put(A_C, "age", FORTY);
- CacheSPI<Object, Object> cache2 = createCache("cache2", false,
false, cacheLoaderClass2, asyncLoader, false, true);
+ CacheSPI<Object, Object> cache2 = createCache("cache2", false,
false, cacheLoaderClass2, asyncLoader, true, true);
- cache2.start();
-
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
- if (asyncLoader)
- {
- TestingUtil.sleepThread((long) 100);
- }
+ if (asyncLoader) TestingUtil.sleepThread(100);
CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();