[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Vladimir Blagojevic
vladimir.blagojevic at jboss.com
Thu Aug 31 16:30:45 EDT 2006
User: vblagojevic
Date: 06/08/31 16:30:45
Modified: src/org/jboss/cache/statetransfer
StateTransferGenerator_200.java
StateTransferIntegrator.java
StateTransferIntegrator_200.java
StreamingStateTransferGenerator_200.java
StateTransferManager.java
StreamingStateTransferIntegrator_200.java
Added: src/org/jboss/cache/statetransfer
AbstractStateTransferGenerator.java
AbstractStateTransferIntegrator.java
Log:
state transfer refactoring
Revision Changes Path
1.6 +8 -240 JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferGenerator_200.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -b -r1.5 -r1.6
--- StateTransferGenerator_200.java 31 Aug 2006 14:56:46 -0000 1.5
+++ StateTransferGenerator_200.java 31 Aug 2006 20:30:45 -0000 1.6
@@ -6,258 +6,26 @@
*/
package org.jboss.cache.statetransfer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
-import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
-import org.jboss.cache.Version;
-import org.jboss.cache.loader.NodeData;
import org.jboss.cache.util.ExposedByteArrayOutputStream;
import org.jboss.invocation.MarshalledValueOutputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-public class StateTransferGenerator_200 implements StateTransferGenerator
+public class StateTransferGenerator_200 extends AbstractStateTransferGenerator implements StateTransferGenerator
{
- public static final short STATE_TRANSFER_VERSION =
- Version.getVersionShort("2.0.0.GA");
-
- //whenever we wrap stream A with object based stream B,B writes a few bytes
- //of a stream header to underlying stream A
- public static final int OBJECT_STREAM_MARKER_LENGTH =4;
-
- private Log log = LogFactory.getLog(getClass().getName());
-
- private TreeCache cache;
- private Set internalFqns;
protected StateTransferGenerator_200(TreeCache cache)
{
- this.cache = cache;
- this.internalFqns = cache.getInternalFqns();
- }
-
- public byte[] generateStateTransfer(DataNode rootNode,
- boolean generateTransient,
- boolean generatePersistent,
- boolean suppressErrors)
- throws Throwable
- {
- boolean debug = log.isDebugEnabled();
-
- Fqn fqn = rootNode.getFqn();
-
- byte[][] states=new byte[3][]; // [transient][associated][persistent]
- states[0]=states[1]=states[2]=null;
- int[] sizes = new int[3];
- byte[] retval = null;
- int lastSize;
-
- ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(1024);
- try {
- initializeStateTransfer(baos);
- lastSize = baos.size();
- }
- catch (Throwable t)
- {
- log.error("failed initialing state transfer byte[]", t);
- if (!suppressErrors)
- throw t;
-
- return null;
- }
-
- try {
-
- if(generateTransient) {
- MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos);
- marshallTransientState(rootNode, out);
- sizes[0] = baos.size() - lastSize;
- lastSize = baos.size();
- if (debug) {
- log.debug("generated the in-memory state (" + sizes[0] +
- " bytes)");
- }
-
- // Return any state associated with the subtree but not stored in it
- marshallAssociatedState(fqn, baos);
- sizes[1] = baos.size() - lastSize;
- lastSize = baos.size();
- if (debug) {
- log.debug("returning the associated state (" + sizes[1] +
- " bytes)");
- }
- }
- }
- catch(Throwable t) {
- log.error("failed getting the in-memory (transient) state", t);
- if (!suppressErrors)
- throw t;
-
- // Reset the byte array and see if we can continue with persistent state
- // TODO reconsider this -- why are errors suppressed at all?
- sizes[0] = sizes[1] = 0;
- baos.reset();
- try {
- initializeStateTransfer(baos);
- }
- catch (Throwable t1) {
- log.error("failed re-initializing state transfer", t1);
- return null;
- }
- }
-
- if (generatePersistent)
- {
- ByteArrayOutputStream out_stream = new ByteArrayOutputStream(1024);
- ObjectOutputStream out = new MarshalledValueOutputStream(out_stream);
- byte[] persState = null;
- boolean persistentStateProvidedOk = false;
- try
- {
- if (debug)
- log.debug("getting the persistent state from cacheloader " + cache.getCacheLoader().getClass());
- if (fqn.isRoot())
- {
- cache.getCacheLoader().loadEntireState(out);
- }
- else
- {
- cache.getCacheLoader().loadState(fqn, out);
- }
- persistentStateProvidedOk=true;
- }
- catch (Throwable t)
- {
- log.error("cacheloader failed while getting the persistent state", t);
- if (!suppressErrors)
- throw t;
+ super(cache);
}
- finally
- {
- out.close();
- persState = out_stream.toByteArray();
- if (persistentStateProvidedOk && persState.length > OBJECT_STREAM_MARKER_LENGTH)
- {
- sizes[2] = persState.length;
- baos.write(persState);
- }
- else
- {
- sizes[2] = 0;
- }
-
- if (debug)
- {
- log.debug("generated the persistent state (" + sizes[2] + " bytes)");
- }
- }
- }
-
- // Overwrite the placeholders used for the sizes of the state transfer
- // components with the correct values
- try {
- byte[] bytes = baos.getRawBuffer();
- overwriteInt(bytes, 8, sizes[0]);
- overwriteInt(bytes, 12, sizes[1]);
- overwriteInt(bytes, 16, sizes[2]);
- retval = bytes;
-
- log.info("returning the state for tree rooted in " + fqn.toString() +
- "(" + retval.length + " bytes)");
- return retval;
- }
- catch(Throwable t) {
- log.error("failed serializing transient and persistent state", t);
- if (!suppressErrors)
- throw t;
- return null;
- }
-
- }
-
- /**
- * Write the state transfer version as well as placeholder ints for the
- * sizes of the components of the state transfer.
- */
- protected void initializeStateTransfer(OutputStream baos) throws IOException
+ public byte[] generateStateTransfer(DataNode rootNode, boolean generateTransient, boolean generatePersistent,
+ boolean suppressErrors) throws Throwable
{
+ int initialSize = 16*1024;
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(initialSize);
MarshalledValueOutputStream out = new MarshalledValueOutputStream(baos);
- out.writeShort(STATE_TRANSFER_VERSION);
- // Write a placeholder for the 3 sizes we'll merge in later
- out.writeInt(0);
- out.writeInt(0);
- out.writeInt(0);
- out.close();
- }
-
- /**
- * Do a preorder traversal: visit the node first, then the node's children
- *
- * @param out
- * @throws Exception
- */
- protected void marshallTransientState(DataNode node,
- ObjectOutputStream out) throws Exception
- {
-
- if (internalFqns.contains(node.getFqn()))
- return;
-
- Map attrs;
- NodeData nd;
-
- // first handle the current node
- attrs=node.getData();
- if(attrs == null || attrs.size() == 0)
- nd=new NodeData(node.getFqn());
- else
- nd=new NodeData(node.getFqn(), attrs);
- out.writeObject(nd);
-
- // then visit the children
- Map children = node.getChildren();
- if(children == null)
- return;
- for(Iterator it=children.entrySet().iterator(); it.hasNext();) {
- Map.Entry entry = (Map.Entry) it.next();
- marshallTransientState((DataNode) entry.getValue(), out);
- }
-
- out.close();
- }
-
- /**
- * Does nothing in this base class; can be overridden in a subclass.
- */
- protected void marshallAssociatedState(Fqn fqn,
- OutputStream baos) throws Exception
- {
- // no-op in this base class
- }
-
- protected TreeCache getTreeCache()
- {
- return cache;
- }
-
- /**
- * Overwrites the bytes in the given array starting at the given position
- * with another new integer.
- */
- public static void overwriteInt(byte[] bytes, int startpos, int newVal)
- {
- bytes[startpos] = (byte) (newVal >>> 24);
- bytes[startpos + 1] = (byte) (newVal >>> 16);
- bytes[startpos + 2] = (byte) (newVal >>> 8);
- bytes[startpos + 3] = (byte) (newVal >>> 0);
+ streamState(out, rootNode, generateTransient, generatePersistent, suppressErrors);
+ return baos.getRawBuffer();
}
}
1.3 +2 -4 JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -b -r1.2 -r1.3
--- StateTransferIntegrator.java 11 Oct 2005 20:15:15 -0000 1.2
+++ StateTransferIntegrator.java 31 Aug 2006 20:30:45 -0000 1.3
@@ -11,8 +11,6 @@
public interface StateTransferIntegrator
{
- void integrateTransientState(DataNode target, ClassLoader cl) throws Exception;
-
- void integratePersistentState() throws Exception;
+ void integrateState(DataNode target,ClassLoader cl)throws Exception;
}
\ No newline at end of file
1.6 +17 -353 JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferIntegrator_200.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -b -r1.5 -r1.6
--- StateTransferIntegrator_200.java 31 Aug 2006 14:56:46 -0000 1.5
+++ StateTransferIntegrator_200.java 31 Aug 2006 20:30:45 -0000 1.6
@@ -6,374 +6,38 @@
*/
package org.jboss.cache.statetransfer;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.ByteArrayInputStream;
+
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
-import org.jboss.cache.buddyreplication.BuddyManager;
-import org.jboss.cache.factories.NodeFactory;
-import org.jboss.cache.loader.CacheLoader;
-import org.jboss.cache.loader.NodeData;
import org.jboss.invocation.MarshalledValueInputStream;
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-public class StateTransferIntegrator_200 implements StateTransferIntegrator
+public class StateTransferIntegrator_200 extends AbstractStateTransferIntegrator implements StateTransferIntegrator
{
- /** Number of bytes at the beginning of the state transfer byte[]
- * utilized by meta-information about the composition of the byte[]
- * (6 for stream header, 2 for version short,
- * 3 * 4 for lengths of the state components, 2 bytes for close)
- */
- protected static final int HEADER_LENGTH = 6 + 2 + 4 + 4 + 4;// + 2;
- protected Log log = LogFactory.getLog(getClass().getName());
+ byte[] state = null;
- private TreeCache cache;
- private Fqn targetFqn;
- private byte[] state;
- private int transientSize;
- private int associatedSize;
- private int persistentSize;
- private boolean transientSet;
- private NodeFactory factory;
- private byte nodeType;
- private Set internalFqns;
-
-
- protected StateTransferIntegrator_200(byte[] state, Fqn targetFqn,
- TreeCache cache) throws Exception
+ protected StateTransferIntegrator_200(byte[] state, Fqn targetFqn, TreeCache cache) throws Exception
{
- this.targetFqn = targetFqn;
- this.cache = cache;
+ super(targetFqn, cache);
this.state = state;
- this.factory = NodeFactory.getInstance();
- this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
- ? NodeFactory.NODE_TYPE_OPTIMISTIC_NODE
- : NodeFactory.NODE_TYPE_TREENODE;
- this.internalFqns = cache.getInternalFqns();
+ }
+ public void integrateState(DataNode target, ClassLoader cl) throws Exception
+ {
ByteArrayInputStream bais = new ByteArrayInputStream(state);
MarshalledValueInputStream in = new MarshalledValueInputStream(bais);
in.readShort(); // the version, which we discard
- transientSize = in.readInt();
- associatedSize = in.readInt();
- persistentSize = in.readInt();
- in.close();
- if (log.isTraceEnabled()) {
- log.trace("transient state: " + transientSize + " bytes");
- log.trace("associated state: " + associatedSize + " bytes");
- log.trace("persistent state: " + persistentSize + " bytes");
- }
- }
-
- public void integrateTransientState(DataNode target, ClassLoader cl)
- throws Exception
- {
- if (transientSize > 0) {
-
- ClassLoader oldCL = null;
- try {
- if (cl != null) {
- oldCL = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(cl);
- }
-
- if (log.isTraceEnabled())
- log.trace("integrating transient state for " + target);
-
- integrateTransientState(target);
-
- transientSet = true;
-
- if (log.isTraceEnabled())
- log.trace("transient state successfully integrated for " +
- targetFqn);
-
- // 3. Set the associated state. We only do this if the normal
- // transient state was set.
- integrateAssociatedState();
- }
- finally {
- if (!transientSet) {
- // Clear any existing state from the targetRoot
- target.clear();
- target.removeAllChildren();
- }
-
- if (oldCL != null)
- Thread.currentThread().setContextClassLoader(oldCL);
- }
- }
- }
-
- /**
- * Provided for subclasses that deal with associated state.
- *
- * @throws Exception
- */
- protected void integrateAssociatedState() throws Exception
- {
- // no-op in this base class
- }
-
- public void integratePersistentState() throws Exception
- {
- if (persistentSize > 0)
- {
- CacheLoader loader = cache.getCacheLoader();
- if (loader == null)
- {
- log.error("cache loader is null, cannot set persistent state");
- }
- else
- {
- ByteArrayInputStream in_stream = new ByteArrayInputStream(getPersistentState());
- MarshalledValueInputStream in = new MarshalledValueInputStream(in_stream);
-
- if (log.isTraceEnabled())
- log.trace("setting the persistent state with " + loader.getClass());
-
- if (targetFqn.isRoot())
- {
- loader.storeEntireState(in);
- }
- else
- {
- loader.storeState(targetFqn, in);
- }
-
- if (log.isTraceEnabled())
- log.trace("setting the persistent state was successful");
- }
- }
- }
-
- private void integrateTransientState(DataNode target)
- throws IOException, ClassNotFoundException
- {
- Set retainedNodes = retainInternalNodes(target);
-
- target.removeAllChildren();
- ByteArrayInputStream in_stream=new ByteArrayInputStream(state, HEADER_LENGTH, transientSize);
- MarshalledValueInputStream in=new MarshalledValueInputStream(in_stream);
-
- // Read the first NodeData and integrate into our target
- NodeData nd = (NodeData) in.readObject();
- Map attrs = nd.getAttributes();
- if (attrs != null)
- target.put(attrs, true);
- else
- target.clear();
-
- // Check whether this is an integration into the buddy backup subtree
- Fqn tferFqn = nd.getFqn();
- Fqn tgtFqn = target.getFqn();
- boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
- && !tferFqn.isChildOrEquals(tgtFqn);
- // If it is an integration, calculate how many levels of offset
- int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
-
- integrateStateTransferChildren(target, offset, in);
-
- in.close();
-
- integrateRetainedNodes(target, retainedNodes);
- }
-
- private NodeData integrateStateTransferChildren(DataNode parent,
- int offset,
- ObjectInputStream in)
- throws IOException, ClassNotFoundException
- {
- int parent_level = parent.getFqn().size();
- int target_level = parent_level + 1;
- Fqn fqn;
- int size;
- Object name;
try
{
- NodeData nd = (NodeData) in.readObject();
- while (nd != null) {
- fqn = nd.getFqn();
- // If we need to integrate into the buddy backup subtree,
- // change the Fqn to fit under it
- if (offset > 0)
- fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
- size = fqn.size();
- if (size <= parent_level)
- return nd;
- else if (size > target_level)
- throw new IllegalStateException("NodeData " + fqn +
- " is not a direct child of " +
- parent.getFqn());
-
- name = fqn.get(size - 1);
-
- // We handle this NodeData. Create a DataNode and
- // integrate its data
- DataNode target = factory.createDataNode(nodeType,
- name,
- fqn,
- parent,
- nd.getAttributes(),
- true,
- cache);
- parent.addChild(name, target);
-
- // Recursively call, which will walk down the tree
- // and return the next NodeData that's a child of our parent
- nd = integrateStateTransferChildren(target, offset, in);
- }
- }
- catch (EOFException eof) {
- // all done
- }
-
- return null;
+ integrateTransientState(in, target, cl);
+ integratePersistentState(in);
}
-
- private byte[] getPersistentState()
- {
- byte[] result = new byte[persistentSize];
- System.arraycopy(state, HEADER_LENGTH + transientSize + associatedSize, result, 0, persistentSize);
- return result;
- }
-
- private Set retainInternalNodes(DataNode target)
- {
- Set result = new HashSet();
- Fqn targetFqn = target.getFqn();
- for (Iterator it = internalFqns.iterator(); it.hasNext();)
- {
- Fqn internalFqn = (Fqn) it.next();
- if (internalFqn.isChildOf(targetFqn))
- {
- DataNode internalNode = getInternalNode(target, internalFqn);
- if (internalNode != null)
- result.add(internalNode);
- }
- }
-
- return result;
- }
-
- private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
- {
- Object name = internalFqn.get(parent.getFqn().size());
- DataNode result = (DataNode) parent.getChild(name);
- if (result != null)
- {
- if (internalFqn.size() < result.getFqn().size())
- {
- // need to recursively walk down the tree
- result = getInternalNode(result, internalFqn);
- }
- }
- return result;
- }
-
- private void integrateRetainedNodes(DataNode root, Set retainedNodes)
- {
- Fqn rootFqn = root.getFqn();
- for (Iterator it = retainedNodes.iterator(); it.hasNext();)
- {
- DataNode retained = (DataNode) it.next();
- if (retained.getFqn().isChildOf(rootFqn))
- {
- integrateRetainedNode(root, retained);
- }
- }
- }
-
- private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
- {
- Fqn descFqn = descendant.getFqn();
- Fqn ancFqn = ancestor.getFqn();
- Object name = descFqn.get(ancFqn.size());
- DataNode child = (DataNode) ancestor.getChild(name);
- if (ancFqn.size() == descFqn.size() + 1)
- {
- if (child == null)
+ finally
{
- ancestor.addChild(name, descendant);
- }
- else
- {
- log.warn("Received unexpected internal node " + descFqn +
- " in transferred state");
- }
- }
- else
- {
- if (child == null)
- {
- // Missing level -- have to create empty node
- // This shouldn't really happen -- internal fqns should
- // be immediately under the root
- child = factory.createDataNode(nodeType,
- name,
- new Fqn(ancFqn, name),
- ancestor,
- null,
- true,
- cache);
- ancestor.addChild(name, child);
- }
-
- // Keep walking down the tree
- integrateRetainedNode(child, descendant);
- }
- }
-
- protected int getAssociatedSize()
- {
- return associatedSize;
- }
-
- protected TreeCache getCache()
- {
- return cache;
- }
-
- protected NodeFactory getFactory()
- {
- return factory;
- }
-
- protected byte getNodeType()
- {
- return nodeType;
- }
-
- protected int getPersistentSize()
- {
- return persistentSize;
- }
-
- protected byte[] getState()
- {
- return state;
- }
-
- protected int getTransientSize()
- {
- return transientSize;
+ in.close();
}
-
- protected Fqn getTargetFqn()
- {
- return targetFqn;
}
-
-
}
1.3 +6 -147 JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StreamingStateTransferGenerator_200.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -b -r1.2 -r1.3
--- StreamingStateTransferGenerator_200.java 31 Aug 2006 14:56:46 -0000 1.2
+++ StreamingStateTransferGenerator_200.java 31 Aug 2006 20:30:45 -0000 1.3
@@ -6,168 +6,27 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.ObjectOutputStream;
import java.io.OutputStream;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
-import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
-import org.jboss.cache.Version;
-import org.jboss.cache.loader.NodeData;
import org.jboss.invocation.MarshalledValueOutputStream;
-public class StreamingStateTransferGenerator_200 implements StateTransferGenerator
+public class StreamingStateTransferGenerator_200 extends AbstractStateTransferGenerator implements StateTransferGenerator
{
- public static final short STATE_TRANSFER_VERSION = Version.getVersionShort("2.0.0.GA");
-
- private Log log = LogFactory.getLog(getClass().getName());
-
- private TreeCache cache;
-
- private Set internalFqns;
-
private OutputStream os;
protected StreamingStateTransferGenerator_200(OutputStream os, TreeCache cache)
{
- this.cache = cache;
+ super(cache);
this.os = os;
- this.internalFqns = cache.getInternalFqns();
}
public byte[] generateStateTransfer(DataNode rootNode, boolean generateTransient, boolean generatePersistent,
boolean suppressErrors) throws Throwable
{
- Fqn fqn = rootNode.getFqn();
MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
-
- try
- {
- try
- {
- out.writeShort(STATE_TRANSFER_VERSION);
- if (generateTransient)
- {
- if (log.isTraceEnabled())
- log.trace("writing transient state for " + fqn);
-
- marshallTransientState(rootNode, out);
-
- if (log.isTraceEnabled())
- log.trace("transient state succesfully written");
-
- if (log.isTraceEnabled())
- log.trace("writing associated state");
-
- marshallAssociatedState(fqn, os);
-
- if (log.isTraceEnabled())
- log.trace("associated state succesfully written");
- }
- }
- catch (Throwable t)
- {
- log.error("failed getting the in-memory (transient) state", t);
- if (!suppressErrors)
- throw t;
- }
- finally
- {
- if (log.isTraceEnabled())
- log.trace("writing delimeter after transient state");
-
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
- }
- try
- {
- if (generatePersistent)
- {
- if (log.isTraceEnabled())
- log.trace("writing persistent state for " + fqn);
-
- if (fqn.isRoot())
- {
- cache.getCacheLoader().loadEntireState(out);
- }
- else
- {
- cache.getCacheLoader().loadState(fqn, out);
- }
-
- if (log.isTraceEnabled())
- log.trace("persistent state succesfully written");
- }
- }
- catch (Throwable t)
- {
- log.error("failed getting the persistent state", t);
- if (!suppressErrors)
- throw t;
- }
- finally
- {
- if (log.isTraceEnabled())
- log.trace("writing delimeter after persistent state");
-
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
- }
- }
- finally
- {
- out.close();
- }
+ streamState(out, rootNode, generateTransient, generatePersistent, suppressErrors);
return null;
}
-
- /**
- * Do a preorder traversal: visit the node first, then the node's children
- *
- * @param out
- * @throws Exception
- */
- protected void marshallTransientState(DataNode node, ObjectOutputStream out) throws Exception
- {
-
- if (internalFqns.contains(node.getFqn()))
- return;
-
- Map attrs;
- NodeData nd;
-
- // first handle the current node
- attrs = node.getData();
- if (attrs == null || attrs.size() == 0)
- nd = new NodeData(node.getFqn());
- else
- nd = new NodeData(node.getFqn(), attrs);
- out.writeObject(nd);
-
- // then visit the children
- Map children = node.getChildren();
- if (children == null)
- return;
- for (Iterator it = children.entrySet().iterator(); it.hasNext();)
- {
- Map.Entry entry = (Map.Entry) it.next();
- marshallTransientState((DataNode) entry.getValue(), out);
- }
- }
-
- /**
- * Does nothing in this base class; can be overridden in a subclass.
- */
- protected void marshallAssociatedState(Fqn fqn, OutputStream baos) throws Exception
- {
- // no-op in this base class
- }
-
- protected TreeCache getTreeCache()
- {
- return cache;
- }
}
1.8 +21 -35 JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -b -r1.7 -r1.8
--- StateTransferManager.java 31 Aug 2006 14:56:46 -0000 1.7
+++ StateTransferManager.java 31 Aug 2006 20:30:45 -0000 1.8
@@ -1,3 +1,9 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
package org.jboss.cache.statetransfer;
import java.io.InputStream;
@@ -328,17 +334,7 @@
return;
}
- byte [] new_state = null;
- InputStream istream = null;
- if(state instanceof byte[])
- {
- new_state = (byte[]) state;
- log.info("received the state (size=" + new_state.length + " bytes)");
- }
- else
- {
- istream = (InputStream) state;
- }
+ boolean usingStreamTransfer = (state instanceof InputStream)?true:false;
Object owner = getOwnerForLock();
try
@@ -349,39 +345,29 @@
true, true);
StateTransferIntegrator integrator =null;
- if(new_state!=null)
+ if(usingStreamTransfer)
{
- integrator = getStateTransferIntegrator(new_state,targetRoot.getFqn());
+ integrator = getStateTransferIntegrator((InputStream)state,
+ targetRoot.getFqn());
}
else
{
- integrator = getStateTransferIntegrator(istream,
- targetRoot.getFqn());
+ byte [] new_state = (byte[]) state;
+ log.info("received the state (size=" + new_state.length + " bytes)");
+ integrator = getStateTransferIntegrator(new_state,targetRoot.getFqn());
}
- // If transient state is available, integrate it
try
{
- integrator.integrateTransientState(targetRoot, cl);
- notifyAllNodesCreated(targetRoot);
+ integrator.integrateState(targetRoot, cl);
}
catch (Throwable t)
{
- log.error("failed setting transient state", t);
+ log.error("failed setting state", t);
}
-
- // Store any persistent state
- integrator.integratePersistentState();
}
finally
{
- if(istream!=null)
- {
- try
- {
- istream.close();
- }catch(Exception ignored){}
- }
releaseStateTransferLocks(targetRoot, owner, true);
}
1.3 +7 -291 JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StreamingStateTransferIntegrator_200.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -b -r1.2 -r1.3
--- StreamingStateTransferIntegrator_200.java 31 Aug 2006 14:56:46 -0000 1.2
+++ StreamingStateTransferIntegrator_200.java 31 Aug 2006 20:30:45 -0000 1.3
@@ -6,317 +6,33 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.IOException;
import java.io.ObjectInputStream;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
-import org.jboss.cache.buddyreplication.BuddyManager;
-import org.jboss.cache.factories.NodeFactory;
-import org.jboss.cache.loader.CacheLoader;
-import org.jboss.cache.loader.NodeData;
-public class StreamingStateTransferIntegrator_200 implements StateTransferIntegrator
+public class StreamingStateTransferIntegrator_200 extends AbstractStateTransferIntegrator implements StateTransferIntegrator
{
- /** Number of bytes at the beginning of the state transfer byte[]
- * utilized by meta-information about the composition of the byte[]
- * (6 for stream header, 2 for version short,
- * 3 * 4 for lengths of the state components, 2 bytes for close)
- */
- protected static final int HEADER_LENGTH = 6 + 2 + 4 + 4 + 4;// + 2;
-
- protected Log log = LogFactory.getLog(getClass().getName());
-
- private TreeCache cache;
-
- private Fqn targetFqn;
-
- private boolean transientSet;
-
- private NodeFactory factory;
-
- private byte nodeType;
-
- private Set internalFqns;
-
ObjectInputStream in; //used in streaming state transfer
- public StreamingStateTransferIntegrator_200(ObjectInputStream inputStream, Fqn targetFqn, TreeCache cache)
+ protected StreamingStateTransferIntegrator_200(ObjectInputStream inputStream, Fqn targetFqn, TreeCache cache)
{
- this.targetFqn = targetFqn;
- this.cache = cache;
- this.factory = NodeFactory.getInstance();
- this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
- ? NodeFactory.NODE_TYPE_OPTIMISTIC_NODE
- : NodeFactory.NODE_TYPE_TREENODE;
- this.internalFqns = cache.getInternalFqns();
+ super(targetFqn,cache);
in = inputStream;
}
- public void integrateTransientState(DataNode target, ClassLoader cl) throws Exception
- {
- ClassLoader oldCL = setClassLoader(cl);
- try
- {
- if (log.isTraceEnabled())
- log.trace("integrating transient state for " + target);
-
- integrateTransientState(target, in);
-
- transientSet = true;
-
- if (log.isTraceEnabled())
- log.trace("transient state successfully integrated for " + targetFqn);
-
- integrateAssociatedState();
- }
- finally
- {
- if (!transientSet)
- {
- // Clear any existing state from the targetRoot
- target.clear();
- target.removeAllChildren();
- }
-
- resetClassLoader(oldCL);
- }
- }
- /**
- * Provided for subclasses that deal with associated state.
- *
- * @throws Exception
- */
- protected void integrateAssociatedState() throws Exception
+ public void integrateState(DataNode target, ClassLoader cl) throws Exception
{
- // no-op in this base class
- }
-
- public void integratePersistentState() throws Exception
- {
-
try
{
- CacheLoader loader = cache.getCacheLoader();
- if (loader == null)
- {
- log.error("cache loader is null, cannot set persistent state");
- }
- else if (targetFqn.isRoot())
- {
- if (log.isTraceEnabled())
- log.trace("setting the persistent state");
- loader.storeEntireState(in);
- if (log.isTraceEnabled())
- log.trace("setting the persistent state was successful");
- }
- else
- {
- if (log.isTraceEnabled())
- log.trace("setting partial persistent state at " + targetFqn);
- loader.storeState(targetFqn, in);
- if (log.isTraceEnabled())
- log.trace("setting partial persistent state was successful");
- }
+ integrateTransientState(in, target, cl);
+ integratePersistentState(in);
}
finally
{
in.close();
}
}
-
- private ClassLoader setClassLoader(ClassLoader newLoader)
- {
- ClassLoader oldClassLoader = null;
- if (newLoader != null)
- {
- oldClassLoader = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(newLoader);
- }
- return oldClassLoader;
- }
-
- private void resetClassLoader(ClassLoader oldLoader)
- {
- if (oldLoader != null)
- Thread.currentThread().setContextClassLoader(oldLoader);
- }
-
- private void integrateTransientState(DataNode target, ObjectInputStream in) throws IOException,
- ClassNotFoundException
- {
- Set retainedNodes = retainInternalNodes(target);
-
- target.removeAllChildren();
-
- // Read the first NodeData and integrate into our target
- NodeData nd = (NodeData) in.readObject();
-
- //are there any transient nodes at all?
- if (!nd.isMarker())
- {
- Map attrs = nd.getAttributes();
- if (attrs != null)
- target.put(attrs, true);
- else
- target.clear();
-
- // Check whether this is an integration into the buddy backup
- // subtree
- Fqn tferFqn = nd.getFqn();
- Fqn tgtFqn = target.getFqn();
- boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
- && !tferFqn.isChildOrEquals(tgtFqn);
- // If it is an integration, calculate how many levels of offset
- int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
-
- integrateStateTransferChildren(target, offset, in);
-
- integrateRetainedNodes(target, retainedNodes);
- }
- }
-
- private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
- throws IOException, ClassNotFoundException
- {
- int parent_level = parent.getFqn().size();
- int target_level = parent_level + 1;
- Fqn fqn;
- int size;
- Object name;
- NodeData nd = (NodeData) in.readObject();
- while (nd != null && !nd.isMarker())
- {
- fqn = nd.getFqn();
- // If we need to integrate into the buddy backup subtree,
- // change the Fqn to fit under it
- if (offset > 0)
- fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
- size = fqn.size();
- if (size <= parent_level)
- return nd;
- else if (size > target_level)
- throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());
-
- name = fqn.get(size - 1);
-
- // We handle this NodeData. Create a DataNode and
- // integrate its data
- DataNode target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), true, cache);
- parent.addChild(name, target);
-
- // Recursively call, which will walk down the tree
- // and return the next NodeData that's a child of our parent
- nd = integrateStateTransferChildren(target, offset, in);
- }
- return null;
- }
-
- private Set retainInternalNodes(DataNode target)
- {
- Set result = new HashSet();
- Fqn targetFqn = target.getFqn();
- for (Iterator it = internalFqns.iterator(); it.hasNext();)
- {
- Fqn internalFqn = (Fqn) it.next();
- if (internalFqn.isChildOf(targetFqn))
- {
- DataNode internalNode = getInternalNode(target, internalFqn);
- if (internalNode != null)
- result.add(internalNode);
- }
- }
-
- return result;
- }
-
- private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
- {
- Object name = internalFqn.get(parent.getFqn().size());
- DataNode result = (DataNode) parent.getChild(name);
- if (result != null)
- {
- if (internalFqn.size() < result.getFqn().size())
- {
- // need to recursively walk down the tree
- result = getInternalNode(result, internalFqn);
- }
- }
- return result;
- }
-
- private void integrateRetainedNodes(DataNode root, Set retainedNodes)
- {
- Fqn rootFqn = root.getFqn();
- for (Iterator it = retainedNodes.iterator(); it.hasNext();)
- {
- DataNode retained = (DataNode) it.next();
- if (retained.getFqn().isChildOf(rootFqn))
- {
- integrateRetainedNode(root, retained);
- }
- }
- }
-
- private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
- {
- Fqn descFqn = descendant.getFqn();
- Fqn ancFqn = ancestor.getFqn();
- Object name = descFqn.get(ancFqn.size());
- DataNode child = (DataNode) ancestor.getChild(name);
- if (ancFqn.size() == descFqn.size() + 1)
- {
- if (child == null)
- {
- ancestor.addChild(name, descendant);
- }
- else
- {
- log.warn("Received unexpected internal node " + descFqn + " in transferred state");
- }
- }
- else
- {
- if (child == null)
- {
- // Missing level -- have to create empty node
- // This shouldn't really happen -- internal fqns should
- // be immediately under the root
- child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, cache);
- ancestor.addChild(name, child);
- }
-
- // Keep walking down the tree
- integrateRetainedNode(child, descendant);
- }
- }
-
- protected TreeCache getCache()
- {
- return cache;
- }
-
- protected NodeFactory getFactory()
- {
- return factory;
- }
-
- protected byte getNodeType()
- {
- return nodeType;
- }
-
- protected Fqn getTargetFqn()
- {
- return targetFqn;
- }
-
}
1.1 date: 2006/08/31 20:30:45; author: vblagojevic; state: Exp;JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferGenerator.java
Index: AbstractStateTransferGenerator.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.statetransfer;
import java.io.ObjectOutputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.Version;
import org.jboss.cache.loader.NodeData;
/**
 * Common base for state transfer generators.
 * <p>
 * {@link #streamState} writes the state of a subtree to an object stream as
 * three consecutive sections -- transient (in-memory), associated and
 * persistent (cache loader) -- each of which is always terminated by
 * <code>StateTransferManager.STREAMING_DELIMETER_NODE</code>, even when the
 * section itself is skipped or fails.
 */
