[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Vladmir Blagojevic
vladimir.blagojevic at jboss.com
Thu Aug 24 18:05:34 EDT 2006
User: vblagojevic
Date: 06/08/24 18:05:34
Modified: src/org/jboss/cache/statetransfer
StateTransferFactory.java StateTransferManager.java
Added: src/org/jboss/cache/statetransfer
StreamingStateTransferGenerator_200.java
StreamingStateTransferIntegrator_200.java
Log:
streaming state transfer integration (work in progress)
Revision Changes Path
1.9 +39 -1 JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferFactory.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -b -r1.8 -r1.9
--- StateTransferFactory.java 20 Jul 2006 21:58:21 -0000 1.8
+++ StateTransferFactory.java 24 Aug 2006 22:05:34 -0000 1.9
@@ -13,13 +13,15 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
/**
* Factory class able to create {@link StateTransferGenerator} and
* {@link StateTransferIntegrator} instances.
*
* @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public abstract class StateTransferFactory
{
@@ -47,6 +49,18 @@
return new StateTransferGenerator_200(cache); // current default
}
+ public static StateTransferGenerator getStateTransferGenerator(OutputStream os, TreeCache cache)
+ {
+ short version = cache.getConfiguration().getReplicationVersion();
+
+ // Compiler won't let me use a switch
+
+ if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
+ throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
+ else
+ return new StreamingStateTransferGenerator_200(os,cache); // current default
+ }
+
/**
* Gets a StateTransferIntegrator able to handle the given state.
*
@@ -93,4 +107,28 @@
catch (IOException io) {}
}
}
+
+ public static StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn, TreeCache treeCache)
+ throws Exception
+ {
+ short version = 0;
+ MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
+ try
+ {
+ version = in.readShort();
+ }
+ catch (IOException io)
+ {
+ // No short at the head of the stream means version 123
+ throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
+ }
+
+ // Compiler won't let me use a switch
+
+ if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
+ throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
+ else
+ return new StreamingStateTransferIntegrator_200(in, fqn, treeCache); // current default
+ }
+
}
1.4 +59 -13 JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -b -r1.3 -r1.4
--- StateTransferManager.java 16 Aug 2006 10:52:50 -0000 1.3
+++ StateTransferManager.java 24 Aug 2006 22:05:34 -0000 1.4
@@ -1,5 +1,7 @@
package org.jboss.cache.statetransfer;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -13,6 +15,7 @@
import org.jboss.cache.TreeCache;
import org.jboss.cache.config.Option;
import org.jboss.cache.loader.CacheLoaderManager;
+import org.jboss.cache.loader.NodeData;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
@@ -23,6 +26,8 @@
{
protected final static Log log = LogFactory.getLog(StateTransferManager.class);
+ public static final NodeData STREAMING_DELIMETER_NODE = new NodeData(Fqn.fromString("STREAMING_DELIMETER_NODE"),null);
+
private TreeCache treeCache;
private long[] loadStateTimeouts = { 400, 800, 1200 };
@@ -73,8 +78,9 @@
* enabled, the requested Fqn is not the root node, and the
* cache loader does not implement {@link ExtendedCacheLoader}.
*/
- public byte[] getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
+ public byte[] getState(OutputStream os,Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
{
+ boolean usingStreamingStateTransfer = os!=null;
TreeCache cache = getTreeCache();
VersionAwareMarshaller marshaller_ = null;
@@ -120,7 +126,15 @@
acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
}
- StateTransferGenerator generator = getStateTransferGenerator();
+ StateTransferGenerator generator = null;
+ if (usingStreamingStateTransfer)
+ {
+ generator = getStateTransferGenerator(os);
+ }
+ else
+ {
+ generator = getStateTransferGenerator();
+ }
return generator.generateStateTransfer(rootNode,
fetchTransientState,
@@ -133,6 +147,11 @@
}
}
+ public byte[] getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
+ {
+ return getState(null,fqn,timeout,force,suppressErrors);
+ }
+
/**
* Requests state from each of the given source nodes in the cluster
* until it gets it or no node replies with a timeout exception. If state
@@ -267,7 +286,7 @@
* @param cl classloader to use to unmarshal the state, or
* <code>null</code> if the TCCL should be used
*/
- public void setState(byte[] new_state, Fqn targetRoot, ClassLoader cl)
+ public void setState(Object state, Fqn targetRoot, ClassLoader cl)
throws Exception
{
TreeCache cache = getTreeCache();
@@ -281,7 +300,7 @@
target = cache.findNode(targetRoot);
}
- setState(new_state, target, cl);
+ setState(state, target, cl);
}
/**
@@ -300,16 +319,26 @@
* @param cl classloader to use to unmarshal the state, or
* <code>null</code> if the TCCL should be used
*/
- private void setState(byte[] new_state, DataNode targetRoot, ClassLoader cl)
+ private void setState(Object state, DataNode targetRoot, ClassLoader cl)
throws Exception
{
- if (new_state == null)
+ if (state == null)
{
log.info("new_state is null (may be first member in cluster)");
return;
}
+ byte [] new_state = null;
+ InputStream istream = null;
+ if(state instanceof byte[])
+ {
+ new_state = (byte[]) state;
log.info("received the state (size=" + new_state.length + " bytes)");
+ }
+ else
+ {
+ istream = (InputStream) state;
+ }
Object owner = getOwnerForLock();
try
@@ -319,11 +348,18 @@
getTreeCache().getConfiguration().getInitialStateRetrievalTimeout(),
true, true);
- // 1. Unserialize the states into transient and persistent state
- StateTransferIntegrator integrator = getStateTransferIntegrator(new_state,
+ StateTransferIntegrator integrator =null;
+ if(new_state!=null)
+ {
+ integrator = getStateTransferIntegrator(new_state,targetRoot.getFqn());
+ }
+ else
+ {
+ integrator = getStateTransferIntegrator(istream,
targetRoot.getFqn());
+ }
- // 2. If transient state is available, integrate it
+ // If transient state is available, integrate it
try
{
integrator.integrateTransientState(targetRoot, cl);
@@ -334,7 +370,7 @@
log.error("failed setting transient state", t);
}
- // 3. Store any persistent state
+ // Store any persistent state
integrator.integratePersistentState();
}
finally
@@ -407,12 +443,22 @@
return StateTransferFactory.getStateTransferGenerator(getTreeCache());
}
+ protected StateTransferGenerator getStateTransferGenerator(OutputStream os)
+ {
+ return StateTransferFactory.getStateTransferGenerator(os,getTreeCache());
+ }
+
protected StateTransferIntegrator getStateTransferIntegrator(byte[] state, Fqn targetFqn)
throws Exception
{
return StateTransferFactory.getStateTransferIntegrator(state, targetFqn, getTreeCache());
}
+ private StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn) throws Exception
+ {
+ return StateTransferFactory.getStateTransferIntegrator(istream, fqn, getTreeCache());
+ }
+
/**
* Generates NodeAdded notifications for all nodes of the tree. This is
* called whenever the tree is initially retrieved (state transfer)
1.1 date: 2006/08/24 22:05:34; author: vblagojevic; state: Exp;JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java
Index: StreamingStateTransferGenerator_200.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.statetransfer;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.Version;
import org.jboss.cache.loader.NodeData;
import org.jboss.invocation.MarshalledValueOutputStream;
/**
 * {@link StateTransferGenerator} that marshalls cache state straight onto a
 * caller-supplied {@link OutputStream} instead of building a byte[].
 * <p>
 * The stream layout produced by {@link #generateStateTransfer} is:
 * <ol>
 * <li>the {@link #STATE_TRANSFER_VERSION} short,</li>
 * <li>zero or more {@link NodeData} objects in preorder (the transient state),</li>
 * <li>{@link StateTransferManager#STREAMING_DELIMETER_NODE}, which marks the
 * end of the transient state.</li>
 * </ol>
 * Streaming of the persistent state is not implemented yet.
 */
