[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Brian Stansberry
brian.stansberry at jboss.com
Tue Jul 18 17:19:35 EDT 2006
User: bstansberry
Date: 06/07/18 17:19:35
Modified: src/org/jboss/cache/statetransfer
StateTransferFactory.java
Added: src/org/jboss/cache/statetransfer
StateTransferGenerator_200.java
StateTransferIntegrator_200.java
Removed: src/org/jboss/cache/statetransfer
StateTransferGenerator_123.java
StateTransferGenerator_1241.java
StateTransferIntegrator_140.java
StateTransferGenerator_140.java
StateTransferIntegrator_123.java
StateTransferGenerator_124.java
StateTransferIntegrator_124.java
StateTransferIntegrator_1241.java
Log:
[JBCACHE-705] Get rid of unneeded state transfer interop code
Revision Changes Path
1.6 +12 -27 JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferFactory.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -b -r1.5 -r1.6
--- StateTransferFactory.java 22 Apr 2006 05:08:13 -0000 1.5
+++ StateTransferFactory.java 18 Jul 2006 21:19:35 -0000 1.6
@@ -16,10 +16,7 @@
public abstract class StateTransferFactory
{
- private static final short RV_123 = Version.getVersionShort("1.2.3");
- private static final short RV_124 = Version.getVersionShort("1.2.4");
- private static final short RV_124SP1 = Version.getVersionShort("1.2.4.SP1");
- private static final short RV_140 = Version.getVersionShort("1.4.0");
+ private static final short RV_200 = Version.getVersionShort("2.0.0");
/**
* Gets the StateTransferGenerator able to handle the given cache instance.
@@ -27,6 +24,8 @@
* @param cache the cache
*
* @return the {@link StateTransferGenerator}
+ *
+ * @throws IllegalStateException if the cache's ReplicationVersion is < 2.0.0
*/
public static StateTransferGenerator
getStateTransferGenerator(TreeCache cache)
@@ -35,18 +34,10 @@
// Compiler won't let me use a switch
- // Test 1.2.4 and 1.2.4.SP1 first as these are actually lower numbers
- // than 1.2.3 since their shorts used a different algorithm
- if (version == RV_124)
- return new StateTransferGenerator_124(cache);
- else if (version == RV_124SP1)
- return new StateTransferGenerator_1241(cache);
- else if (version <= RV_123 && version > 0) // <= 0 is actually a version > 15.31.63
- return new StateTransferGenerator_123(cache);
- else if (version < RV_140 && version > 0) // <= 0 is actually a version > 15.31.63
- return new StateTransferGenerator_1241(cache);
+ if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
+ throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
else
- return new StateTransferGenerator_140(cache); // current default
+ return new StateTransferGenerator_200(cache); // current default
}
/**
@@ -56,6 +47,8 @@
* @param targetFqn Fqn of the node to which the state will be bound
* @param cache cache in which the state will be stored
* @return the {@link StateTransferIntegrator}.
+ *
+ * @throws IllegalStateException if the cache's ReplicationVersion is < 2.0.0
* @throws Exception
*/
public static StateTransferIntegrator
@@ -75,23 +68,15 @@
catch (IOException io)
{
// No short at the head of the stream means version 123
- version = RV_123;
+ throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
}
// Compiler won't let me use a switch
- // Test 1.2.4 and 1.2.4.SP1 first as these are actually lower numbers
- // than 1.2.3 since their shorts used a different algorithm
- if (version == RV_124)
- return new StateTransferIntegrator_124(in, targetFqn, cache);
- else if (version == RV_124SP1)
- return new StateTransferIntegrator_1241(state, targetFqn, cache);
- else if (version <= RV_123 && version > 0) // <= 0 is actually a version > 15.31.63
- return new StateTransferIntegrator_123(state, targetFqn, cache);
- else if (version < RV_140 && version > 0) // <= 0 is actually a version > 15.31.63
- return new StateTransferIntegrator_1241(state, targetFqn, cache);
+ if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
+ throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
else
- return new StateTransferIntegrator_140(state, targetFqn, cache); // current default
+ return new StateTransferIntegrator_200(state, targetFqn, cache); // current default
}
finally {
1.1 date: 2006/07/18 21:19:35; author: bstansberry; state: Exp;JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java
Index: StateTransferGenerator_200.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.statetransfer;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.Version;
import org.jboss.cache.aop.InternalDelegate;
import org.jboss.cache.aop.PojoCache;
import org.jboss.cache.aop.util.ObjectUtil;
import org.jboss.cache.loader.ExtendedCacheLoader;
import org.jboss.cache.loader.NodeData;
import org.jboss.cache.util.ExposedByteArrayOutputStream;
import org.jboss.invocation.MarshalledValueOutputStream;
class StateTransferGenerator_200 implements StateTransferGenerator
{
/**
 * Version identifier written as the first short of every stream produced by
 * this generator (see initializeStateTransfer).
 */
public static final short STATE_TRANSFER_VERSION =
Version.getVersionShort("2.0.0.GA");
/** Logger named after the runtime class of this instance. */
private Log log = LogFactory.getLog(getClass().getName());
/** Cache whose state is generated; also the source of the cache loader. */
private TreeCache cache;
/**
 * Value of cache.getInternalFqns() captured by the constructor. Nodes whose
 * Fqn is contained in this set are skipped by marshallTransientState.
 */
private Set internalFqns;
/**
 * Creates a generator bound to the given cache.
 *
 * @param cache the cache whose state will be generated; its internal Fqns
 *              are read once here and kept for the life of the generator
 */
StateTransferGenerator_200(TreeCache cache)
{
   Set fqnsToSkip = cache.getInternalFqns();
   this.internalFqns = fqnsToSkip;
   this.cache = cache;
}
/**
 * Builds the state transfer byte[] for the subtree rooted at the given node.
 * <p>
 * The buffer starts with the header written by initializeStateTransfer (a
 * version short followed by three int placeholders). The transient state,
 * the associated state (only when the cache is a PojoCache) and the
 * persistent state are then appended in that order. Finally the three
 * placeholders are overwritten with the number of bytes written for each
 * section.
 *
 * @param rootNode           root of the subtree whose state is generated
 * @param generateTransient  whether to include the in-memory state (and,
 *                           for a PojoCache, the associated state)
 * @param generatePersistent whether to include the state obtained from the
 *                           cache loader
 * @param suppressErrors     if <code>true</code>, failures are logged and
 *                           not rethrown
 * @return the state transfer bytes, or <code>null</code> if a failure was
 *         suppressed while writing the header (initially or after a reset)
 *         or while patching in the section sizes
 * @throws Throwable any failure, when <code>suppressErrors</code> is
 *                   <code>false</code>
 */
public byte[] generateStateTransfer(DataNode rootNode,
boolean generateTransient,
boolean generatePersistent,
boolean suppressErrors)
throws Throwable
{
boolean debug = log.isDebugEnabled();
Fqn fqn = rootNode.getFqn();
// NOTE(review): 'states' is initialised here but never read afterwards.
byte[][] states=new byte[3][]; // [transient][associated][persistent]
states[0]=states[1]=states[2]=null;
// Byte counts of the three sections, in the order
// [transient][associated][persistent]
int[] sizes = new int[3];
byte[] retval = null;
// Size of baos after the previous section; used to compute section lengths
int lastSize;
MarshalledValueOutputStream out;
ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(1024);
// Write the version short and the three size placeholders
try {
initializeStateTransfer(baos);
lastSize = baos.size();
}
catch (Throwable t)
{
log.error("failed initialing state transfer byte[]", t);
if (!suppressErrors)
throw t;
return null;
}
try {
if(generateTransient) {
// Each section gets its own stream layered over the shared buffer
out = new MarshalledValueOutputStream(baos);
marshallTransientState(rootNode, out);
out.close();
sizes[0] = baos.size() - lastSize;
lastSize = baos.size();
if (debug) {
log.debug("generated the in-memory state (" + sizes[0] +
" bytes)");
}
// Return any state associated with the subtree but not stored in it
if (cache instanceof PojoCache) {
out = new MarshalledValueOutputStream(baos);
marshallAssociatedState(fqn, out);
out.close();
sizes[1] = baos.size() - lastSize;
lastSize = baos.size();
if (debug) {
log.debug("returning the associated state (" + sizes[1] +
" bytes)");
}
}
}
}
catch(Throwable t) {
log.error("failed getting the in-memory (transient) state", t);
if (!suppressErrors)
throw t;
// Reset the byte array and see if we can continue with persistent state
// TODO reconsider this -- why are errors suppressed at all?
sizes[0] = sizes[1] = 0;
baos.reset();
// The reset discarded the header as well, so it has to be written again
try {
initializeStateTransfer(baos);
}
catch (Throwable t1) {
log.error("failed re-initializing state transfer", t1);
return null;
}
}
if (generatePersistent) {
try {
if (debug)
log.debug("getting the persistent state");
byte[] persState = null;
// An empty Fqn (size 0) means the whole cache; any other Fqn requires the
// loader to be an ExtendedCacheLoader (the cast fails otherwise)
if (fqn.size() == 0)
persState = cache.getCacheLoader().loadEntireState();
else
persState = ((ExtendedCacheLoader)cache.getCacheLoader()).loadState(fqn);
if (persState != null) {
sizes[2] = persState.length;
baos.write(persState);
}
if (debug) {
log.debug("generated the persistent state (" + sizes[2] +
" bytes)");
}
}
catch(Throwable t) {
log.error("failed getting the persistent state", t);
if (!suppressErrors)
throw t;
sizes[2] = 0;
}
}
// Overwrite the placeholders used for the sizes of the state transfer
// components with the correct values
try {
// NOTE(review): the result is whatever getRawBuffer() returns. If that is
// the stream's backing array it may be longer than baos.size(), in which
// case the length logged below is the buffer capacity -- confirm against
// ExposedByteArrayOutputStream.
byte[] bytes = baos.getRawBuffer();
// The placeholders start at offset 8; the HEADER_LENGTH comment in
// StateTransferIntegrator_200 attributes the first 8 bytes to the stream
// header (6) and the version short (2).
overwriteInt(bytes, 8, sizes[0]);
overwriteInt(bytes, 12, sizes[1]);
overwriteInt(bytes, 16, sizes[2]);
retval = bytes;
log.info("returning the state for tree rooted in " + fqn.toString() +
"(" + retval.length + " bytes)");
return retval;
}
catch(Throwable t) {
log.error("failed serializing transient and persistent state", t);
if (!suppressErrors)
throw t;
return null;
}
}
/**
 * Writes the state transfer header to the given stream: the
 * STATE_TRANSFER_VERSION short followed by three zeroed ints that act as
 * placeholders for the section sizes patched in later.
 *
 * @param baos stream that receives the header
 * @throws IOException if writing to the stream fails
 */
private void initializeStateTransfer(OutputStream baos) throws IOException
{
   MarshalledValueOutputStream header = new MarshalledValueOutputStream(baos);
   header.writeShort(STATE_TRANSFER_VERSION);
   // One placeholder per section size we'll merge in later
   for (int section = 0; section < 3; section++)
   {
      header.writeInt(0);
   }
   header.close();
}
/**
 * Writes a subtree to the stream using a preorder traversal: a NodeData for
 * the node itself is written first, then each child is visited recursively.
 * A node whose Fqn is one of the cache's internal Fqns is skipped together
 * with everything below it.
 *
 * @param node start node
 * @param out  stream the NodeData objects are written to
 * @throws Exception if writing to the stream fails
 */
private void marshallTransientState(DataNode node,
                                    ObjectOutputStream out) throws Exception
{
   Fqn nodeFqn = node.getFqn();
   if (internalFqns.contains(nodeFqn))
   {
      return;
   }

   // The node itself goes first
   Map data = node.getData();
   boolean noData = (data == null || data.size() == 0);
   NodeData marshalled = noData ? new NodeData(nodeFqn)
                                : new NodeData(nodeFqn, data);
   out.writeObject(marshalled);

   // ... followed by its children, depth first
   Map kids = node.getChildren();
   if (kids != null)
   {
      Iterator walker = kids.entrySet().iterator();
      while (walker.hasNext())
      {
         Map.Entry kid = (Map.Entry) walker.next();
         marshallTransientState((DataNode) kid.getValue(), out);
      }
   }
}
/**
 * Writes the entries of the internal reference map that belong to the given
 * Fqn. For every child of the reference map node whose name starts with the
 * indirect form of the Fqn, an Object[] holding the child's name and the
 * value stored in the child under that name is written to the stream.
 * Nothing is written when the Fqn is null, is the root ("/"), or lies in
 * the internal area itself.
 *
 * @param fqn root of the subtree being transferred
 * @param out stream the name/value pairs are written to
 * @throws Exception if reading the cache or writing to the stream fails
 */
private void marshallAssociatedState(Fqn fqn, ObjectOutputStream out)
      throws Exception
{
   boolean nothingToDo = fqn == null
                         || fqn.size() == 0
                         || fqn.isChildOf(InternalDelegate.JBOSS_INTERNAL);
   if (nothingToDo)
   {
      return;
   }

   DataNode refMap = cache.get(InternalDelegate.JBOSS_INTERNAL_MAP);
   if (refMap == null)
   {
      return;
   }
   Map refs = refMap.getChildren();
   if (refs == null)
   {
      return;
   }

   String prefix = ObjectUtil.getIndirectFqn(fqn.toString());
   Iterator walker = refs.entrySet().iterator();
   while (walker.hasNext())
   {
      Map.Entry ref = (Map.Entry) walker.next();
      String refName = (String) ref.getKey();
      if (!refName.startsWith(prefix))
      {
         continue;
      }
      DataNode refNode = (DataNode) ref.getValue();
      out.writeObject(new Object[] { refName, refNode.get(refName) });
   }
}
/**
 * Stores an int into a byte array as four bytes, most significant byte
 * first, replacing whatever was there.
 *
 * @param bytes    array to modify
 * @param startpos index that receives the most significant byte
 * @param newVal   value to store
 */
static void overwriteInt(byte[] bytes, int startpos, int newVal)
{
   int shift = 24;
   for (int i = 0; i < 4; i++)
   {
      bytes[startpos + i] = (byte) (newVal >>> shift);
      shift -= 8;
   }
}
}
1.1 date: 2006/07/18 21:19:35; author: bstansberry; state: Exp;JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java
Index: StateTransferIntegrator_200.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.statetransfer;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeNode;
import org.jboss.cache.TreeCache;
import org.jboss.cache.aop.InternalDelegate;
import org.jboss.cache.aop.PojoCache;
import org.jboss.cache.factories.NodeFactory;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.ExtendedCacheLoader;
import org.jboss.cache.loader.NodeData;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.invocation.MarshalledValueInputStream;
class StateTransferIntegrator_200 implements StateTransferIntegrator
{
/**
 * Number of bytes at the beginning of the state transfer byte[]
 * utilized by meta-information about the composition of the byte[]:
 * 6 for the stream header, 2 for the version short and 3 * 4 for the
 * lengths of the state components. A further 2 bytes "for close" were
 * once considered but are NOT part of the value (the "+ 2" below is
 * commented out).
 */
private static final int HEADER_LENGTH = 6 + 2 + 4 + 4 + 4;// + 2;
/** Logger named after the runtime class of this instance. */
private Log log = LogFactory.getLog(getClass().getName());
/** Cache into which the state is integrated. */
private TreeCache cache;
/** Fqn the transferred state is bound to. */
private Fqn targetFqn;
/** The complete state transfer byte[], header included. */
private byte[] state;
/** Length in bytes of the transient section, read from the header. */
private int transientSize;
/** Length in bytes of the associated section, read from the header. */
private int associatedSize;
/** Length in bytes of the persistent section, read from the header. */
private int persistentSize;
/** Set to true once the transient state has been integrated without error. */
private boolean transientSet;
/** Factory used to create the nodes built from the transferred state. */
private NodeFactory factory;
/**
 * Node type handed to the factory; chosen in the constructor from
 * cache.isNodeLockingOptimistic().
 */
private byte nodeType;
/** Value of cache.getInternalFqns() captured by the constructor. */
private Set internalFqns;
/**
 * Creates an integrator for the given state and reads the sizes of the
 * three state sections from its header.
 *
 * @param state     complete state transfer byte[], header included
 * @param targetFqn Fqn the state is bound to
 * @param cache     cache that receives the state
 * @throws Exception if the header cannot be read
 */
StateTransferIntegrator_200(byte[] state, Fqn targetFqn,
                            TreeCache cache) throws Exception
{
   this.targetFqn = targetFqn;
   this.cache = cache;
   this.state = state;
   this.factory = NodeFactory.getInstance();
   if (cache.isNodeLockingOptimistic())
   {
      this.nodeType = NodeFactory.NODE_TYPE_OPTIMISTIC_NODE;
   }
   else
   {
      this.nodeType = NodeFactory.NODE_TYPE_TREENODE;
   }
   this.internalFqns = cache.getInternalFqns();

   MarshalledValueInputStream header =
         new MarshalledValueInputStream(new ByteArrayInputStream(state));
   header.readShort(); // the version, which we discard
   this.transientSize = header.readInt();
   this.associatedSize = header.readInt();
   this.persistentSize = header.readInt();
   header.close();

   if (log.isTraceEnabled())
   {
      log.trace("transient state: " + transientSize + " bytes");
      log.trace("associated state: " + associatedSize + " bytes");
      log.trace("persistent state: " + persistentSize + " bytes");
   }
}
/**
 * Integrates the transient (in-memory) section of the state into the given
 * node and, if that succeeds, the associated state. Does nothing when the
 * transient section is empty.
 * <p>
 * If a class loader is supplied it is installed as the thread context class
 * loader for the duration of the call. The previous context class loader is
 * always put back afterwards, including when it was <code>null</code>.
 * If the transient state could not be integrated, the target node is
 * cleared and all of its children are removed.
 *
 * @param target node that receives the state
 * @param cl     class loader to use while unmarshalling, or
 *               <code>null</code> to leave the context class loader alone
 * @throws Exception if integration fails
 */
public void integrateTransientState(DataNode target, ClassLoader cl)
      throws Exception
{
   if (transientSize <= 0)
   {
      return;
   }

   Thread current = Thread.currentThread();
   ClassLoader oldCL = null;
   // Tracks whether we replaced the context class loader. The previous
   // loader may legitimately be null, so it cannot serve as the flag.
   boolean swapped = false;
   try
   {
      if (cl != null)
      {
         oldCL = current.getContextClassLoader();
         current.setContextClassLoader(cl);
         swapped = true;
      }

      if (log.isTraceEnabled())
         log.trace("integrating transient state for " + target);

      integrateTransientState(target);
      transientSet = true;

      if (log.isTraceEnabled())
         log.trace("transient state successfully integrated for " +
                   targetFqn);

      // Set the associated state. We only do this if the normal
      // transient state was set.
      integrateAssociatedState();
   }
   finally
   {
      try
      {
         if (!transientSet)
         {
            // Clear any existing state from the targetRoot
            target.clear();
            target.removeAllChildren();
         }
      }
      finally
      {
         // Restore even if the cleanup above failed
         if (swapped)
            current.setContextClassLoader(oldCL);
      }
   }
}
/**
 * Integrates the associated section of the state into the internal
 * reference map. Only applies when the section is non-empty and the cache
 * is a PojoCache. Each Object[] read from the section is treated as a
 * name/value pair: the child of the reference map node with that name is
 * created if missing, and the value is stored in it under the same name.
 *
 * @throws Exception if the section cannot be read or applied
 */
private void integrateAssociatedState() throws Exception
{
   boolean applicable = associatedSize > 0 && cache instanceof PojoCache;
   if (!applicable)
   {
      if (log.isTraceEnabled())
      {
         log.trace("No need to integrate associated state for " + targetFqn);
      }
      return;
   }

   DataNode refMap = cache.get(InternalDelegate.JBOSS_INTERNAL_MAP);
   ByteArrayInputStream section =
         new ByteArrayInputStream(state, HEADER_LENGTH + transientSize, associatedSize);
   MarshalledValueInputStream in = new MarshalledValueInputStream(section);
   try
   {
      for (;;)
      {
         Object[] pair = (Object[]) in.readObject();
         if (pair == null)
         {
            break;
         }
         TreeNode refNode = refMap.getChild(pair[0]);
         if (refNode == null)
         {
            // Create the node
            Fqn refFqn = new Fqn(InternalDelegate.JBOSS_INTERNAL_MAP, pair[0]);
            refNode = factory.createDataNode(nodeType,
                                             pair[0],
                                             refFqn,
                                             refMap,
                                             null,
                                             true,
                                             cache);
            refMap.addChild(pair[0], refNode);
         }
         refNode.put(pair[0], pair[1]);
      }
   }
   catch (EOFException endOfSection)
   {
      // all done
   }

   if (log.isTraceEnabled())
   {
      log.trace("associated state successfully integrated for " + targetFqn);
   }
}
/**
 * Hands the persistent section of the state to the cache loader. Does
 * nothing when the section is empty. When the target Fqn is the root the
 * whole state is stored via storeEntireState; otherwise the loader must be
 * an ExtendedCacheLoader and the state is stored under the target Fqn. A
 * missing or unsuitable loader is logged as an error and nothing is stored.
 *
 * @throws Exception if the cache loader fails to store the state
 */
public void integratePersistentState() throws Exception
{
   if (persistentSize <= 0)
   {
      return;
   }

   CacheLoader loader = cache.getCacheLoader();
   if (loader == null)
   {
      log.error("cache loader is null, cannot set persistent state");
      return;
   }

   boolean wholeCache = targetFqn.size() == 0;
   if (!wholeCache && !(loader instanceof ExtendedCacheLoader))
   {
      log.error("cache loader does not implement ExtendedCacheLoader, " +
                "cannot set persistent state");
      return;
   }

   if (log.isTraceEnabled())
   {
      log.trace("setting the persistent state");
   }
   byte[] persistentState = getPersistentState();
   if (wholeCache)
   {
      loader.storeEntireState(persistentState);
   }
   else
   {
      ((ExtendedCacheLoader) loader).storeState(persistentState, targetFqn);
   }
   if (log.isTraceEnabled())
   {
      log.trace("setting the persistent state was successful");
   }
}
/**
 * Replaces the contents of the target node with the transient section of
 * the state. Internal nodes found under the target are set aside first and
 * re-attached at the end. The first NodeData in the section supplies the
 * target's own attributes; the remaining ones become its descendants.
 *
 * @param target node that receives the state
 * @throws IOException            if the section cannot be read
 * @throws ClassNotFoundException if a transferred class cannot be loaded
 */
private void integrateTransientState(DataNode target)
      throws IOException, ClassNotFoundException
{
   Set preserved = retainInternalNodes(target);
   target.removeAllChildren();

   MarshalledValueInputStream in = new MarshalledValueInputStream(
         new ByteArrayInputStream(state, HEADER_LENGTH, transientSize));

   // The first NodeData describes the target itself
   NodeData first = (NodeData) in.readObject();
   Map rootAttrs = first.getAttributes();
   if (rootAttrs == null)
   {
      target.clear();
   }
   else
   {
      target.put(rootAttrs, true);
   }

   // When integrating into the buddy backup subtree from a source that is
   // not already below the target, work out how many levels deeper the
   // transferred Fqns have to be placed
   Fqn sourceFqn = first.getFqn();
   Fqn destFqn = target.getFqn();
   int offset = 0;
   if (destFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
         && !sourceFqn.isChildOrEquals(destFqn))
   {
      offset = destFqn.size() - sourceFqn.size();
   }

   integrateStateTransferChildren(target, offset, in);
   in.close();

   integrateRetainedNodes(target, preserved);
}
/**
 * Reads NodeData objects from the stream and attaches those that are direct
 * children of the given parent, recursing into each newly created child so
 * that its own descendants are consumed before the next sibling.
 *
 * @param parent node under which children are created
 * @param offset when greater than zero, each transferred Fqn is re-rooted
 *               under a prefix of the parent's Fqn before its depth is
 *               evaluated
 * @param in     stream positioned at the next NodeData
 * @return the first NodeData read whose (possibly re-rooted) Fqn is not
 *         deeper than the parent's, so that a caller higher up can handle
 *         it; <code>null</code> when the stream is exhausted or yields a
 *         null object
 * @throws IOException            if reading fails for a reason other than
 *                                end of stream
 * @throws ClassNotFoundException if a transferred class cannot be loaded
 * @throws IllegalStateException  if a NodeData is more than one level
 *                                below the parent
 */
private NodeData integrateStateTransferChildren(DataNode parent,
int offset,
ObjectInputStream in)
throws IOException, ClassNotFoundException
{
int parent_level = parent.getFqn().size();
// Depth an Fqn must have to be a direct child of parent
int target_level = parent_level + 1;
Fqn fqn;
int size;
Object name;
try
{
NodeData nd = (NodeData) in.readObject();
while (nd != null) {
fqn = nd.getFqn();
// If we need to integrate into the buddy backup subtree,
// change the Fqn to fit under it
if (offset > 0)
fqn = new Fqn(parent.getFqn().getFqnChild(offset), fqn);
size = fqn.size();
// Not below parent: hand the unmodified NodeData back to the caller
if (size <= parent_level)
return nd;
else if (size > target_level)
throw new IllegalStateException("NodeData " + fqn +
" is not a direct child of " +
parent.getFqn());
name = fqn.get(size - 1);
// We handle this NodeData. Create a DataNode and
// integrate its data
DataNode target = factory.createDataNode(nodeType,
name,
fqn,
parent,
nd.getAttributes(),
true,
cache);
parent.addChild(name, target);
// Recursively call, which will walk down the tree
// and return the next NodeData that's a child of our parent
nd = integrateStateTransferChildren(target, offset, in);
}
}
catch (EOFException eof) {
// all done
}
return null;
}
/**
 * Copies the persistent section out of the state byte[]. The section
 * starts right after the header, the transient section and the associated
 * section.
 *
 * @return a new array holding exactly the persistent section
 */
private byte[] getPersistentState()
{
   byte[] persistent = new byte[persistentSize];
   int sectionStart = HEADER_LENGTH + transientSize + associatedSize;
   System.arraycopy(state, sectionStart, persistent, 0, persistent.length);
   return persistent;
}
/**
 * Collects the nodes found for those internal Fqns of the cache that lie
 * below the given node, so they can be re-attached once the node's
 * children have been replaced.
 *
 * @param target node whose children are about to be replaced
 * @return the nodes found; empty if there are none
 */
private Set retainInternalNodes(DataNode target)
{
   Fqn rootFqn = target.getFqn();
   Set kept = new HashSet();
   Iterator walker = internalFqns.iterator();
   while (walker.hasNext())
   {
      Fqn candidate = (Fqn) walker.next();
      if (!candidate.isChildOf(rootFqn))
      {
         continue;
      }
      DataNode found = getInternalNode(target, candidate);
      if (found != null)
      {
         kept.add(found);
      }
   }
   return kept;
}
/**
 * Looks up the node with the given internal Fqn by walking down from the
 * given parent one level at a time.
 * <p>
 * The original depth check was inverted: it recursed only when the internal
 * Fqn was shorter than the child just found, which cannot happen while
 * descending. The descent therefore never ran, and an ancestor of the
 * requested node was returned whenever the internal Fqn was more than one
 * level below the parent. The walk now continues while the internal Fqn is
 * deeper than the node reached so far.
 *
 * @param parent      node to start from; assumed to be an ancestor of
 *                    <code>internalFqn</code>
 * @param internalFqn Fqn of the node to find
 * @return the node, or <code>null</code> if any level on the way is missing
 */
private DataNode getInternalNode(DataNode parent, Fqn internalFqn)
{
   // Name of the next path element below the parent
   Object name = internalFqn.get(parent.getFqn().size());
   DataNode result = (DataNode) parent.getChild(name);
   if (result != null && internalFqn.size() > result.getFqn().size())
   {
      // Not there yet -- need to recursively walk down the tree
      result = getInternalNode(result, internalFqn);
   }
   return result;
}
/**
 * Re-attaches previously retained nodes below the given root. Nodes whose
 * Fqn is not below the root's Fqn are ignored.
 *
 * @param root          node under which the retained nodes are re-attached
 * @param retainedNodes nodes set aside before the root's children were
 *                      replaced
 */
private void integrateRetainedNodes(DataNode root, Set retainedNodes)
{
   Fqn base = root.getFqn();
   Iterator walker = retainedNodes.iterator();
   while (walker.hasNext())
   {
      DataNode kept = (DataNode) walker.next();
      if (!kept.getFqn().isChildOf(base))
      {
         continue;
      }
      integrateRetainedNode(root, kept);
   }
}
/**
 * Attaches a retained node at its place below the given ancestor, creating
 * empty intermediate nodes for any missing levels.
 * <p>
 * The original direct-child test compared the depths the wrong way round
 * (ancestor depth == descendant depth + 1), which can never hold for an
 * ancestor/descendant pair. Every call therefore took the "missing level"
 * branch, inserted an empty node where the retained node belonged, and
 * then recursed with an ancestor as deep as the descendant, indexing past
 * the end of the descendant's Fqn. The test now checks that the descendant
 * is exactly one level below the ancestor.
 *
 * @param ancestor   node below which the descendant belongs; its Fqn must
 *                   be shorter than the descendant's
 * @param descendant retained node to attach
 */
private void integrateRetainedNode(DataNode ancestor, DataNode descendant)
{
   Fqn descFqn = descendant.getFqn();
   Fqn ancFqn = ancestor.getFqn();
   // Name of the path element directly below the ancestor
   Object name = descFqn.get(ancFqn.size());
   DataNode child = (DataNode) ancestor.getChild(name);
   if (descFqn.size() == ancFqn.size() + 1)
   {
      // The descendant is a direct child of the ancestor
      if (child == null)
      {
         ancestor.addChild(name, descendant);
      }
      else
      {
         // The transferred state already supplied a node here; keep it
         log.warn("Received unexpected internal node " + descFqn +
                  " in transferred state");
      }
   }
   else
   {
      if (child == null)
      {
         // Missing level -- have to create empty node
         // This shouldn't really happen -- internal fqns should
         // be immediately under the root
         child = factory.createDataNode(nodeType,
                                        name,
                                        new Fqn(ancFqn, name),
                                        ancestor,
                                        null,
                                        true,
                                        cache);
         ancestor.addChild(name, child);
      }
      // Keep walking down the tree
      integrateRetainedNode(child, descendant);
   }
}
}
More information about the jboss-cvs-commits
mailing list