public class AbstractStateTransferGenerator
{
   public static final short STATE_TRANSFER_VERSION = Version.getVersionShort("2.0.0.GA");

   private Log log = LogFactory.getLog(getClass().getName());

   private TreeCache cache;

   private Set internalFqns;

   /**
    * @param cache cache whose state is generated; its internal Fqns are
    *              captured here and excluded from the transient state
    */
   protected AbstractStateTransferGenerator(TreeCache cache)
   {
      this.cache = cache;
      this.internalFqns = cache.getInternalFqns();
   }

   /**
    * Writes the version short followed by the transient, associated and
    * persistent sections for <code>rootNode</code>, then closes the stream.
    *
    * @param out                stream to write to; always closed on exit
    * @param rootNode           root of the subtree whose state is written
    * @param generateTransient  whether to write the in-memory state
    * @param generatePersistent whether to write the cache loader state
    * @param suppressErrors     if <code>true</code> failures inside a section
    *                           are logged and the next section is attempted;
    *                           otherwise the failure is rethrown
    * @throws Throwable the section failure when errors are not suppressed, or
    *                   any failure writing a delimiter / closing the stream
    */
   protected void streamState(ObjectOutputStream out, DataNode rootNode, boolean generateTransient,
                              boolean generatePersistent, boolean suppressErrors) throws Throwable
   {
      Fqn rootFqn = rootNode.getFqn();
      try
      {
         writeTransientSection(out, rootNode, rootFqn, generateTransient, suppressErrors);
         writeAssociatedSection(out, rootFqn, suppressErrors);
         writePersistentSection(out, rootFqn, generatePersistent, suppressErrors);
      }
      finally
      {
         out.close();
      }
   }

