[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Manik Surtani
msurtani at jboss.com
Wed Jan 17 09:13:06 EST 2007
User: msurtani
Date: 07/01/17 09:13:06
Modified: src/org/jboss/cache/statetransfer
DefaultStateTransferIntegrator.java
StateTransferIntegrator.java
StateTransferManager.java StateTransferFactory.java
DefaultStateTransferGenerator.java
Log:
JBCACHE-908
Revision Changes Path
1.20 +60 -33 JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DefaultStateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -b -r1.19 -r1.20
--- DefaultStateTransferIntegrator.java 15 Jan 2007 16:19:09 -0000 1.19
+++ DefaultStateTransferIntegrator.java 17 Jan 2007 14:13:06 -0000 1.20
@@ -25,6 +25,8 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -54,17 +56,17 @@
this.internalFqns = cache.getInternalFqns();
}
- public void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception
+ public void integrateState(ObjectInputStream ois, Node target) throws Exception
{
- integrateTransientState(ois, (NodeSPI) target, cl);
+ integrateTransientState(ois, (NodeSPI) target);
integrateAssociatedState(ois);
integratePersistentState(ois);
}
- protected void integrateTransientState(ObjectInputStream in, NodeSPI target, ClassLoader cl) throws Exception
+ protected void integrateTransientState(ObjectInputStream in, NodeSPI target) throws Exception
{
boolean transientSet = false;
- ClassLoader oldCL = setClassLoader(cl);
+// ClassLoader oldCL = setClassLoader(cl);
try
{
if (log.isTraceEnabled())
@@ -93,7 +95,7 @@
target.removeChildrenDirect();
}
- resetClassLoader(oldCL);
+// resetClassLoader(oldCL);
}
}
@@ -106,7 +108,7 @@
{
// no-op in this base class
// just read marker
- readNode(in);
+ cache.getMarshaller().objectFromObjectStream(in);
}
protected void integratePersistentState(ObjectInputStream in) throws Exception
@@ -200,6 +202,7 @@
}
}
+ /*
private ClassLoader setClassLoader(ClassLoader newLoader)
{
ClassLoader oldClassLoader = null;
@@ -218,16 +221,21 @@
Thread.currentThread().setContextClassLoader(oldLoader);
}
}
+ */
- private void integrateTransientState(NodeSPI target, ObjectInputStream in) throws IOException,
- ClassNotFoundException
+ private void integrateTransientState(NodeSPI target, ObjectInputStream in) throws Exception
{
Set<Node> retainedNodes = retainInternalNodes(target);
target.removeChildrenDirect();
+ List<NodeData> list = readNodesAsList(in);
+ Iterator<NodeData> nodeDataIterator = list.iterator();
+
// Read the first NodeData and integrate into our target
- NodeData nd = readNode(in);
+ if (nodeDataIterator.hasNext())
+ {
+ NodeData nd = nodeDataIterator.next();
//are there any transient nodes at all?
if (nd != null && !nd.isMarker())
@@ -243,25 +251,44 @@
// If it is an integration, calculate how many levels of offset
int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
- integrateStateTransferChildren(target, offset, in);
+ integrateStateTransferChildren(target, offset, nodeDataIterator);
integrateRetainedNodes(target, retainedNodes);
}
}
- private NodeData readNode(ObjectInputStream in) throws IOException, ClassNotFoundException
+ // read marker off stack
+ cache.getMarshaller().objectFromObjectStream(in);
+ }
+
+ private List<NodeData> readNodesAsList(ObjectInputStream in) throws Exception
{
- NodeData nd = (NodeData) in.readObject();
- if (nd != null && nd.isExceptionMarker())
+ List list = (List) cache.getMarshaller().objectFromObjectStream(in);
+ for (Object o : list)
{
- NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd;
+ if (o instanceof NodeDataExceptionMarker)
+ {
+ NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) o;
throw new CacheException("State provider node " + ndem.getCacheNodeIdentity()
+ " threw exception during loadState", ndem.getCause());
}
- return nd;
}
+ return list;
+ }
+
+// private NodeData readNode(ObjectInputStream in) throws IOException, ClassNotFoundException
+// {
+// NodeData nd = (NodeData) in.readObject();
+// if (nd != null && nd.isExceptionMarker())
+// {
+// NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd;
+// throw new CacheException("State provider node " + ndem.getCacheNodeIdentity()
+// + " threw exception during loadState", ndem.getCause());
+// }
+// return nd;
+// }
- private NodeData integrateStateTransferChildren(NodeSPI parent, int offset, ObjectInputStream in)
+ private NodeData integrateStateTransferChildren(NodeSPI parent, int offset, Iterator<NodeData> nodeDataIterator)
throws IOException, ClassNotFoundException
{
int parent_level = parent.getFqn().size();
@@ -269,7 +296,7 @@
Fqn fqn;
int size;
Object name;
- NodeData nd = readNode(in);
+ NodeData nd = nodeDataIterator.hasNext() ? nodeDataIterator.next() : null;
while (nd != null && !nd.isMarker())
{
fqn = nd.getFqn();
@@ -308,7 +335,7 @@
// Recursively call, which will walk down the tree
// and return the next NodeData that's a child of our parent
- nd = integrateStateTransferChildren(target, offset, in);
+ nd = integrateStateTransferChildren(target, offset, nodeDataIterator);
}
return null;
}
1.6 +1 -1 JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -b -r1.5 -r1.6
--- StateTransferIntegrator.java 14 Dec 2006 17:18:48 -0000 1.5
+++ StateTransferIntegrator.java 17 Jan 2007 14:13:06 -0000 1.6
@@ -13,6 +13,6 @@
public interface StateTransferIntegrator
{
- void integrateState(ObjectInputStream ois, Node target, ClassLoader cl) throws Exception;
+ void integrateState(ObjectInputStream ois, Node target) throws Exception;
}
\ No newline at end of file
1.24 +19 -20 JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -b -r1.23 -r1.24
--- StateTransferManager.java 15 Jan 2007 18:10:56 -0000 1.23
+++ StateTransferManager.java 17 Jan 2007 14:13:06 -0000 1.24
@@ -29,9 +29,9 @@
{
protected final static Log log = LogFactory.getLog(StateTransferManager.class);
- public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker();
+ public static final NodeData STREAMING_DELIMITER_NODE = new NodeDataMarker();
- public static final String PARTIAL_STATE_DELIMETER = "_PARTIAL_STATE_DELIMETER";
+ public static final String PARTIAL_STATE_DELIMITER = "_PARTIAL_STATE_DELIMITER";
private final CacheImpl cache;
@@ -51,6 +51,7 @@
* <p/>
* <p/>
*
+ * @param out stream to write state to
* @param fqn Fqn indicating the uppermost node in the
* portion of the tree whose state should be returned.
* @param timeout max number of ms this method should wait to acquire
@@ -61,8 +62,7 @@
* on the nodes be rolled back? <strong>NOTE:</strong>
* In release 1.2.4, this parameter has no effect.
* @param suppressErrors should any Throwable thrown be suppressed?
- * @return a serialized byte[][], element 0 is the transient state
- * (or null), and element 1 is the persistent state (or null).
+ * @throws Throwable in event of error
*/
public void getState(ObjectOutputStream out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
{
@@ -77,7 +77,7 @@
if (canProvideState && (fetchPersistentState || fetchTransientState))
{
- out.writeBoolean(true);
+ cache.getMarshaller().objectToObjectStream(true, out);
StateTransferGenerator generator = getStateTransferGenerator();
Object owner = getOwnerForLock();
long startTime = System.currentTimeMillis();
@@ -97,7 +97,7 @@
}
else
{
- out.writeBoolean(false);
+ cache.getMarshaller().objectToObjectStream(false, out);
Exception e = null;
if (!canProvideState)
{
@@ -117,7 +117,7 @@
{
e = new CacheException("Cache instance at " + cache.getLocalAddress() + " is not configured to provide state");
}
- out.writeObject(e);
+ cache.getMarshaller().objectToObjectStream(e, out);
throw e;
}
}
@@ -152,15 +152,15 @@
* <strong>NOTE:</strong> This method performs no locking of nodes; it
* is up to the caller to lock <code>targetRoot</code> before calling
* this method.
+ * <p/>
+ * This method will use any {@link ClassLoader} needed as defined by the active {@link org.jboss.cache.Region}
+ * in the {@link org.jboss.cache.RegionManager}, pertaining to the targetRoot passed in.
*
- * @param in a serialized byte[][] array where element 0 is the
- * transient state (or null) , and element 1 is the
- * persistent state (or null)
+ * @param in an input stream containing the state
* @param targetRoot fqn of the node into which the state should be integrated
- * @param cl classloader to use to unmarshal the state, or
- * <code>null</code> if the TCCL should be used
+ * @throws Exception In event of error
*/
- public void setState(ObjectInputStream in, Fqn targetRoot, ClassLoader cl) throws Exception
+ public void setState(ObjectInputStream in, Fqn targetRoot) throws Exception
{
CacheImpl cache = getTreeCache();
NodeSPI target = cache.findNode(targetRoot);
@@ -171,15 +171,16 @@
cache.put(targetRoot, null);
target = cache.findNode(targetRoot);
}
- boolean hasState = in.readBoolean();
+ Object o = cache.getMarshaller().objectFromObjectStream(in);
+ Boolean hasState = (Boolean) o;
if (hasState)
{
- setState(in, target, cl);
+ setState(in, target);
}
else
{
throw new CacheException("Cache instance at " + cache.getLocalAddress()
- + " cannot integrate state since state provider could not provide state due to " + in.readObject());
+ + " cannot integrate state since state provider could not provide state due to " + cache.getMarshaller().objectFromObjectStream(in));
}
}
@@ -196,10 +197,8 @@
* transient state (or null) , and element 1 is the
* persistent state (or null)
* @param targetRoot node into which the state should be integrated
- * @param cl classloader to use to unmarshal the state, or
- * <code>null</code> if the TCCL should be used
*/
- private void setState(ObjectInputStream state, NodeSPI targetRoot, ClassLoader cl) throws Exception
+ private void setState(ObjectInputStream state, NodeSPI targetRoot) throws Exception
{
Object owner = getOwnerForLock();
long timeout = cache.getConfiguration().getInitialStateRetrievalTimeout();
@@ -225,7 +224,7 @@
StateTransferIntegrator integrator = getStateTransferIntegrator(state, targetRoot.getFqn());
log.info("starting state integration at node " + targetRoot);
- integrator.integrateState(state, targetRoot, cl);
+ integrator.integrateState(state, targetRoot);
log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
}
finally
1.14 +2 -2 JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferFactory.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -b -r1.13 -r1.14
--- StateTransferFactory.java 30 Dec 2006 17:49:57 -0000 1.13
+++ StateTransferFactory.java 17 Jan 2007 14:13:06 -0000 1.14
@@ -18,7 +18,7 @@
* {@link StateTransferIntegrator} instances.
*
* @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
- * @version $Revision: 1.13 $
+ * @version $Revision: 1.14 $
*/
public abstract class StateTransferFactory
{
@@ -50,7 +50,7 @@
short version = 0;
try
{
- version = in.readShort();
+ version = (Short) cache.getMarshaller().objectFromObjectStream(in);
}
catch (IOException io)
{
1.16 +32 -11 JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DefaultStateTransferGenerator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -b -r1.15 -r1.16
--- DefaultStateTransferGenerator.java 15 Jan 2007 16:19:09 -0000 1.15
+++ DefaultStateTransferGenerator.java 17 Jan 2007 14:13:06 -0000 1.16
@@ -17,7 +17,10 @@
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.marshall.NodeDataExceptionMarker;
+import java.io.IOException;
import java.io.ObjectOutputStream;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,7 +47,7 @@
Fqn fqn = rootNode.getFqn();
try
{
- out.writeShort(STATE_TRANSFER_VERSION);
+ cache.getMarshaller().objectToObjectStream(STATE_TRANSFER_VERSION, out);
if (generateTransient)
{
//transient + marker
@@ -52,9 +55,8 @@
{
log.trace("writing transient state for " + fqn);
}
-
marshallTransientState((NodeSPI) rootNode, out);
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
+ delimitStream(out);
if (log.isTraceEnabled())
{
@@ -68,7 +70,7 @@
}
marshallAssociatedState(fqn, out);
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
+ delimitStream(out);
if (log.isTraceEnabled())
{
@@ -79,8 +81,8 @@
else
{
//we have to write two markers for transient and associated
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
+ delimitStream(out);
+ delimitStream(out);
}
CacheLoader cacheLoader = cache.getCacheLoaderManager() == null ? null : cache.getCacheLoaderManager().getCacheLoader();
@@ -105,16 +107,27 @@
log.trace("persistent state succesfully written");
}
}
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
+ delimitStream(out);
}
catch (Throwable t)
{
- out.writeObject(new NodeDataExceptionMarker(t, cache.getLocalAddress()));
+ cache.getMarshaller().objectToObjectStream(new NodeDataExceptionMarker(t, cache.getLocalAddress()), out);
throw t;
}
}
/**
+ * Places a delimiter marker on the stream
+ *
+ * @param out stream
+ * @throws Exception if the delimiter marker cannot be written to the stream
+ */
+ protected void delimitStream(ObjectOutputStream out) throws Exception
+ {
+ cache.getMarshaller().objectToObjectStream(StateTransferManager.STREAMING_DELIMITER_NODE, out);
+ }
+
+ /**
* Do a preorder traversal: visit the node first, then the node's children
*
* @param out
@@ -122,13 +135,19 @@
*/
protected void marshallTransientState(NodeSPI node, ObjectOutputStream out) throws Exception
{
+ List<NodeData> nodeData = new LinkedList<NodeData>();
+ generateNodeDataList(node, nodeData);
+ cache.getMarshaller().objectToObjectStream(nodeData, out, node.getFqn());
+ }
+ protected void generateNodeDataList(NodeSPI node, List<NodeData> list) throws Exception
+ {
if (internalFqns.contains(node.getFqn()))
{
return;
}
- Map attrs;
+ Map<Object, Object> attrs;
NodeData nd;
// first handle the current node
@@ -141,12 +160,14 @@
{
nd = new NodeData(node.getFqn(), attrs);
}
- out.writeObject(nd);
+
+ list.add(nd);
// then visit the children
for (NodeSPI child : node.getChildrenDirect())
{
- marshallTransientState(child, out);
+ //marshallTransientState(child, out);
+ generateNodeDataList(child, list);
}
}
More information about the jboss-cvs-commits
mailing list