public class StreamingStateTransferGenerator_200 implements StateTransferGenerator
{
   /** Version short written at the head of the stream. */
   public static final short STATE_TRANSFER_VERSION = Version.getVersionShort("2.0.0.GA");

   private Log log = LogFactory.getLog(getClass().getName());

   private TreeCache cache;

   /** Fqns of internal nodes; these subtrees are never written to the stream. */
   private Set internalFqns;

   /** Destination of the generated state. */
   private OutputStream os;

   /**
    * Creates a generator that writes to the given stream.
    *
    * @param os    stream the state is written to
    * @param cache cache whose internal Fqns are excluded from the transfer
    */
   protected StreamingStateTransferGenerator_200(OutputStream os, TreeCache cache)
   {
      this.cache = cache;
      this.os = os;
      this.internalFqns = cache.getInternalFqns();
   }

   /**
    * Writes the state rooted at <code>rootNode</code> to the output stream
    * given at construction time and closes the marshalling stream wrapped
    * around it.
    *
    * @param rootNode           root of the subtree to transfer
    * @param generateTransient  whether the in-memory state should be written
    * @param generatePersistent whether the persistent state should be written
    *                           (currently a no-op, see the TODO below)
    * @param suppressErrors     if <code>true</code>, a failure while writing
    *                           the transient state is logged and not rethrown
    * @return always <code>null</code>; the state goes to the stream, not to a
    *         byte[]
    * @throws Throwable the first failure encountered. A secondary failure
    *                   while writing the delimiter or closing the stream is
    *                   logged rather than allowed to replace it.
    */
   public byte[] generateStateTransfer(DataNode rootNode, boolean generateTransient, boolean generatePersistent,
                                       boolean suppressErrors) throws Throwable
   {
      Fqn fqn = rootNode.getFqn();
      MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
      Throwable failure = null;
      try
      {
         out.writeShort(STATE_TRANSFER_VERSION);
         writeTransientState(rootNode, fqn, out, generateTransient, suppressErrors);

         if (generatePersistent)
         {
            // TODO streaming of the persistent state is not implemented yet
         }
      }
      catch (Throwable t)
      {
         // Remember the root cause so that a failing close() cannot mask it
         failure = t;
         throw t;
      }
      finally
      {
         try
         {
            out.close();
         }
         catch (Throwable closeFailure)
         {
            if (failure == null)
               throw closeFailure;
            log.warn("failed closing the state transfer stream", closeFailure);
         }
      }
      return null;
   }