   /**
    * Writes the version short, the optional transient state and the
    * delimiter that closes the transient section.
    */
   private void writeTransientSection(ObjectOutputStream out, DataNode rootNode, Fqn rootFqn,
                                      boolean generateTransient, boolean suppressErrors) throws Throwable
   {
      try
      {
         out.writeShort(STATE_TRANSFER_VERSION);
         if (generateTransient)
         {
            if (log.isTraceEnabled())
            {
               log.trace("writing transient state for " + rootFqn);
            }
            marshallTransientState(rootNode, out);
            if (log.isTraceEnabled())
            {
               log.trace("transient state succesfully written");
            }
         }
      }
      catch (Throwable failure)
      {
         log.error("failed getting the in-memory (transient) state", failure);
         if (!suppressErrors)
         {
            throw failure;
         }
      }
      finally
      {
         // the delimiter goes out regardless of success so the reader can resynchronise
         if (log.isTraceEnabled())
         {
            log.trace("writing delimeter after transient state");
         }
         out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
      }
   }

   /**
    * Writes the associated state (subclass hook) and the delimiter that
    * closes the associated section.
    */
   private void writeAssociatedSection(ObjectOutputStream out, Fqn rootFqn, boolean suppressErrors)
         throws Throwable
   {
      try
      {
         if (log.isTraceEnabled())
         {
            log.trace("writing associated state");
         }
         marshallAssociatedState(rootFqn, out);
         if (log.isTraceEnabled())
         {
            log.trace("associated state succesfully written");
         }
      }
      catch (Throwable failure)
      {
         log.error("failed writing associated state", failure);
         if (!suppressErrors)
         {
            throw failure;
         }
      }
      finally
      {
         if (log.isTraceEnabled())
         {
            log.trace("writing delimeter after associated state");
         }
         out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
      }
   }

