[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Vladmir Blagojevic
vladimir.blagojevic at jboss.com
Thu Aug 31 10:56:46 EDT 2006
User: vblagojevic
Date: 06/08/31 10:56:46
Modified: src/org/jboss/cache/statetransfer
StateTransferIntegrator_200.java
StateTransferFactory.java
StateTransferGenerator_200.java
StreamingStateTransferGenerator_200.java
StateTransferManager.java
StreamingStateTransferIntegrator_200.java
Log:
CacheLoader API change (remove byte based load/store)
Revision Changes Path
1.5 +20 -15 JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferIntegrator_200.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferIntegrator_200.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -b -r1.4 -r1.5
--- StateTransferIntegrator_200.java 20 Jul 2006 21:58:21 -0000 1.4
+++ StateTransferIntegrator_200.java 31 Aug 2006 14:56:46 -0000 1.5
@@ -127,25 +127,30 @@
public void integratePersistentState() throws Exception
{
- if(persistentSize > 0) {
+ if (persistentSize > 0)
+ {
CacheLoader loader = cache.getCacheLoader();
- if(loader == null) {
+ if (loader == null)
+ {
log.error("cache loader is null, cannot set persistent state");
}
- else if (targetFqn.size() == 0){
- if (log.isTraceEnabled())
- log.trace("setting the persistent state");
- byte[] persistentState = getPersistentState();
- loader.storeEntireState(persistentState);
+ else
+ {
+ ByteArrayInputStream in_stream = new ByteArrayInputStream(getPersistentState());
+ MarshalledValueInputStream in = new MarshalledValueInputStream(in_stream);
+
if (log.isTraceEnabled())
- log.trace("setting the persistent state was successful");
+ log.trace("setting the persistent state with " + loader.getClass());
+
+ if (targetFqn.isRoot())
+ {
+ loader.storeEntireState(in);
}
- else {
- if (log.isTraceEnabled())
- log.trace("setting the persistent state");
- // cache_loader.remove(Fqn.fromString("/"));
- byte[] persistentState = getPersistentState();
- loader.storeState(persistentState, targetFqn);
+ else
+ {
+ loader.storeState(targetFqn, in);
+ }
+
if (log.isTraceEnabled())
log.trace("setting the persistent state was successful");
}
1.10 +8 -3 JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferFactory.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferFactory.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -b -r1.9 -r1.10
--- StateTransferFactory.java 24 Aug 2006 22:05:34 -0000 1.9
+++ StateTransferFactory.java 31 Aug 2006 14:56:46 -0000 1.10
@@ -21,7 +21,7 @@
* {@link StateTransferIntegrator} instances.
*
* @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public abstract class StateTransferFactory
{
@@ -119,8 +119,13 @@
}
catch (IOException io)
{
- // No short at the head of the stream means version 123
- throw new IllegalStateException("State transfer with cache replication version < 2.0.0 not supported");
+ // something is wrong with this stream, close it
+ try
+ {
+ in.close();
+ }
+ catch(IOException ignored){}
+ throw new IllegalStateException("Stream corrupted ",io);
}
// Compiler won't let me use a switch
1.5 +43 -19 JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferGenerator_200.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferGenerator_200.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -b -r1.4 -r1.5
--- StateTransferGenerator_200.java 22 Aug 2006 20:37:53 -0000 1.4
+++ StateTransferGenerator_200.java 31 Aug 2006 14:56:46 -0000 1.5
@@ -16,6 +16,7 @@
import org.jboss.cache.util.ExposedByteArrayOutputStream;
import org.jboss.invocation.MarshalledValueOutputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
@@ -28,6 +29,10 @@
public static final short STATE_TRANSFER_VERSION =
Version.getVersionShort("2.0.0.GA");
+ //whenever we wrap stream A with object based stream B,B writes a few bytes
+ //of a stream header to underlying stream A
+ public static final int OBJECT_STREAM_MARKER_LENGTH =4;
+
private Log log = LogFactory.getLog(getClass().getName());
private TreeCache cache;
@@ -109,32 +114,51 @@
}
}
- if (generatePersistent) {
- try {
- if (debug)
- log.debug("getting the persistent state");
+ if (generatePersistent)
+ {
+ ByteArrayOutputStream out_stream = new ByteArrayOutputStream(1024);
+ ObjectOutputStream out = new MarshalledValueOutputStream(out_stream);
byte[] persState = null;
- if (fqn.size() == 0)
- persState = cache.getCacheLoader().loadEntireState();
- else
- persState = cache.getCacheLoader().loadState(fqn);
-
- if (persState != null) {
- sizes[2] = persState.length;
- baos.write(persState);
+ boolean persistentStateProvidedOk = false;
+ try
+ {
+ if (debug)
+ log.debug("getting the persistent state from cacheloader " + cache.getCacheLoader().getClass());
+ if (fqn.isRoot())
+ {
+ cache.getCacheLoader().loadEntireState(out);
}
-
- if (debug) {
- log.debug("generated the persistent state (" + sizes[2] +
- " bytes)");
+ else
+ {
+ cache.getCacheLoader().loadState(fqn, out);
}
+ persistentStateProvidedOk=true;
}
- catch(Throwable t) {
- log.error("failed getting the persistent state", t);
+ catch (Throwable t)
+ {
+ log.error("cacheloader failed while getting the persistent state", t);
if (!suppressErrors)
throw t;
+ }
+ finally
+ {
+ out.close();
+ persState = out_stream.toByteArray();
+ if (persistentStateProvidedOk && persState.length > OBJECT_STREAM_MARKER_LENGTH)
+ {
+ sizes[2] = persState.length;
+ baos.write(persState);
+ }
+ else
+ {
sizes[2] = 0;
}
+
+ if (debug)
+ {
+ log.debug("generated the persistent state (" + sizes[2] + " bytes)");
+ }
+ }
}
// Overwrite the placeholders used for the sizes of the state transfer
1.2 +45 -15 JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StreamingStateTransferGenerator_200.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferGenerator_200.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -b -r1.1 -r1.2
--- StreamingStateTransferGenerator_200.java 24 Aug 2006 22:05:34 -0000 1.1
+++ StreamingStateTransferGenerator_200.java 31 Aug 2006 14:56:46 -0000 1.2
@@ -48,17 +48,26 @@
try
{
- out.writeShort(STATE_TRANSFER_VERSION);
try
{
+ out.writeShort(STATE_TRANSFER_VERSION);
if (generateTransient)
{
+ if (log.isTraceEnabled())
+ log.trace("writing transient state for " + fqn);
+
marshallTransientState(rootNode, out);
- log.debug("generated the in-memory state");
- // Return any state associated with the subtree but not stored in it
+ if (log.isTraceEnabled())
+ log.trace("transient state succesfully written");
+
+ if (log.isTraceEnabled())
+ log.trace("writing associated state");
+
marshallAssociatedState(fqn, os);
- log.debug("returning the associated state bytes)");
+
+ if (log.isTraceEnabled())
+ log.trace("associated state succesfully written");
}
}
catch (Throwable t)
@@ -69,15 +78,30 @@
}
finally
{
- log.debug("writing delimeter after transient state");
+ if (log.isTraceEnabled())
+ log.trace("writing delimeter after transient state");
+
out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
}
-
+ try
+ {
if (generatePersistent)
{
- try
+ if (log.isTraceEnabled())
+ log.trace("writing persistent state for " + fqn);
+
+ if (fqn.isRoot())
+ {
+ cache.getCacheLoader().loadEntireState(out);
+ }
+ else
{
- //TODO
+ cache.getCacheLoader().loadState(fqn, out);
+ }
+
+ if (log.isTraceEnabled())
+ log.trace("persistent state succesfully written");
+ }
}
catch (Throwable t)
{
@@ -85,6 +109,12 @@
if (!suppressErrors)
throw t;
}
+ finally
+ {
+ if (log.isTraceEnabled())
+ log.trace("writing delimeter after persistent state");
+
+ out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
}
}
finally
1.7 +74 -66 JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StateTransferManager.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -b -r1.6 -r1.7
--- StateTransferManager.java 30 Aug 2006 17:08:18 -0000 1.6
+++ StateTransferManager.java 31 Aug 2006 14:56:46 -0000 1.7
@@ -1,5 +1,12 @@
package org.jboss.cache.statetransfer;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
@@ -9,27 +16,21 @@
import org.jboss.cache.config.Option;
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.loader.NodeData;
+import org.jboss.cache.loader.NodeDataMarker;
import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.marshall.VersionAwareMarshaller;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Vector;
+import org.jboss.cache.marshall.MethodCall;
public class StateTransferManager
{
protected final static Log log = LogFactory.getLog(StateTransferManager.class);
- public static final NodeData STREAMING_DELIMETER_NODE = new NodeData(Fqn.fromString("STREAMING_DELIMETER_NODE"), null);
+ public static final NodeData STREAMING_DELIMETER_NODE = new NodeDataMarker();
private TreeCache treeCache;
- private long[] loadStateTimeouts = {400, 800, 1200};
+ private long[] loadStateTimeouts = { 400, 800, 1200 };
public StateTransferManager(TreeCache cache)
{
@@ -78,9 +79,9 @@
* enabled, the requested Fqn is not the root node, and the
* cache loader does not implement {@link ExtendedCacheLoader}.
*/
- public byte[] getState(OutputStream os, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
+ public byte[] getState(OutputStream os,Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
{
- boolean usingStreamingStateTransfer = os != null;
+ boolean usingStreamingStateTransfer = os!=null;
TreeCache cache = getTreeCache();
VersionAwareMarshaller marshaller_ = null;
@@ -149,7 +150,7 @@
public byte[] getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
{
- return getState(null, fqn, timeout, force, suppressErrors);
+ return getState(null,fqn,timeout,force,suppressErrors);
}
/**
@@ -329,7 +330,7 @@
byte [] new_state = null;
InputStream istream = null;
- if (state instanceof byte[])
+ if(state instanceof byte[])
{
new_state = (byte[]) state;
log.info("received the state (size=" + new_state.length + " bytes)");
@@ -347,10 +348,10 @@
getTreeCache().getConfiguration().getInitialStateRetrievalTimeout(),
true, true);
- StateTransferIntegrator integrator = null;
- if (new_state != null)
+ StateTransferIntegrator integrator =null;
+ if(new_state!=null)
{
- integrator = getStateTransferIntegrator(new_state, targetRoot.getFqn());
+ integrator = getStateTransferIntegrator(new_state,targetRoot.getFqn());
}
else
{
@@ -374,6 +375,13 @@
}
finally
{
+ if(istream!=null)
+ {
+ try
+ {
+ istream.close();
+ }catch(Exception ignored){}
+ }
releaseStateTransferLocks(targetRoot, owner, true);
}
@@ -444,7 +452,7 @@
protected StateTransferGenerator getStateTransferGenerator(OutputStream os)
{
- return StateTransferFactory.getStateTransferGenerator(os, getTreeCache());
+ return StateTransferFactory.getStateTransferGenerator(os,getTreeCache());
}
protected StateTransferIntegrator getStateTransferIntegrator(byte[] state, Fqn targetFqn)
1.2 +24 -3 JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StreamingStateTransferIntegrator_200.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/StreamingStateTransferIntegrator_200.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -b -r1.1 -r1.2
--- StreamingStateTransferIntegrator_200.java 24 Aug 2006 22:05:34 -0000 1.1
+++ StreamingStateTransferIntegrator_200.java 31 Aug 2006 14:56:46 -0000 1.2
@@ -20,6 +20,7 @@
import org.jboss.cache.TreeCache;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.factories.NodeFactory;
+import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.NodeData;
public class StreamingStateTransferIntegrator_200 implements StateTransferIntegrator
@@ -104,7 +105,27 @@
try
{
- //TODO
+ CacheLoader loader = cache.getCacheLoader();
+ if (loader == null)
+ {
+ log.error("cache loader is null, cannot set persistent state");
+ }
+ else if (targetFqn.isRoot())
+ {
+ if (log.isTraceEnabled())
+ log.trace("setting the persistent state");
+ loader.storeEntireState(in);
+ if (log.isTraceEnabled())
+ log.trace("setting the persistent state was successful");
+ }
+ else
+ {
+ if (log.isTraceEnabled())
+ log.trace("setting partial persistent state at " + targetFqn);
+ loader.storeState(targetFqn, in);
+ if (log.isTraceEnabled())
+ log.trace("setting partial persistent state was successful");
+ }
}
finally
{
@@ -140,7 +161,7 @@
NodeData nd = (NodeData) in.readObject();
//are there any transient nodes at all?
- if (!StateTransferManager.STREAMING_DELIMETER_NODE.getFqn().equals(nd.getFqn()))
+ if (!nd.isMarker())
{
Map attrs = nd.getAttributes();
if (attrs != null)
@@ -172,7 +193,7 @@
int size;
Object name;
NodeData nd = (NodeData) in.readObject();
- while (nd != null && !StateTransferManager.STREAMING_DELIMETER_NODE.getFqn().equals(nd.getFqn()))
+ while (nd != null && !nd.isMarker())
{
fqn = nd.getFqn();
// If we need to integrate into the buddy backup subtree,
More information about the jboss-cvs-commits
mailing list