   /**
    * Writes the transient state (if requested) followed by the delimiter
    * node. The delimiter is always attempted, even after a failure, exactly
    * as the receiving side expects; a failure of the delimiter write is only
    * propagated when there is no earlier failure to report.
    */
   private void writeTransientState(DataNode rootNode, Fqn fqn, ObjectOutputStream out,
                                    boolean generateTransient, boolean suppressErrors) throws Throwable
   {
      Throwable failure = null;
      try
      {
         if (generateTransient)
         {
            marshallTransientState(rootNode, out);
            log.debug("generated the in-memory state");

            // Return any state associated with the subtree but not stored in it.
            // NOTE(review): the hook receives the raw stream, not the
            // marshalling stream wrapped around it -- confirm this is intended.
            marshallAssociatedState(fqn, os);
            log.debug("returning the associated state bytes)");
         }
      }
      catch (Throwable t)
      {
         log.error("failed getting the in-memory (transient) state", t);
         if (!suppressErrors)
            failure = t;
      }

      try
      {
         log.debug("writing delimeter after transient state");
         out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
      }
      catch (Throwable t)
      {
         if (failure == null)
            throw t;
         // Keep the original failure; just record the secondary one
         log.error("failed writing delimeter after transient state", t);
      }

      if (failure != null)
         throw failure;
   }

