[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Vladmir Blagojevic
vladimir.blagojevic at jboss.com
Wed Dec 20 17:28:13 EST 2006
User: vblagojevic
Date: 06/12/20 17:28:13
Modified: src/org/jboss/cache/statetransfer
DefaultStateTransferIntegrator.java
StateTransferManager.java
Log:
final state transfer refactoring
Revision Changes Path
1.8 +2 -9 JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DefaultStateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -b -r1.7 -r1.8
--- DefaultStateTransferIntegrator.java 14 Dec 2006 17:18:48 -0000 1.7
+++ DefaultStateTransferIntegrator.java 20 Dec 2006 22:28:13 -0000 1.8
@@ -64,16 +64,9 @@
{
cause = t;
log.error("Failed integrating state.", t);
- }
- finally
- {
- ois.close();
- if (cause != null)
- {
throw new Exception("State transfer failed ", cause);
}
}
- }
protected void integrateTransientState(ObjectInputStream in, Node target, ClassLoader cl) throws Exception
{
1.18 +43 -134 JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -b -r1.17 -r1.18
--- StateTransferManager.java 14 Dec 2006 17:18:48 -0000 1.17
+++ StateTransferManager.java 20 Dec 2006 22:28:13 -0000 1.18
@@ -18,14 +18,10 @@
import org.jboss.cache.lock.NodeLock;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.marshall.VersionAwareMarshaller;
-import org.jboss.cache.util.ExposedByteArrayOutputStream;
-import org.jboss.util.stream.MarshalledValueInputStream;
-import org.jboss.util.stream.MarshalledValueOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.io.ObjectInputStream;
-import java.io.OutputStream;
+import java.io.ObjectOutputStream;
+
public class StateTransferManager
{
@@ -33,42 +29,25 @@
public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker();
-
public static final String PARTIAL_STATE_DELIMETER = "_PARTIAL_STATE_DELIMETER";
- private TreeCache treeCache;
- private long[] loadStateTimeouts = {400, 800, 1200};
+ private final TreeCache cache;
public StateTransferManager(TreeCache cache)
{
- this.treeCache = cache;
+ this.cache = cache;
}
public TreeCache getTreeCache()
{
- return treeCache;
- }
-
- public void setTreeCache(TreeCache cache)
- {
- this.treeCache = cache;
- }
-
- public long[] getLoadStateTimeouts()
- {
- return loadStateTimeouts;
- }
-
- public void setLoadStateTimeouts(long[] loadStateTimeouts)
- {
- this.loadStateTimeouts = loadStateTimeouts;
+ return cache;
}
/**
- * Returns the state for the portion of the tree named by <code>fqn</code>.
+ * Writes the state for the portion of the tree named by <code>fqn</code> to
+ * the provided OutputStream.
+ *
* <p/>
- * State returned is a serialized byte[][], element 0 is the transient state
- * (or null), and element 1 is the persistent state (or null).
*
* @param fqn Fqn indicating the uppermost node in the
* portion of the tree whose state should be returned.
@@ -83,85 +62,41 @@
* @return a serialized byte[][], element 0 is the transient state
* (or null), and element 1 is the persistent state (or null).
*/
- public byte[] getState(OutputStream os, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
+ public void getState(ObjectOutputStream out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
{
- boolean usingStreamingStateTransfer = os != null;
- TreeCache cache = getTreeCache();
+ VersionAwareMarshaller marshaller = cache.getMarshaller();
- VersionAwareMarshaller marshaller_ = null;
- if (cache.getConfiguration().isUseRegionBasedMarshalling())
- {
- marshaller_ = cache.getMarshaller();
- }
-
- if (marshaller_ != null)
- {
// can't give state for regions currently being activated/inactivated
- if (marshaller_.isInactive(fqn.toString()))
- {
- if (log.isDebugEnabled())
- {
- log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated");
- }
- if (usingStreamingStateTransfer)
- {
- MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
- out.writeBoolean(false);
- out.close();
- }
- return null;
- }
- }
-
- Node rootNode = cache.findNode(fqn);
- if (rootNode == null)
- {
- return null;
- }
+ boolean canProvideState = !(marshaller.isInactive(fqn.toString()) || cache.findNode(fqn) == null);
boolean fetchTransientState = cache.getConfiguration().isFetchInMemoryState();
CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
boolean fetchPersistentState = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState();
+ if (canProvideState && (fetchPersistentState || fetchTransientState))
+ {
+ out.writeBoolean(true);
+ StateTransferGenerator generator = getStateTransferGenerator();
Object owner = getOwnerForLock();
+ long startTime = System.currentTimeMillis();
+ Node rootNode = cache.findNode(fqn);
try
{
- if (fetchTransientState || fetchPersistentState)
- {
log.info("locking the " + fqn + " subtree to return the in-memory (transient) state");
acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
- }
-
- MarshalledValueOutputStream out = null;
- byte resultBuffer[] = new byte[0];
- StateTransferGenerator generator = getStateTransferGenerator();
- long startTime = System.currentTimeMillis();
- if (usingStreamingStateTransfer)
- {
- out = new MarshalledValueOutputStream(os);
- out.writeBoolean(true);
generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
- }
- else
- {
- ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
- out = new MarshalledValueOutputStream(baos);
- generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
- resultBuffer = baos.getRawBuffer();
- }
log.info("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
- return resultBuffer;
}
finally
{
releaseStateTransferLocks(rootNode, owner, true);
}
}
-
- public byte[] getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
+ else
{
- return getState(null, fqn, timeout, force, suppressErrors);
+ out.writeBoolean(false);
+ }
}
/**
@@ -185,7 +120,7 @@
Object[] sources, ClassLoader cl)
throws Exception
{
- treeCache.fetchPartialState(sources, subtreeRoot, integrationRoot.getFqn());
+ cache.fetchPartialState(sources, subtreeRoot, integrationRoot.getFqn());
}
/**
@@ -204,8 +139,7 @@
* @param cl classloader to use to unmarshal the state, or
* <code>null</code> if the TCCL should be used
*/
- public void setState(Object state, Fqn targetRoot, ClassLoader cl)
- throws Exception
+ public void setState(ObjectInputStream state, Fqn targetRoot, ClassLoader cl) throws Exception
{
TreeCache cache = getTreeCache();
Node target = cache.findNode(targetRoot);
@@ -216,7 +150,6 @@
cache.put(targetRoot, null);
target = cache.findNode(targetRoot);
}
-
setState(state, target, cl);
}
@@ -236,39 +169,16 @@
* @param cl classloader to use to unmarshal the state, or
* <code>null</code> if the TCCL should be used
*/
- private void setState(Object state, Node targetRoot, ClassLoader cl)
- throws Exception
- {
- if (state == null)
+ private void setState(ObjectInputStream state, Node targetRoot, ClassLoader cl) throws Exception
{
- log.info("new_state is null (may be first member in cluster)");
- return;
- }
-
- boolean usingStreamTransfer = (state instanceof InputStream) ? true : false;
-
Object owner = getOwnerForLock();
+ long startTime = System.currentTimeMillis();
+
try
{
// Acquire a lock on the root node
- acquireLocksForStateTransfer(targetRoot, owner,
- getTreeCache().getConfiguration().getInitialStateRetrievalTimeout(),
- true, true);
-
- StateTransferIntegrator integrator = null;
- MarshalledValueInputStream in = null;
- if (usingStreamTransfer)
- {
- in = (MarshalledValueInputStream) state;
- }
- else
- {
- ByteArrayInputStream bais = new ByteArrayInputStream((byte[]) state);
- in = new MarshalledValueInputStream(bais);
- }
-
- integrator = getStateTransferIntegrator(in, targetRoot.getFqn());
- long startTime = System.currentTimeMillis();
+ acquireLocksForStateTransfer(targetRoot, owner, getTreeCache().getConfiguration()
+ .getInitialStateRetrievalTimeout(), true, true);
/*
* Vladimir/Manik/Brian (Dec 7,2006)
@@ -281,12 +191,13 @@
Option option = new Option();
option.setBypassInterceptorChain(true);
- treeCache.getInvocationContext().setOptionOverrides(option);
+ cache.getInvocationContext().setOptionOverrides(option);
try
{
+ StateTransferIntegrator integrator = getStateTransferIntegrator(state, targetRoot.getFqn());
log.info("starting state integration at node " + targetRoot);
- integrator.integrateState(in, targetRoot, cl);
+ integrator.integrateState(state, targetRoot, cl);
log.info("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
}
catch (Throwable t)
@@ -298,7 +209,6 @@
{
releaseStateTransferLocks(targetRoot, owner, true);
}
-
}
@@ -388,7 +298,6 @@
{
owner = Thread.currentThread();
}
-
return owner;
}
}
More information about the jboss-cvs-commits
mailing list