[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Vladmir Blagojevic
vladimir.blagojevic at jboss.com
Mon Sep 11 17:53:19 EDT 2006
User: vblagojevic
Date: 06/09/11 17:53:19
Modified: src/org/jboss/cache/statetransfer
AbstractStateTransferGenerator.java
StateTransferIntegrator.java
AbstractStateTransferIntegrator.java
StateTransferGenerator.java
StateTransferFactory.java StateTransferManager.java
Removed: src/org/jboss/cache/statetransfer
StateTransferGenerator_200.java
StreamingStateTransferGenerator_200.java
StateTransferIntegrator_200.java
StreamingStateTransferIntegrator_200.java
Log:
refactoring after Brian's review
Revision Changes Path
1.3 +2 -2 JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferGenerator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: AbstractStateTransferGenerator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferGenerator.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -b -r1.2 -r1.3
--- AbstractStateTransferGenerator.java 7 Sep 2006 17:41:52 -0000 1.2
+++ AbstractStateTransferGenerator.java 11 Sep 2006 21:53:19 -0000 1.3
@@ -19,7 +19,7 @@
import org.jboss.cache.Version;
import org.jboss.cache.loader.NodeData;
-public class AbstractStateTransferGenerator
+public class AbstractStateTransferGenerator implements StateTransferGenerator
{
public static final short STATE_TRANSFER_VERSION = Version.getVersionShort("2.0.0.GA");
@@ -36,7 +36,7 @@
this.internalFqns = cache.getInternalFqns();
}
- protected void streamState(ObjectOutputStream out, DataNode rootNode, boolean generateTransient,
+ public void generateState(ObjectOutputStream out, DataNode rootNode, boolean generateTransient,
boolean generatePersistent, boolean suppressErrors) throws Throwable
{
Fqn fqn = rootNode.getFqn();
1.4 +3 -1 JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -b -r1.3 -r1.4
--- StateTransferIntegrator.java 31 Aug 2006 20:30:45 -0000 1.3
+++ StateTransferIntegrator.java 11 Sep 2006 21:53:19 -0000 1.4
@@ -6,11 +6,13 @@
*/
package org.jboss.cache.statetransfer;
+import java.io.ObjectInputStream;
+
import org.jboss.cache.DataNode;
public interface StateTransferIntegrator
{
- void integrateState(DataNode target,ClassLoader cl)throws Exception;
+ void integrateState(ObjectInputStream ois, DataNode target,ClassLoader cl)throws Exception;
}
\ No newline at end of file
1.5 +2 -2 JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: AbstractStateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/AbstractStateTransferIntegrator.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -b -r1.4 -r1.5
--- AbstractStateTransferIntegrator.java 7 Sep 2006 18:56:39 -0000 1.4
+++ AbstractStateTransferIntegrator.java 11 Sep 2006 21:53:19 -0000 1.5
@@ -23,7 +23,7 @@
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.NodeData;
-public class AbstractStateTransferIntegrator
+public class AbstractStateTransferIntegrator implements StateTransferIntegrator
{
protected Log log = LogFactory.getLog(getClass().getName());
@@ -343,7 +343,7 @@
return targetFqn;
}
- protected void integrateStateHelper(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
+ public void integrateState(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
{
Throwable cause=null;
//first try integrating transient state
1.3 +4 -2 JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferGenerator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -b -r1.2 -r1.3
--- StateTransferGenerator.java 11 Oct 2005 20:15:15 -0000 1.2
+++ StateTransferGenerator.java 11 Sep 2006 21:53:19 -0000 1.3
@@ -6,12 +6,14 @@
*/
package org.jboss.cache.statetransfer;
+import java.io.ObjectOutputStream;
+
import org.jboss.cache.DataNode;
public interface StateTransferGenerator
{
- public abstract byte[] generateStateTransfer(DataNode rootNode, boolean generateTransient,
+ public void generateState(ObjectOutputStream stream, DataNode rootNode, boolean generateTransient,
boolean generatePersistent, boolean suppressErrors) throws Throwable;
}
\ No newline at end of file
1.11 +9 -72 JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferFactory.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -b -r1.10 -r1.11
--- StateTransferFactory.java 31 Aug 2006 14:56:46 -0000 1.10
+++ StateTransferFactory.java 11 Sep 2006 21:53:19 -0000 1.11
@@ -6,22 +6,19 @@
*/
package org.jboss.cache.statetransfer;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.Version;
-import org.jboss.invocation.MarshalledValueInputStream;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
/**
* Factory class able to create {@link StateTransferGenerator} and
* {@link StateTransferIntegrator} instances.
*
* @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
- * @version $Revision: 1.10 $
+ * @version $Revision: 1.11 $
*/
public abstract class StateTransferFactory
{
@@ -46,73 +43,13 @@
if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
else
- return new StateTransferGenerator_200(cache); // current default
- }
-
- public static StateTransferGenerator getStateTransferGenerator(OutputStream os, TreeCache cache)
- {
- short version = cache.getConfiguration().getReplicationVersion();
-
- // Compiler won't let me use a switch
-
- if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
- throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
- else
- return new StreamingStateTransferGenerator_200(os,cache); // current default
- }
-
- /**
- * Gets a StateTransferIntegrator able to handle the given state.
- *
- * @param state the state
- * @param targetFqn Fqn of the node to which the state will be bound
- * @param cache cache in which the state will be stored
- * @return the {@link StateTransferIntegrator}.
- *
- * @throws IllegalStateException if the cache's ReplicationVersion is < 2.0.0
- * @throws Exception
- */
- public static StateTransferIntegrator
- getStateTransferIntegrator(byte[] state, Fqn targetFqn, TreeCache cache)
- throws Exception
- {
- ByteArrayInputStream bais = new ByteArrayInputStream(state);
- bais.mark(1024);
-
- short version = 0;
- MarshalledValueInputStream in = new MarshalledValueInputStream(bais);
- try {
- try
- {
- version = in.readShort();
- }
- catch (IOException io)
- {
- // No short at the head of the stream means version 123
- throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
- }
-
- // Compiler won't let me use a switch
-
- if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
- throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
- else
- return new StateTransferIntegrator_200(state, targetFqn, cache); // current default
-
- }
- finally {
- try {
- in.close();
- }
- catch (IOException io) {}
- }
+ return new AbstractStateTransferGenerator(cache); // current default
}
- public static StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn, TreeCache treeCache)
+ public static StateTransferIntegrator getStateTransferIntegrator(ObjectInputStream in, Fqn fqn, TreeCache treeCache)
throws Exception
{
short version = 0;
- MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
try
{
version = in.readShort();
@@ -133,7 +70,7 @@
if (version < RV_200 && version > 0) // <= 0 is actually a version > 15.31.63
throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
else
- return new StreamingStateTransferIntegrator_200(in, fqn, treeCache); // current default
+ return new AbstractStateTransferIntegrator(fqn, treeCache); // current default
}
}
1.11 +33 -58 JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -b -r1.10 -r1.11
--- StateTransferManager.java 11 Sep 2006 17:02:43 -0000 1.10
+++ StateTransferManager.java 11 Sep 2006 21:53:19 -0000 1.11
@@ -20,12 +20,15 @@
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.marshall.VersionAwareMarshaller;
+import org.jboss.cache.util.ExposedByteArrayOutputStream;
+import org.jboss.invocation.MarshalledValueInputStream;
+import org.jboss.invocation.MarshalledValueOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.io.OutputStream;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Vector;
public class StateTransferManager
@@ -132,20 +135,24 @@
acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
}
- StateTransferGenerator generator = null;
+ MarshalledValueOutputStream out = null;
+ byte resultBuffer [] = new byte[0];
+ StateTransferGenerator generator = getStateTransferGenerator();
+ long startTime = System.currentTimeMillis();
if (usingStreamingStateTransfer)
{
- generator = getStateTransferGenerator(os);
+ out = new MarshalledValueOutputStream(os);
+ generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
}
else
{
- generator = getStateTransferGenerator();
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16*1024);
+ out = new MarshalledValueOutputStream(baos);
+ generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
+ resultBuffer = baos.getRawBuffer();
}
-
- return generator.generateStateTransfer(rootNode,
- fetchTransientState,
- fetchPersistentState,
- suppressErrors);
+ log.info("Successfully generated state in " + (System.currentTimeMillis()-startTime) + " msec");
+ return resultBuffer;
}
finally
{
@@ -343,23 +350,24 @@
true, true);
StateTransferIntegrator integrator = null;
+ MarshalledValueInputStream in =null;
if (usingStreamTransfer)
{
- integrator = getStateTransferIntegrator((InputStream) state,
- targetRoot.getFqn());
+ in = new MarshalledValueInputStream((InputStream) state);
}
else
{
- byte [] new_state = (byte[]) state;
- log.info("received the state (size=" + new_state.length + " bytes)");
- integrator = getStateTransferIntegrator(new_state, targetRoot.getFqn());
+ ByteArrayInputStream bais = new ByteArrayInputStream((byte[]) state);
+ in = new MarshalledValueInputStream(bais);
}
+ integrator = getStateTransferIntegrator(in, targetRoot.getFqn());
+ long startTime = System.currentTimeMillis();
try
{
- log.info("starting state integration at node " + targetRoot + " using " + integrator);
- integrator.integrateState(targetRoot, cl);
- log.info("successfully integrated state");
+ log.info("starting state integration at node " + targetRoot);
+ integrator.integrateState(in,targetRoot, cl);
+ log.info("successfully integrated state in " + (System.currentTimeMillis()-startTime) + " msec");
}
catch (Throwable t)
{
@@ -436,45 +444,12 @@
return StateTransferFactory.getStateTransferGenerator(getTreeCache());
}
- protected StateTransferGenerator getStateTransferGenerator(OutputStream os)
- {
- return StateTransferFactory.getStateTransferGenerator(os, getTreeCache());
- }
-
- protected StateTransferIntegrator getStateTransferIntegrator(byte[] state, Fqn targetFqn)
- throws Exception
- {
- return StateTransferFactory.getStateTransferIntegrator(state, targetFqn, getTreeCache());
- }
-
- private StateTransferIntegrator getStateTransferIntegrator(InputStream istream, Fqn fqn) throws Exception
+ protected StateTransferIntegrator getStateTransferIntegrator(ObjectInputStream istream, Fqn fqn) throws Exception
{
return StateTransferFactory.getStateTransferIntegrator(istream, fqn, getTreeCache());
}
/**
- * Generates NodeAdded notifications for all nodes of the tree. This is
- * called whenever the tree is initially retrieved (state transfer)
- */
- private void notifyAllNodesCreated(DataNode curr)
- {
- DataNode n;
- Map children;
-
- if (curr == null) return;
- getTreeCache().getNotifier().notifyNodeCreated(curr.getFqn(), true);
- getTreeCache().getNotifier().notifyNodeCreated(curr.getFqn(), false);
- if ((children = curr.getChildren()) != null)
- {
- for (Iterator it = children.values().iterator(); it.hasNext();)
- {
- n = (DataNode) it.next();
- notifyAllNodesCreated(n);
- }
- }
- }
-
- /**
* Returns an object suitable for use in node locking, either the current
* transaction or the current thread if there is no transaction.
*/
More information about the jboss-cvs-commits
mailing list