   /**
    * Do a preorder traversal: visit the node first, then the node's children.
    * Nodes whose Fqn is one of the cache's internal Fqns are skipped together
    * with their subtree.
    *
    * @param node node to write, followed by its descendants
    * @param out  stream the {@link NodeData} objects are written to
    * @throws Exception if writing to the stream fails
    */
   protected void marshallTransientState(DataNode node, ObjectOutputStream out) throws Exception
   {
      if (internalFqns.contains(node.getFqn()))
         return;

      // first handle the current node
      Map attrs = node.getData();
      NodeData nd;
      if (attrs == null || attrs.size() == 0)
         nd = new NodeData(node.getFqn());
      else
         nd = new NodeData(node.getFqn(), attrs);
      out.writeObject(nd);

      // then visit the children
      Map children = node.getChildren();
      if (children == null)
         return;

      for (Iterator it = children.values().iterator(); it.hasNext();)
      {
         marshallTransientState((DataNode) it.next(), out);
      }
   }

   /**
    * Does nothing in this base class; can be overridden in a subclass.
    *
    * @param fqn  Fqn of the root of the subtree being transferred
    * @param baos stream on which a subclass may write associated state
    * @throws Exception declared for the benefit of overriding subclasses
    */
   protected void marshallAssociatedState(Fqn fqn, OutputStream baos) throws Exception
   {
      // no-op in this base class
   }

   /**
    * @return the cache this generator was created for
    */
   protected TreeCache getTreeCache()
   {
      return cache;
   }
}
1.1 date: 2006/08/24 22:05:34; author: vblagojevic; state: Exp;JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java
Index: StreamingStateTransferIntegrator_200.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.statetransfer;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.factories.NodeFactory;
import org.jboss.cache.loader.NodeData;
/**
 * {@link StateTransferIntegrator} that reads cache state from an
 * {@link ObjectInputStream} rather than from a byte[].
 * <p>
 * The stream is expected to contain a sequence of {@link NodeData} objects in
 * preorder, terminated by a node whose Fqn equals that of
 * {@link StateTransferManager#STREAMING_DELIMETER_NODE}. Integration of the
 * persistent state is not implemented yet.
 */
public class StreamingStateTransferIntegrator_200 implements StateTransferIntegrator
{
   /**
    * Number of bytes of meta-information at the beginning of a state
    * transfer byte[] (6 for stream header, 2 for version short,
    * 3 * 4 for lengths of the state components). Nothing in this class reads
    * it; it is kept so that existing references keep compiling.
    */
   protected static final int HEADER_LENGTH = 6 + 2 + 4 + 4 + 4;// + 2;

   protected Log log = LogFactory.getLog(getClass().getName());

   private TreeCache cache;
   private Fqn targetFqn;

   /** Set once the transient state has been integrated without an exception. */
   private boolean transientSet;
   private NodeFactory factory;
   private byte nodeType;

   /** Fqns of internal nodes that must survive the integration. */
   private Set internalFqns;

   /** Stream the state is read from; closed by {@link #integratePersistentState()}. */
   ObjectInputStream in; //used in streaming state transfer

   /**
    * Creates an integrator reading from the given stream.
    *
    * @param inputStream stream positioned at the first {@link NodeData}
    * @param targetFqn   Fqn the state is being integrated under
    * @param cache       cache that receives the state
    */
   public StreamingStateTransferIntegrator_200(ObjectInputStream inputStream, Fqn targetFqn, TreeCache cache)
   {
      this.targetFqn = targetFqn;
      this.cache = cache;
      this.factory = NodeFactory.getInstance();
      this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
            ? NodeFactory.NODE_TYPE_OPTIMISTIC_NODE
            : NodeFactory.NODE_TYPE_TREENODE;
      this.internalFqns = cache.getInternalFqns();
      in = inputStream;
   }

   /**
    * Reads the transient state from the stream and integrates it under
    * <code>target</code>. If integration fails, the target's data and
    * children are cleared before the exception propagates.
    *
    * @param target node the state is integrated into
    * @param cl     classloader to install as the thread context classloader
    *               while reading, or <code>null</code> to leave it untouched
    * @throws Exception if reading or integrating the state fails
    */
   public void integrateTransientState(DataNode target, ClassLoader cl) throws Exception
   {
      ClassLoader oldCL = setClassLoader(cl);
      try
      {
         if (log.isTraceEnabled())
            log.trace("integrating transient state for " + target);

         integrateTransientState(target, in);
         transientSet = true;

         if (log.isTraceEnabled())
            log.trace("transient state successfully integrated for " + targetFqn);

         integrateAssociatedState();
      }
      finally
      {
         if (!transientSet)
         {
            // Clear any existing state from the targetRoot
            target.clear();
            target.removeAllChildren();
         }
         resetClassLoader(oldCL);
      }
   }