   /**
    * Writes the optional cache loader state and the delimiter that closes
    * the persistent section.
    */
   private void writePersistentSection(ObjectOutputStream out, Fqn rootFqn, boolean generatePersistent,
                                       boolean suppressErrors) throws Throwable
   {
      try
      {
         if (generatePersistent)
         {
            if (log.isTraceEnabled())
            {
               log.trace("writing persistent state for " +
                       rootFqn + ",using " + cache.getCacheLoader().getClass());
            }
            // the root is streamed with the whole-store call, anything else per subtree
            if (!rootFqn.isRoot())
            {
               cache.getCacheLoader().loadState(rootFqn, out);
            }
            else
            {
               cache.getCacheLoader().loadEntireState(out);
            }
            if (log.isTraceEnabled())
            {
               log.trace("persistent state succesfully written");
            }
         }
      }
      catch (Throwable failure)
      {
         log.error("failed getting the persistent state", failure);
         if (!suppressErrors)
         {
            throw failure;
         }
      }
      finally
      {
         if (log.isTraceEnabled())
         {
            log.trace("writing delimeter after persistent state");
         }
         out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
      }
   }

   /**
    * Writes <code>node</code> and then, recursively, each of its children
    * (preorder traversal). A node whose Fqn is one of the cache's internal
    * Fqns is skipped together with its whole subtree.
    *
    * @param node node to marshall
    * @param out  stream receiving one <code>NodeData</code> per visited node
    * @throws Exception if writing to the stream fails
    */
   protected void marshallTransientState(DataNode node, ObjectOutputStream out) throws Exception
   {
      Fqn nodeFqn = node.getFqn();
      if (internalFqns.contains(nodeFqn))
      {
         return;
      }

      // visit the node itself; empty or missing data is sent as an Fqn-only NodeData
      Map attributes = node.getData();
      NodeData payload = (attributes != null && attributes.size() != 0)
            ? new NodeData(nodeFqn, attributes)
            : new NodeData(nodeFqn);
      out.writeObject(payload);

      // then descend into the children, if any
      Map children = node.getChildren();
      if (children != null)
      {
         Iterator childIt = children.values().iterator();
         while (childIt.hasNext())
         {
            marshallTransientState((DataNode) childIt.next(), out);
         }
      }
   }

