[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Vladmir Blagojevic
vladimir.blagojevic at jboss.com
Tue Nov 14 14:56:10 EST 2006
User: vblagojevic
Date: 06/11/14 14:56:10
Modified: src/org/jboss/cache/statetransfer StateTransferManager.java
Log:
[JBCACHE-591] partial state transfer
Revision Changes Path
1.13 +26 -122 JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -b -r1.12 -r1.13
--- StateTransferManager.java 24 Oct 2006 11:35:31 -0000 1.12
+++ StateTransferManager.java 14 Nov 2006 19:56:09 -0000 1.13
@@ -6,9 +6,13 @@
*/
package org.jboss.cache.statetransfer;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheException;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
@@ -16,20 +20,10 @@
import org.jboss.cache.loader.NodeData;
import org.jboss.cache.loader.NodeDataMarker;
import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodCallFactory;
-import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.marshall.VersionAwareMarshaller;
import org.jboss.cache.util.ExposedByteArrayOutputStream;
-import org.jboss.util.stream.MarshalledValueOutputStream;
import org.jboss.util.stream.MarshalledValueInputStream;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.Vector;
+import org.jboss.util.stream.MarshalledValueOutputStream;
public class StateTransferManager
{
@@ -37,6 +31,9 @@
public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker();
+
+ public static final String PARTIAL_STATE_DELIMETER = "_PARTIAL_STATE_DELIMETER";
+
private TreeCache treeCache;
private long[] loadStateTimeouts = {400, 800, 1200};
@@ -98,18 +95,16 @@
if (marshaller_ != null)
{
// can't give state for regions currently being activated/inactivated
- if (cache.isActivatingDeactivating(fqn))
+ if (marshaller_.isInactive(fqn.toString()))
{
if (log.isDebugEnabled())
log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated");
- return null;
- }
-
- // Can't give state for inactive nodes
- if (marshaller_.isInactive(fqn.toString()))
+ if(usingStreamingStateTransfer)
{
- if (log.isDebugEnabled())
- log.debug("ignoring _getState() for inactive region " + fqn);
+ MarshalledValueOutputStream out = new MarshalledValueOutputStream(os);
+ out.writeBoolean(false);
+ out.close();
+ }
return null;
}
}
@@ -139,6 +134,7 @@
if (usingStreamingStateTransfer)
{
out = new MarshalledValueOutputStream(os);
+ out.writeBoolean(true);
generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
}
else
@@ -163,6 +159,9 @@
}
/**
+ *
+ * TODO change this javadoc
+ *
* Requests state from each of the given source nodes in the cluster
* until it gets it or no node replies with a timeout exception. If state
* is returned, integrates it into the given DataNode. If no state is
@@ -181,102 +180,7 @@
Object[] sources, ClassLoader cl)
throws Exception
{
- TreeCache cache = getTreeCache();
- // Call each node in the cluster with progressively longer timeouts
- // until we get state or no cluster node returns a TimeoutException
- long[] timeouts = getLoadStateTimeouts();
- Object ourself = cache.getLocalAddress(); // ignore ourself when we call
- boolean stateSet = false;
- TimeoutException timeoutException = null;
- Object timeoutTarget = null;
-
- boolean trace = log.isTraceEnabled();
-
- for (int i = 0; i < timeouts.length; i++)
- {
- timeoutException = null;
-
- Boolean force = (i == timeouts.length - 1) ? Boolean.TRUE
- : Boolean.FALSE;
-
- MethodCall psmc = MethodCallFactory.create(MethodDeclarations.getPartialStateMethod, subtreeRoot,
- timeouts[i],
- force,
- Boolean.FALSE);
-
- MethodCall replPsmc = MethodCallFactory.create(MethodDeclarations.replicateMethod,
- psmc);
-
- // Iterate over the group members, seeing if anyone
- // can give us state for this region
- for (int j = 0; j < sources.length; j++)
- {
- Object target = sources[j];
- if (ourself.equals(target))
- continue;
-
- Vector targets = new Vector();
- targets.add(target);
-
- List responses = cache.callRemoteMethods(targets, replPsmc, true,
- true, cache.getConfiguration().getSyncReplTimeout());
- Object rsp = null;
- if (responses != null && responses.size() > 0)
- {
- rsp = responses.get(0);
- if (rsp instanceof byte[])
- {
- setState((byte[]) rsp, integrationRoot, cl);
- stateSet = true;
-
- if (log.isDebugEnabled())
- {
- log.debug("loadState(): " + ourself +
- " got state from " + target);
- }
-
- break;
- }
- else if (rsp instanceof TimeoutException)
- {
- timeoutException = (TimeoutException) rsp;
- timeoutTarget = target;
- if (trace)
- {
- log.trace("TreeCache.activateRegion(): " + ourself +
- " got a TimeoutException from " + target);
- }
- }
- }
-
- if (trace)
- {
- log.trace("TreeCache.activateRegion(): " + ourself +
- " No usable response from node " + target +
- (rsp == null ? "" : (" -- received " + rsp)));
- }
- }
-
- // We've looped through all targets; if we got state or didn't
- // but no one sent a timeout (which means no one had state)
- // we don't want to try again
- if (stateSet || timeoutException == null)
- break;
- }
-
- if (!stateSet)
- {
- // If we got a timeout exception on the final try,
- // this is a failure condition
- if (timeoutException != null)
- {
- throw new CacheException("Failed getting state due to timeout on " +
- timeoutTarget, timeoutException);
- }
-
- if (log.isDebugEnabled())
- log.debug("TreeCache.activateRegion(): No nodes able to give state");
- }
+ treeCache.fetchPartialState(sources, subtreeRoot,integrationRoot.getFqn());
}
/**
@@ -350,7 +254,7 @@
MarshalledValueInputStream in =null;
if (usingStreamTransfer)
{
- in = new MarshalledValueInputStream((InputStream) state);
+ in = (MarshalledValueInputStream) state;
}
else
{
More information about the jboss-cvs-commits
mailing list