   /**
    * Provided for subclasses that deal with associated state.
    *
    * @throws Exception declared for the benefit of overriding subclasses
    */
   protected void integrateAssociatedState() throws Exception
   {
      // no-op in this base class
   }

   /**
    * Placeholder for persistent state integration; currently it only closes
    * the input stream.
    *
    * @throws Exception if closing the stream fails
    */
   public void integratePersistentState() throws Exception
   {
      try
      {
         //TODO
      }
      finally
      {
         in.close();
      }
   }

   /**
    * Installs <code>newLoader</code> as the thread context classloader.
    *
    * @return the previous context classloader, or <code>null</code> if
    *         <code>newLoader</code> was <code>null</code> and nothing changed
    */
   private ClassLoader setClassLoader(ClassLoader newLoader)
   {
      if (newLoader == null)
         return null;

      Thread current = Thread.currentThread();
      ClassLoader previous = current.getContextClassLoader();
      current.setContextClassLoader(newLoader);
      return previous;
   }

   /**
    * Restores the context classloader saved by {@link #setClassLoader};
    * does nothing when <code>oldLoader</code> is <code>null</code>.
    */
   private void resetClassLoader(ClassLoader oldLoader)
   {
      if (oldLoader != null)
         Thread.currentThread().setContextClassLoader(oldLoader);
   }

   /**
    * @return <code>true</code> if <code>nd</code> carries the Fqn of the
    *         delimiter node that terminates the transient state
    */
   private boolean isDelimiter(NodeData nd)
   {
      return StateTransferManager.STREAMING_DELIMETER_NODE.getFqn().equals(nd.getFqn());
   }

   /**
    * Replaces the children of <code>target</code> with the tree read from
    * the stream, then re-attaches the internal nodes that were under
    * <code>target</code> before the integration started.
    */
   private void integrateTransientState(DataNode target, ObjectInputStream in) throws IOException,
         ClassNotFoundException
   {
      Set retainedNodes = retainInternalNodes(target);
      target.removeAllChildren();

      // Read the first NodeData and integrate into our target
      NodeData nd = (NodeData) in.readObject();

      //are there any transient nodes at all?
      if (!isDelimiter(nd))
      {
         Map attrs = nd.getAttributes();
         if (attrs != null)
            target.put(attrs, true);
         else
            target.clear();

         // Check whether this is an integration into the buddy backup
         // subtree
         Fqn tferFqn = nd.getFqn();
         Fqn tgtFqn = target.getFqn();
         boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
               && !tferFqn.isChildOrEquals(tgtFqn);
         // If it is an integration, calculate how many levels of offset
         int offset = move ? tgtFqn.size() - tferFqn.size() : 0;

         integrateStateTransferChildren(target, offset, in);
      }

      // The internal nodes were detached by removeAllChildren() above, so
      // they have to be put back even when the sender shipped no nodes at all.
      integrateRetainedNodes(target, retainedNodes);
   }

   /**
    * Reads NodeData objects from the stream and attaches those that are
    * direct children of <code>parent</code>, recursing for deeper levels.
    *
    * @return the first NodeData read that belongs at or above the level of
    *         <code>parent</code> (to be handled by a caller further up), or
    *         <code>null</code> once the delimiter or a null object was read
    * @throws IllegalStateException if a NodeData is more than one level
    *                               below <code>parent</code>
    */
   private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
         throws IOException, ClassNotFoundException
   {
      int parent_level = parent.getFqn().size();
      int target_level = parent_level + 1;

      NodeData nd = (NodeData) in.readObject();
      while (nd != null && !isDelimiter(nd))
      {
         Fqn fqn = nd.getFqn();
         // If we need to integrate into the buddy backup subtree,
         // change the Fqn to fit under it
         if (offset > 0)
            fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);

         int size = fqn.size();
         if (size <= parent_level)
            return nd;
         if (size > target_level)
            throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());

