[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Manik Surtani
msurtani at jboss.com
Thu Dec 14 12:18:48 EST 2006
User: msurtani
Date: 06/12/14 12:18:48
Modified: src/org/jboss/cache/statetransfer
StateTransferGenerator.java
StateTransferManager.java
DefaultStateTransferGenerator.java
StateTransferIntegrator.java
DefaultStateTransferIntegrator.java
Log:
The beginnings of porting JBCACHE-871 and JBCACHE-875, as well as rearranging the Node/Cache object model into something sensible.
Revision Changes Path
1.4 +5 -5 JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferGenerator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -b -r1.3 -r1.4
--- StateTransferGenerator.java 11 Sep 2006 21:53:19 -0000 1.3
+++ StateTransferGenerator.java 14 Dec 2006 17:18:48 -0000 1.4
@@ -6,14 +6,14 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.ObjectOutputStream;
+import org.jboss.cache.Node;
-import org.jboss.cache.DataNode;
+import java.io.ObjectOutputStream;
public interface StateTransferGenerator
{
- public void generateState(ObjectOutputStream stream, DataNode rootNode, boolean generateTransient,
+ public void generateState(ObjectOutputStream stream, Node rootNode, boolean generateTransient,
boolean generatePersistent, boolean suppressErrors) throws Throwable;
}
\ No newline at end of file
1.17 +61 -49 JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -b -r1.16 -r1.17
--- StateTransferManager.java 7 Dec 2006 22:21:48 -0000 1.16
+++ StateTransferManager.java 14 Dec 2006 17:18:48 -0000 1.17
@@ -6,14 +6,8 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.OutputStream;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
import org.jboss.cache.TreeCache;
@@ -28,6 +22,11 @@
import org.jboss.util.stream.MarshalledValueInputStream;
import org.jboss.util.stream.MarshalledValueOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+
public class StateTransferManager
{
protected final static Log log = LogFactory.getLog(StateTransferManager.class);
@@ -101,8 +100,10 @@
if (marshaller_.isInactive(fqn.toString()))
{
if (log.isDebugEnabled())
+ {
log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated");
- if(usingStreamingStateTransfer)
+ }
+ if (usingStreamingStateTransfer)
{
MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
out.writeBoolean(false);
@@ -112,9 +113,11 @@
}
}
- DataNode rootNode = cache.findNode(fqn);
+ Node rootNode = cache.findNode(fqn);
if (rootNode == null)
+ {
return null;
+ }
boolean fetchTransientState = cache.getConfiguration().isFetchInMemoryState();
CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
@@ -131,7 +134,7 @@
}
MarshalledValueOutputStream out = null;
- byte resultBuffer [] = new byte[0];
+ byte resultBuffer[] = new byte[0];
StateTransferGenerator generator = getStateTransferGenerator();
long startTime = System.currentTimeMillis();
if (usingStreamingStateTransfer)
@@ -142,12 +145,12 @@
}
else
{
- ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16*1024);
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
out = new MarshalledValueOutputStream(baos);
generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
resultBuffer = baos.getRawBuffer();
}
- log.info("Successfully generated state in " + (System.currentTimeMillis()-startTime) + " msec");
+ log.info("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
return resultBuffer;
}
finally
@@ -162,9 +165,8 @@
}
/**
- *
* TODO change this javadoc
- *
+ * <p/>
* Requests state from each of the given source nodes in the cluster
* until it gets it or no node replies with a timeout exception. If state
* is returned, integrates it into the given DataNode. If no state is
@@ -183,7 +185,7 @@
Object[] sources, ClassLoader cl)
throws Exception
{
- treeCache.fetchPartialState(sources, subtreeRoot,integrationRoot.getFqn());
+ treeCache.fetchPartialState(sources, subtreeRoot, integrationRoot.getFqn());
}
/**
@@ -206,7 +208,7 @@
throws Exception
{
TreeCache cache = getTreeCache();
- DataNode target = cache.findNode(targetRoot);
+ Node target = cache.findNode(targetRoot);
if (target == null)
{
// Create the integration root, but do not replicate
@@ -234,7 +236,7 @@
* @param cl classloader to use to unmarshal the state, or
* <code>null</code> if the TCCL should be used
*/
- private void setState(Object state, DataNode targetRoot, ClassLoader cl)
+ private void setState(Object state, Node targetRoot, ClassLoader cl)
throws Exception
{
if (state == null)
@@ -254,7 +256,7 @@
true, true);
StateTransferIntegrator integrator = null;
- MarshalledValueInputStream in =null;
+ MarshalledValueInputStream in = null;
if (usingStreamTransfer)
{
in = (MarshalledValueInputStream) state;
@@ -284,8 +286,8 @@
try
{
log.info("starting state integration at node " + targetRoot);
- integrator.integrateState(in,targetRoot, cl);
- log.info("successfully integrated state in " + (System.currentTimeMillis()-startTime) + " msec");
+ integrator.integrateState(in, targetRoot, cl);
+ log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
}
catch (Throwable t)
{
@@ -303,7 +305,7 @@
/**
* Acquires locks on a root node for an owner for state transfer.
*/
- protected void acquireLocksForStateTransfer(DataNode root,
+ protected void acquireLocksForStateTransfer(Node root,
Object lockOwner,
long timeout,
boolean lockChildren,
@@ -313,10 +315,14 @@
try
{
if (lockChildren)
+ {
root.getNodeSPI().getLock().acquireAll(lockOwner, timeout, NodeLock.LockType.READ);
+ }
else
+ {
root.getNodeSPI().getLock().acquire(lockOwner, timeout, NodeLock.LockType.READ);
}
+ }
catch (TimeoutException te)
{
log.error("Caught TimeoutException acquiring locks on region " +
@@ -324,7 +330,7 @@
if (force)
{
// Until we have FLUSH in place, don't force locks
-// forceAcquireLock(root, lockOwner, lockChildren);
+ // forceAcquireLock(root, lockOwner, lockChildren);
throw te;
}
@@ -340,17 +346,21 @@
*
* @see #acquireLocksForStateTransfer
*/
- protected void releaseStateTransferLocks(DataNode root,
+ protected void releaseStateTransferLocks(Node root,
Object lockOwner,
boolean childrenLocked)
{
try
{
if (childrenLocked)
+ {
root.getNodeSPI().getLock().releaseAll(lockOwner);
+ }
else
+ {
root.getNodeSPI().getLock().release(lockOwner);
}
+ }
catch (Throwable t)
{
log.error("failed releasing locks", t);
@@ -375,7 +385,9 @@
{
Object owner = getTreeCache().getCurrentTransaction();
if (owner == null)
+ {
owner = Thread.currentThread();
+ }
return owner;
}
1.6 +34 -13 JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DefaultStateTransferGenerator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -b -r1.5 -r1.6
--- DefaultStateTransferGenerator.java 20 Nov 2006 03:53:55 -0000 1.5
+++ DefaultStateTransferGenerator.java 14 Dec 2006 17:18:48 -0000 1.6
@@ -6,20 +6,21 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.ObjectOutputStream;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
import org.jboss.cache.TreeCache;
import org.jboss.cache.Version;
import org.jboss.cache.loader.CacheLoader;
-import org.jboss.cache.loader.NodeDataExceptionMarker;
import org.jboss.cache.loader.NodeData;
+import org.jboss.cache.loader.NodeDataExceptionMarker;
+
+import java.io.ObjectOutputStream;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
public class DefaultStateTransferGenerator implements StateTransferGenerator
{
@@ -38,7 +39,7 @@
this.internalFqns = cache.getInternalFqns();
}
- public void generateState(ObjectOutputStream out, DataNode rootNode, boolean generateTransient,
+ public void generateState(ObjectOutputStream out, Node rootNode, boolean generateTransient,
boolean generatePersistent, boolean suppressErrors) throws Throwable
{
Fqn fqn = rootNode.getFqn();
@@ -51,23 +52,31 @@
{
//transient + marker
if (log.isTraceEnabled())
+ {
log.trace("writing transient state for " + fqn);
+ }
marshallTransientState(rootNode, out);
out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
if (log.isTraceEnabled())
+ {
log.trace("transient state succesfully written");
+ }
//associated + marker
if (log.isTraceEnabled())
+ {
log.trace("writing associated state");
+ }
marshallAssociatedState(fqn, out);
out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
if (log.isTraceEnabled())
+ {
log.trace("associated state succesfully written");
+ }
}
@@ -75,7 +84,9 @@
if (cacheLoader != null && generatePersistent)
{
if (log.isTraceEnabled())
+ {
log.trace("writing persistent state for " + fqn + ",using " + cache.getCacheLoader().getClass());
+ }
if (fqn.isRoot())
{
@@ -87,15 +98,17 @@
}
if (log.isTraceEnabled())
+ {
log.trace("persistent state succesfully written");
}
+ }
out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
}
catch (Throwable t)
{
encouteredException = t;
log.error("failed writing state", t);
- out.writeObject(new NodeDataExceptionMarker(t,cache.getLocalAddress()));
+ out.writeObject(new NodeDataExceptionMarker(t, cache.getLocalAddress()));
}
finally
{
@@ -113,11 +126,13 @@
* @param out
* @throws Exception
*/
- protected void marshallTransientState(DataNode node, ObjectOutputStream out) throws Exception
+ protected void marshallTransientState(Node node, ObjectOutputStream out) throws Exception
{
if (internalFqns.contains(node.getFqn()))
+ {
return;
+ }
Map attrs;
NodeData nd;
@@ -125,15 +140,21 @@
// first handle the current node
attrs = node.getData();
if (attrs == null || attrs.size() == 0)
+ {
nd = new NodeData(node.getFqn());
+ }
else
+ {
nd = new NodeData(node.getFqn(), attrs);
+ }
out.writeObject(nd);
// then visit the children
Map children = node.getNodeSPI().getChildrenMap();
if (children == null)
+ {
return;
+ }
for (Iterator it = children.entrySet().iterator(); it.hasNext();)
{
Map.Entry entry = (Map.Entry) it.next();
1.5 +4 -4 JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -b -r1.4 -r1.5
--- StateTransferIntegrator.java 11 Sep 2006 21:53:19 -0000 1.4
+++ StateTransferIntegrator.java 14 Dec 2006 17:18:48 -0000 1.5
@@ -6,13 +6,13 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.ObjectInputStream;
+import org.jboss.cache.Node;
-import org.jboss.cache.DataNode;
+import java.io.ObjectInputStream;
public interface StateTransferIntegrator
{
- void integrateState(ObjectInputStream ois, DataNode target,ClassLoader cl)throws Exception;
+ void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception;
}
\ No newline at end of file
1.7 +78 -59 JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DefaultStateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -b -r1.6 -r1.7
--- DefaultStateTransferIntegrator.java 23 Nov 2006 19:43:05 -0000 1.6
+++ DefaultStateTransferIntegrator.java 14 Dec 2006 17:18:48 -0000 1.7
@@ -6,13 +6,6 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
@@ -26,6 +19,12 @@
import org.jboss.cache.loader.NodeData;
import org.jboss.cache.loader.NodeDataExceptionMarker;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
public class DefaultStateTransferIntegrator implements StateTransferIntegrator
{
@@ -52,7 +51,7 @@
this.internalFqns = cache.getInternalFqns();
}
- public void integrateState(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
+ public void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception
{
Throwable cause = null;
try
@@ -61,36 +60,40 @@
integrateAssociatedState(ois);
integratePersistentState(ois);
}
- catch(Throwable t)
+ catch (Throwable t)
{
cause = t;
- log.error("Failed integrating state.",t);
+ log.error("Failed integrating state.", t);
}
finally
{
ois.close();
if (cause != null)
{
- throw new Exception("State transfer failed ",cause);
+ throw new Exception("State transfer failed ", cause);
}
}
}
- protected void integrateTransientState(ObjectInputStream in,DataNode target, ClassLoader cl) throws Exception
+ protected void integrateTransientState(ObjectInputStream in, Node target, ClassLoader cl) throws Exception
{
boolean transientSet = false;
ClassLoader oldCL = setClassLoader(cl);
try
{
if (log.isTraceEnabled())
+ {
log.trace("integrating transient state for " + target);
+ }
integrateTransientState(target, in);
transientSet = true;
if (log.isTraceEnabled())
+ {
log.trace("transient state successfully integrated");
+ }
notifyAllNodesCreated(target);
}
@@ -101,7 +104,7 @@
// Clear any existing state from the targetRoot
log.warn("transient state integration failed, removing all children of " + target);
target.clearData();
- target.removeChildren();
+ ((DataNode) target).removeChildren();
}
resetClassLoader(oldCL);
@@ -127,12 +130,16 @@
if (loader == null)
{
if (log.isTraceEnabled())
+ {
log.trace("cache loader is null, will not attempt to integrate persistent state");
}
+ }
else
{
if (log.isTraceEnabled())
+ {
log.trace("integrating persistent state using " + loader.getClass().getName());
+ }
boolean persistentSet = false;
try
@@ -163,11 +170,13 @@
else
{
if (log.isTraceEnabled())
+ {
log.trace("persistent state integrated successfully");
}
}
}
}
+ }
protected TreeCache getCache()
{
@@ -193,7 +202,7 @@
* Generates NodeAdded notifications for all nodes of the tree. This is
* called whenever the tree is initially retrieved (state transfer)
*/
- private void notifyAllNodesCreated(DataNode curr)
+ private void notifyAllNodesCreated(Node curr)
{
DataNode n;
@@ -222,15 +231,17 @@
private void resetClassLoader(ClassLoader oldLoader)
{
if (oldLoader != null)
+ {
Thread.currentThread().setContextClassLoader(oldLoader);
}
+ }
- private void integrateTransientState(DataNode target, ObjectInputStream in) throws IOException,
+ private void integrateTransientState(Node target, ObjectInputStream in) throws IOException,
ClassNotFoundException
{
Set retainedNodes = retainInternalNodes(target);
- target.removeChildren();
+ ((DataNode) target).removeChildren();
// Read the first NodeData and integrate into our target
NodeData nd = readNode(in);
@@ -267,7 +278,7 @@
return nd;
}
- private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
+ private NodeData integrateStateTransferChildren(Node parent, int offset, ObjectInputStream in)
throws IOException, ClassNotFoundException
{
int parent_level = parent.getFqn().size();
@@ -282,19 +293,25 @@
// If we need to integrate into the buddy backup subtree,
// change the Fqn to fit under it
if (offset > 0)
+ {
fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
+ }
size = fqn.size();
if (size <= parent_level)
+ {
return nd;
+ }
else if (size > target_level)
+ {
throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());
+ }
name = fqn.get(size - 1);
// We handle this NodeData. Create a DataNode and
// integrate its data
- DataNode target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), false, null, cache.getCacheSPI());
- parent.addChild(name, target);
+ Node target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), false, null, cache.getCacheSPI());
+ ((DataNode) parent).addChild(name, target);
// Recursively call, which will walk down the tree
// and return the next NodeData that's a child of our parent
@@ -303,7 +320,7 @@
return null;
}
- private Set retainInternalNodes(DataNode target)
+ private Set retainInternalNodes(Node target)
{
Set result = new HashSet();
Fqn targetFqn = target.getFqn();
@@ -314,17 +331,19 @@
{
DataNode internalNode = getInternalNode(target, internalFqn);
if (internalNode != null)
+ {
result.add(internalNode);
}
}
+ }
return result;
}
- private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
+ private DataNode getInternalNode(Node parent, Fqn internalFqn)
{
Object name = internalFqn.get(parent.getFqn().size());
- DataNode result = (DataNode) parent.getChild(name);
+ DataNode result = (DataNode) parent.getChild(new Fqn(name));
if (result != null)
{
if (internalFqn.size() < result.getFqn().size())
@@ -336,7 +355,7 @@
return result;
}
- private void integrateRetainedNodes(DataNode root, Set retainedNodes)
+ private void integrateRetainedNodes(Node root, Set retainedNodes)
{
Fqn rootFqn = root.getFqn();
for (Iterator it = retainedNodes.iterator(); it.hasNext();)
@@ -349,17 +368,17 @@
}
}
- private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
+ private void integrateRetainedNode(Node ancestor, DataNode descendant)
{
Fqn descFqn = descendant.getFqn();
Fqn ancFqn = ancestor.getFqn();
Object name = descFqn.get(ancFqn.size());
- DataNode child = (DataNode) ancestor.getChild(name);
+ DataNode child = (DataNode) ancestor.getChild(new Fqn(name));
if (ancFqn.size() == descFqn.size() + 1)
{
if (child == null)
{
- ancestor.addChild(name, descendant);
+ ((DataNode) ancestor).addChild(name, descendant);
}
else
{
@@ -373,8 +392,8 @@
// Missing level -- have to create empty node
// This shouldn't really happen -- internal fqns should
// be immediately under the root
- child = (DataNode) factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, null, cache.getCacheSPI());
- ancestor.addChild(name, child);
+ child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, null, cache.getCacheSPI());
+ ((DataNode) ancestor).addChild(name, child);
}
// Keep walking down the tree
More information about the jboss-cvs-commits
mailing list