   /**
    * Hook for subclasses that have associated state to send; this
    * implementation writes nothing.
    */
   protected void marshallAssociatedState(Fqn fqn, ObjectOutputStream baos) throws Exception
   {
      // intentionally empty
   }

   /**
    * @return the cache passed to the constructor
    */
   protected TreeCache getTreeCache()
   {
      return cache;
   }
}
1.1 date: 2006/08/31 20:30:45; author: vblagojevic; state: Exp;JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferIntegrator.java
Index: AbstractStateTransferIntegrator.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.statetransfer;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.factories.NodeFactory;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.NodeData;
public class AbstractStateTransferIntegrator
{
protected Log log = LogFactory.getLog(getClass().getName());
private TreeCache cache;
private Fqn targetFqn;
private NodeFactory factory;
private byte nodeType;
private Set internalFqns;
public AbstractStateTransferIntegrator(Fqn targetFqn, TreeCache cache)
{
this.targetFqn = targetFqn;
this.cache = cache;
this.factory = NodeFactory.getInstance();
this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
? NodeFactory.NODE_TYPE_OPTIMISTIC_NODE
: NodeFactory.NODE_TYPE_TREENODE;
this.internalFqns = cache.getInternalFqns();
}
protected void integrateTransientState(ObjectInputStream in,DataNode target, ClassLoader cl) throws Exception
{
boolean transientSet = false;
ClassLoader oldCL = setClassLoader(cl);
try
{
if (log.isTraceEnabled())
log.trace("integrating transient state for " + target);
integrateTransientState(target, in);
transientSet = true;
if (log.isTraceEnabled())
log.trace("transient state successfully integrated for " + targetFqn);
integrateAssociatedState(in);
notifyAllNodesCreated(target);
}
finally
{
if (!transientSet)
{
// Clear any existing state from the targetRoot
target.clear();
target.removeAllChildren();
}
resetClassLoader(oldCL);
}
}
/**
* Provided for subclasses that deal with associated state.
*
* @throws Exception
*/
protected void integrateAssociatedState(ObjectInputStream in) throws Exception
{
// no-op in this base class
// just read marker
in.readObject();
}
protected void integratePersistentState(ObjectInputStream in) throws Exception
{
CacheLoader loader = cache.getCacheLoader();
if (loader == null)
{
log.error("cache loader is null, cannot set persistent state");
}
else
{
if (log.isTraceEnabled())
log.trace("setting the persistent state using " + loader.getClass());
if (targetFqn.isRoot())
{
loader.storeEntireState(in);
}
else
{
loader.storeState(targetFqn, in);
}
if (log.isTraceEnabled())
log.trace("setting persistent state was successful");
}
}
/**
* Generates NodeAdded notifications for all nodes of the tree. This is
* called whenever the tree is initially retrieved (state transfer)
*/
private void notifyAllNodesCreated(DataNode curr)
{
DataNode n;
Map children;
if (curr == null) return;
getCache().getNotifier().notifyNodeCreated(curr.getFqn(), true);
getCache().getNotifier().notifyNodeCreated(curr.getFqn(), false);
if ((children = curr.getChildren()) != null)
{
for (Iterator it = children.values().iterator(); it.hasNext();)
{
n = (DataNode) it.next();
notifyAllNodesCreated(n);
}
}
}
private ClassLoader setClassLoader(ClassLoader newLoader)
{
ClassLoader oldClassLoader = null;
if (newLoader != null)
{
oldClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(newLoader);
}
return oldClassLoader;
}
private void resetClassLoader(ClassLoader oldLoader)
{
if (oldLoader != null)
Thread.currentThread().setContextClassLoader(oldLoader);
}
private void integrateTransientState(DataNode target, ObjectInputStream in) throws IOException,
ClassNotFoundException
{
Set retainedNodes = retainInternalNodes(target);
target.removeAllChildren();
// Read the first NodeData and integrate into our target
NodeData nd = (NodeData) in.readObject();
//are there any transient nodes at all?
if (!nd.isMarker())
{
Map attrs = nd.getAttributes();
if (attrs != null)
target.put(attrs, true);
else
target.clear();
// Check whether this is an integration into the buddy backup
// subtree
Fqn tferFqn = nd.getFqn();
Fqn tgtFqn = target.getFqn();
boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
&& !tferFqn.isChildOrEquals(tgtFqn);
// If it is an integration, calculate how many levels of offset
int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
integrateStateTransferChildren(target, offset, in);
integrateRetainedNodes(target, retainedNodes);
}
}
private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
throws IOException, ClassNotFoundException
{
int parent_level = parent.getFqn().size();
int target_level = parent_level + 1;
Fqn fqn;
int size;
Object name;
NodeData nd = (NodeData) in.readObject();
while (nd != null && !nd.isMarker())
{
fqn = nd.getFqn();
// If we need to integrate into the buddy backup subtree,
// change the Fqn to fit under it
if (offset > 0)
fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
size = fqn.size();
if (size <= parent_level)
return nd;
else if (size > target_level)
throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());
name = fqn.get(size - 1);
// We handle this NodeData. Create a DataNode and
// integrate its data
DataNode target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), true, cache);
parent.addChild(name, target);
// Recursively call, which will walk down the tree
// and return the next NodeData that's a child of our parent
nd = integrateStateTransferChildren(target, offset, in);
}
return null;
}
private Set retainInternalNodes(DataNode target)
{
Set result = new HashSet();
Fqn targetFqn = target.getFqn();
for (Iterator it = internalFqns.iterator(); it.hasNext();)
{
Fqn internalFqn = (Fqn) it.next();
if (internalFqn.isChildOf(targetFqn))
{
DataNode internalNode = getInternalNode(target, internalFqn);
if (internalNode != null)
result.add(internalNode);
}
}
return result;
}
private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
{
Object name = internalFqn.get(parent.getFqn().size());
DataNode result = (DataNode) parent.getChild(name);
if (result != null)
{
if (internalFqn.size() < result.getFqn().size())
{
// need to recursively walk down the tree
result = getInternalNode(result, internalFqn);
}
}
return result;
}
private void integrateRetainedNodes(DataNode root, Set retainedNodes)
{
Fqn rootFqn = root.getFqn();
for (Iterator it = retainedNodes.iterator(); it.hasNext();)
{
DataNode retained = (DataNode) it.next();
if (retained.getFqn().isChildOf(rootFqn))
{
integrateRetainedNode(root, retained);
}
}
}
private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
{
Fqn descFqn = descendant.getFqn();
Fqn ancFqn = ancestor.getFqn();
Object name = descFqn.get(ancFqn.size());
DataNode child = (DataNode) ancestor.getChild(name);
if (ancFqn.size() == descFqn.size() + 1)
{
if (child == null)
{
ancestor.addChild(name, descendant);
}
else
{
log.warn("Received unexpected internal node " + descFqn + " in transferred state");
}
}
else
{
if (child == null)
{
// Missing level -- have to create empty node
// This shouldn't really happen -- internal fqns should
// be immediately under the root
child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, cache);
ancestor.addChild(name, child);
}
// Keep walking down the tree
integrateRetainedNode(child, descendant);
}
}
protected TreeCache getCache()
{
return cache;
}
protected NodeFactory getFactory()
{
return factory;
}
protected byte getNodeType()
{
return nodeType;
}
protected Fqn getTargetFqn()
{
return targetFqn;
}
}
More information about the jboss-cvs-commits
mailing list