         Object name = fqn.get(size - 1);

         // We handle this NodeData. Create a DataNode and
         // integrate its data
         DataNode target = factory.createDataNode(nodeType, name, fqn, parent, nd.getAttributes(), true, cache);
         parent.addChild(name, target);

         // Recursively call, which will walk down the tree
         // and return the next NodeData that's a child of our parent
         nd = integrateStateTransferChildren(target, offset, in);
      }
      return null;
   }

   /**
    * Collects the existing internal nodes located below <code>target</code>
    * so they can be re-attached after the target's children are replaced.
    */
   private Set retainInternalNodes(DataNode target)
   {
      Set result = new HashSet();
      Fqn rootFqn = target.getFqn();
      for (Iterator it = internalFqns.iterator(); it.hasNext();)
      {
         Fqn internalFqn = (Fqn) it.next();
         if (internalFqn.isChildOf(rootFqn))
         {
            DataNode internalNode = getInternalNode(target, internalFqn);
            if (internalNode != null)
               result.add(internalNode);
         }
      }
      return result;
   }

   /**
    * Walks down from <code>parent</code> to the node named by
    * <code>internalFqn</code>.
    *
    * @return the node, or <code>null</code> if any level on the way is missing
    */
   private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
   {
      Object name = internalFqn.get(parent.getFqn().size());
      DataNode result = (DataNode) parent.getChild(name);
      // The child found is one level below parent; keep descending while the
      // requested Fqn is deeper still. (The comparison used to be inverted,
      // so an intermediate node was returned for nested internal Fqns.)
      if (result != null && internalFqn.size() > result.getFqn().size())
      {
         // need to recursively walk down the tree
         result = getInternalNode(result, internalFqn);
      }
      return result;
   }

   /**
    * Re-attaches every retained node that lies below <code>root</code>.
    */
   private void integrateRetainedNodes(DataNode root, Set retainedNodes)
   {
      Fqn rootFqn = root.getFqn();
      for (Iterator it = retainedNodes.iterator(); it.hasNext();)
      {
         DataNode retained = (DataNode) it.next();
         if (retained.getFqn().isChildOf(rootFqn))
         {
            integrateRetainedNode(root, retained);
         }
      }
   }

   /**
    * Attaches <code>descendant</code> at its original position below
    * <code>ancestor</code>, creating empty intermediate nodes where needed.
    * If the transferred state already contains a node at that position the
    * retained node is dropped and a warning is logged.
    */
   private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
   {
      Fqn descFqn = descendant.getFqn();
      Fqn ancFqn = ancestor.getFqn();
      Object name = descFqn.get(ancFqn.size());
      DataNode child = (DataNode) ancestor.getChild(name);

      // descendant is a direct child of ancestor when it is exactly one
      // level deeper. (The operands used to be swapped, which made this
      // branch unreachable and sent the recursion past the end of descFqn.)
      if (descFqn.size() == ancFqn.size() + 1)
      {
         if (child == null)
         {
            ancestor.addChild(name, descendant);
         }
         else
         {
            log.warn("Received unexpected internal node " + descFqn + " in transferred state");
         }
      }
      else
      {
         if (child == null)
         {
            // Missing level -- have to create empty node
            // This shouldn't really happen -- internal fqns should
            // be immediately under the root
            child = factory.createDataNode(nodeType, name, new Fqn(ancFqn, name), ancestor, null, true, cache);
            ancestor.addChild(name, child);
         }

         // Keep walking down the tree
         integrateRetainedNode(child, descendant);
      }
   }

   /**
    * @return the cache receiving the state
    */
   protected TreeCache getCache()
   {
      return cache;
   }

   /**
    * @return the factory used to create the integrated nodes
    */
   protected NodeFactory getFactory()
   {
      return factory;
   }

   /**
    * @return the {@link NodeFactory} node type used for created nodes
    */
   protected byte getNodeType()
   {
      return nodeType;
   }

   /**
    * @return the Fqn the state is being integrated under
    */
   protected Fqn getTargetFqn()
   {
      return targetFqn;
   }
}
More information about the jboss-cvs-commits
mailing list