[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Vladmir Blagojevic
vladimir.blagojevic at jboss.com
Thu Dec 21 16:41:10 EST 2006
User: vblagojevic
Date: 06/12/21 16:41:10
Modified: src/org/jboss/cache/statetransfer
DefaultStateTransferGenerator.java
DefaultStateTransferIntegrator.java
StateTransferManager.java
Log:
final state transfer polishing (all tests pass now)
Revision Changes Path
1.7 +5 -17 JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DefaultStateTransferGenerator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -b -r1.6 -r1.7
--- DefaultStateTransferGenerator.java 14 Dec 2006 17:18:48 -0000 1.6
+++ DefaultStateTransferGenerator.java 21 Dec 2006 21:41:10 -0000 1.7
@@ -8,7 +8,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
import org.jboss.cache.TreeCache;
@@ -43,8 +42,6 @@
boolean generatePersistent, boolean suppressErrors) throws Throwable
{
Fqn fqn = rootNode.getFqn();
- Throwable encouteredException = null;
-
try
{
out.writeShort(STATE_TRANSFER_VERSION);
@@ -106,17 +103,8 @@
}
catch (Throwable t)
{
- encouteredException = t;
- log.error("failed writing state", t);
out.writeObject(new NodeDataExceptionMarker(t, cache.getLocalAddress()));
- }
- finally
- {
- out.close();
- if (encouteredException != null && !suppressErrors)
- {
- throw encouteredException;
- }
+ throw t;
}
}
@@ -158,7 +146,7 @@
for (Iterator it = children.entrySet().iterator(); it.hasNext();)
{
Map.Entry entry = (Map.Entry) it.next();
- marshallTransientState((DataNode) entry.getValue(), out);
+ marshallTransientState((Node) entry.getValue(), out);
}
}
1.9 +19 -29 JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DefaultStateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -b -r1.8 -r1.9
--- DefaultStateTransferIntegrator.java 20 Dec 2006 22:28:13 -0000 1.8
+++ DefaultStateTransferIntegrator.java 21 Dec 2006 21:41:10 -0000 1.9
@@ -9,10 +9,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
-import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
import org.jboss.cache.TreeCache;
+import org.jboss.cache.TreeNode;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.factories.NodeFactory;
import org.jboss.cache.loader.CacheLoader;
@@ -53,20 +53,10 @@
public void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception
{
- Throwable cause = null;
- try
- {
integrateTransientState(ois, target, cl);
integrateAssociatedState(ois);
integratePersistentState(ois);
}
- catch (Throwable t)
- {
- cause = t;
- log.error("Failed integrating state.", t);
- throw new Exception("State transfer failed ", cause);
- }
- }
protected void integrateTransientState(ObjectInputStream in, Node target, ClassLoader cl) throws Exception
{
@@ -97,7 +87,7 @@
// Clear any existing state from the targetRoot
log.warn("transient state integration failed, removing all children of " + target);
target.clearData();
- ((DataNode) target).removeChildren();
+ ((TreeNode) target).removeChildren();
}
resetClassLoader(oldCL);
@@ -197,7 +187,7 @@
*/
private void notifyAllNodesCreated(Node curr)
{
- DataNode n;
+ TreeNode n;
if (curr == null) return;
getCache().getNotifier().notifyNodeCreated(curr.getFqn(), true);
@@ -205,7 +195,7 @@
Set children = curr.getChildren();
for (Iterator it = children.iterator(); it.hasNext();)
{
- n = (DataNode) it.next();
+ n = (TreeNode) it.next();
notifyAllNodesCreated(n);
}
}
@@ -234,7 +224,7 @@
{
Set retainedNodes = retainInternalNodes(target);
- ((DataNode) target).removeChildren();
+ ((TreeNode) target).removeChildren();
// Read the first NodeData and integrate into our target
NodeData nd = readNode(in);
@@ -242,7 +232,7 @@
//are there any transient nodes at all?
if (nd != null && !nd.isMarker())
{
- target.put(nd.getAttributes(), true);
+ target.put(nd.getAttributes());
// Check whether this is an integration into the buddy backup
// subtree
@@ -301,10 +291,10 @@
name = fqn.get(size - 1);
- // We handle this NodeData. Create a DataNode and
+ // We handle this NodeData. Create a TreeNode and
// integrate its data
Node target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), false, null, cache.getCacheSPI());
- ((DataNode) parent).addChild(name, target);
+ ((TreeNode) parent).addChild(name, target);
// Recursively call, which will walk down the tree
// and return the next NodeData that's a child of our parent
@@ -322,7 +312,7 @@
Fqn internalFqn = (Fqn) it.next();
if (internalFqn.isChildOf(targetFqn))
{
- DataNode internalNode = getInternalNode(target, internalFqn);
+ TreeNode internalNode = getInternalNode(target, internalFqn);
if (internalNode != null)
{
result.add(internalNode);
@@ -333,10 +323,10 @@
return result;
}
- private DataNode getInternalNode(Node parent, Fqn internalFqn)
+ private TreeNode getInternalNode(Node parent, Fqn internalFqn)
{
Object name = internalFqn.get(parent.getFqn().size());
- DataNode result = (DataNode) parent.getChild(new Fqn(name));
+ TreeNode result = (TreeNode) parent.getChild(new Fqn(name));
if (result != null)
{
if (internalFqn.size() < result.getFqn().size())
@@ -353,7 +343,7 @@
Fqn rootFqn = root.getFqn();
for (Iterator it = retainedNodes.iterator(); it.hasNext();)
{
- DataNode retained = (DataNode) it.next();
+ TreeNode retained = (TreeNode) it.next();
if (retained.getFqn().isChildOf(rootFqn))
{
integrateRetainedNode(root, retained);
@@ -361,17 +351,17 @@
}
}
- private void integrateRetainedNode(Node ancestor, DataNode descendant)
+ private void integrateRetainedNode(Node ancestor, TreeNode descendant)
{
Fqn descFqn = descendant.getFqn();
Fqn ancFqn = ancestor.getFqn();
Object name = descFqn.get(ancFqn.size());
- DataNode child = (DataNode) ancestor.getChild(new Fqn(name));
+ TreeNode child = (TreeNode) ancestor.getChild(new Fqn(name));
if (ancFqn.size() == descFqn.size() + 1)
{
if (child == null)
{
- ((DataNode) ancestor).addChild(name, descendant);
+ ((TreeNode) ancestor).addChild(name, descendant);
}
else
{
@@ -386,7 +376,7 @@
// This shouldn't really happen -- internal fqns should
// be immediately under the root
child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, null, cache.getCacheSPI());
- ((DataNode) ancestor).addChild(name, child);
+ ((TreeNode) ancestor).addChild(name, child);
}
// Keep walking down the tree
1.19 +42 -20 JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -b -r1.18 -r1.19
--- StateTransferManager.java 20 Dec 2006 22:28:13 -0000 1.18
+++ StateTransferManager.java 21 Dec 2006 21:41:10 -0000 1.19
@@ -8,6 +8,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
import org.jboss.cache.TreeCache;
@@ -67,7 +68,7 @@
VersionAwareMarshaller marshaller = cache.getMarshaller();
// can't give state for regions currently being activated/inactivated
- boolean canProvideState = !(marshaller.isInactive(fqn.toString()) || cache.findNode(fqn) == null);
+ boolean canProvideState = (!marshaller.isInactive(fqn.toString()) && cache.findNode(fqn) != null);
boolean fetchTransientState = cache.getConfiguration().isFetchInMemoryState();
CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
@@ -96,12 +97,31 @@
else
{
out.writeBoolean(false);
+ Exception e = null;
+ if(!canProvideState)
+ {
+ String exceptionMessage = "Cache instance at " + cache.getLocalAddress() + " cannot provide state for fqn " + fqn +".";
+
+ if(marshaller.isInactive(fqn.toString()))
+ {
+ exceptionMessage += " Region for fqn " + fqn + " is inactive.";
+ }
+ if(cache.findNode(fqn) == null)
+ {
+ exceptionMessage += " There is no cache node at fqn " + fqn;
+ }
+ e = new CacheException(exceptionMessage);
+ }
+ if(!fetchPersistentState && !fetchTransientState)
+ {
+ e = new CacheException("Cache instance at " + cache.getLocalAddress() + " is not configured to provide state");
+ }
+ out.writeObject(e);
+ throw e;
}
}
/**
- * TODO change this javadoc
- * <p/>
* Requests state from each of the given source nodes in the cluster
* until it gets it or no node replies with a timeout exception. If state
* is returned, integrates it into the given DataNode. If no state is
@@ -132,14 +152,14 @@
* is up to the caller to lock <code>targetRoot</code> before calling
* this method.
*
- * @param state a serialized byte[][] array where element 0 is the
+ * @param in a serialized byte[][] array where element 0 is the
* transient state (or null) , and element 1 is the
* persistent state (or null)
* @param targetRoot fqn of the node into which the state should be integrated
* @param cl classloader to use to unmarshal the state, or
* <code>null</code> if the TCCL should be used
*/
- public void setState(ObjectInputStream state, Fqn targetRoot, ClassLoader cl) throws Exception
+ public void setState(ObjectInputStream in, Fqn targetRoot, ClassLoader cl) throws Exception
{
TreeCache cache = getTreeCache();
Node target = cache.findNode(targetRoot);
@@ -150,7 +170,16 @@
cache.put(targetRoot, null);
target = cache.findNode(targetRoot);
}
- setState(state, target, cl);
+ boolean hasState = in.readBoolean();
+ if(hasState)
+ {
+ setState(in, target, cl);
+ }
+ else
+ {
+ throw new CacheException("Cache instance at " + cache.getLocalAddress()
+ + " cannot integrate state since state provider could not provide state due to " + in.readObject());
+ }
}
/**
@@ -172,13 +201,13 @@
private void setState(ObjectInputStream state, Node targetRoot, ClassLoader cl) throws Exception
{
Object owner = getOwnerForLock();
+ long timeout = cache.getConfiguration().getInitialStateRetrievalTimeout();
long startTime = System.currentTimeMillis();
try
{
// Acquire a lock on the root node
- acquireLocksForStateTransfer(targetRoot, owner, getTreeCache().getConfiguration()
- .getInitialStateRetrievalTimeout(), true, true);
+ acquireLocksForStateTransfer(targetRoot, owner, timeout, true, true);
/*
* Vladimir/Manik/Brian (Dec 7,2006)
@@ -193,18 +222,11 @@
option.setBypassInterceptorChain(true);
cache.getInvocationContext().setOptionOverrides(option);
- try
- {
StateTransferIntegrator integrator = getStateTransferIntegrator(state, targetRoot.getFqn());
log.info("starting state integration at node " + targetRoot);
integrator.integrateState(state, targetRoot, cl);
log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
}
- catch (Throwable t)
- {
- log.error("failed integrating state", t);
- }
- }
finally
{
releaseStateTransferLocks(targetRoot, owner, true);
More information about the jboss-cvs-commits
mailing list