[jboss-cvs] JBossCache/src/org/jboss/cache/statetransfer ...
Vladmir Blagojevic
vladimir.blagojevic at jboss.com
Thu Sep 21 11:14:45 EDT 2006
User: vblagojevic
Date: 06/09/21 11:14:45
Modified: src/org/jboss/cache/statetransfer
DefaultStateTransferGenerator.java
DefaultStateTransferIntegrator.java
Log:
atomic state transfer, see http://jboss.org/index.html?module=bb&op=viewtopic&t=90586
Revision Changes Path
1.3 +35 -67 JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DefaultStateTransferGenerator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -b -r1.2 -r1.3
--- DefaultStateTransferGenerator.java 13 Sep 2006 15:42:17 -0000 1.2
+++ DefaultStateTransferGenerator.java 21 Sep 2006 15:14:45 -0000 1.3
@@ -42,65 +42,37 @@
{
Fqn fqn = rootNode.getFqn();
Throwable encouteredException = null;
- try
- {
- //transient + marker
try
{
out.writeShort(STATE_TRANSFER_VERSION);
if (generateTransient)
{
+ //transient + marker
if (log.isTraceEnabled())
log.trace("writing transient state for " + fqn);
marshallTransientState(rootNode, out);
+ out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
if (log.isTraceEnabled())
log.trace("transient state succesfully written");
- }
- }
- catch (Throwable t)
- {
- encouteredException=t;
- log.error("failed getting the in-memory (transient) state", t);
- out.writeObject(new NodeDataExceptionMarker(t));
- }
- finally
- {
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
- }
//associated + marker
- try
- {
if (log.isTraceEnabled())
log.trace("writing associated state");
marshallAssociatedState(fqn, out);
+ out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
if (log.isTraceEnabled())
log.trace("associated state succesfully written");
- }
- catch (Throwable t)
- {
- encouteredException=t;
- log.error("failed writing associated state", t);
- out.writeObject(new NodeDataExceptionMarker(t));
- }
- finally
- {
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
- }
- //persistent + marker
- try
- {
+ }
if (generatePersistent)
{
if (log.isTraceEnabled())
- log.trace("writing persistent state for " +
- fqn + ",using " + cache.getCacheLoader().getClass());
+ log.trace("writing persistent state for " + fqn + ",using " + cache.getCacheLoader().getClass());
if (fqn.isRoot())
{
@@ -114,22 +86,18 @@
if (log.isTraceEnabled())
log.trace("persistent state succesfully written");
}
+ out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
}
catch (Throwable t)
{
- encouteredException=t;
- log.error("failed getting the persistent state", t);
+ encouteredException = t;
+ log.error("failed writing state", t);
out.writeObject(new NodeDataExceptionMarker(t));
}
finally
{
- out.writeObject(StateTransferManager.STREAMING_DELIMETER_NODE);
- }
- }
- finally
- {
out.close();
- if(encouteredException!=null && !suppressErrors)
+ if (encouteredException != null && !suppressErrors)
{
throw encouteredException;
}
1.3 +75 -84 JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: DefaultStateTransferIntegrator.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -b -r1.2 -r1.3
--- DefaultStateTransferIntegrator.java 13 Sep 2006 15:42:17 -0000 1.2
+++ DefaultStateTransferIntegrator.java 21 Sep 2006 15:14:45 -0000 1.3
@@ -15,6 +15,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
import org.jboss.cache.DataNode;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
@@ -22,6 +23,7 @@
import org.jboss.cache.factories.NodeFactory;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.NodeData;
+import org.jboss.cache.loader.NodeDataExceptionMarker;
public class DefaultStateTransferIntegrator implements StateTransferIntegrator
{
@@ -49,6 +51,36 @@
this.internalFqns = cache.getInternalFqns();
}
+ public void integrateState(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
+ {
+ Throwable cause = null;
+ try
+ {
+ integrateTransientState(ois, target, cl);
+ integrateAssociatedState(ois);
+ integratePersistentState(ois);
+ }
+ catch (ClassCastException cce)
+ {
+ cause = cce;
+ log.error("Failed integrating persistent state. One of cacheloaders is not"
+ + " adhering to state stream format [JBCACHE-738].");
+ }
+ catch(Throwable t)
+ {
+ cause = t;
+ log.error("Failed integrating state.",t);
+ }
+ finally
+ {
+ ois.close();
+ if (cause != null)
+ {
+ throw new Exception("State transfer failed ");
+ }
+ }
+ }
+
protected void integrateTransientState(ObjectInputStream in,DataNode target, ClassLoader cl) throws Exception
{
boolean transientSet = false;
@@ -72,6 +104,7 @@
if (!transientSet)
{
// Clear any existing state from the targetRoot
+ log.warn("transient state integration failed, removing all children of " + target);
target.clear();
target.removeAllChildren();
}
@@ -89,7 +122,7 @@
{
// no-op in this base class
// just read marker
- in.readObject();
+ readNode(in);
}
protected void integratePersistentState(ObjectInputStream in) throws Exception
@@ -99,7 +132,8 @@
boolean persistentSet=false;
if (loader == null)
{
- log.error("cache loader is null, cannot set persistent state");
+ if (log.isTraceEnabled())
+ log.trace("cache loader is null, will not attempt to integrate persistent state");
}
else
{
@@ -119,11 +153,9 @@
}
finally
{
- if(!persistentSet)
+ if (!persistentSet)
{
- if (log.isTraceEnabled())
- log.trace("persistent state integration failed, removing all nodes from loader");
-
+ log.warn("persistent state integration failed, removing all nodes from loader");
loader.remove(targetFqn);
}
else
@@ -135,6 +167,26 @@
}
}
+ protected TreeCache getCache()
+ {
+ return cache;
+ }
+
+ protected NodeFactory getFactory()
+ {
+ return factory;
+ }
+
+ protected byte getNodeType()
+ {
+ return nodeType;
+ }
+
+ protected Fqn getTargetFqn()
+ {
+ return targetFqn;
+ }
+
/**
* Generates NodeAdded notifications for all nodes of the tree. This is
* called whenever the tree is initially retrieved (state transfer)
@@ -182,10 +234,10 @@
target.removeAllChildren();
// Read the first NodeData and integrate into our target
- NodeData nd = (NodeData) in.readObject();
+ NodeData nd = readNode(in);
//are there any transient nodes at all?
- if (!nd.isMarker())
+ if (nd != null && !nd.isMarker())
{
Map attrs = nd.getAttributes();
if (attrs != null)
@@ -208,6 +260,17 @@
}
}
+ private NodeData readNode(ObjectInputStream in) throws IOException, ClassNotFoundException
+ {
+ NodeData nd = (NodeData) in.readObject();
+ if (nd != null && nd.isExceptionMarker())
+ {
+ throw new CacheException("State provider cacheloader threw exception during loadState",
+ ((NodeDataExceptionMarker) nd).getCause());
+ }
+ return nd;
+ }
+
private NodeData integrateStateTransferChildren(DataNode parent, int offset, ObjectInputStream in)
throws IOException, ClassNotFoundException
{
@@ -216,7 +279,7 @@
Fqn fqn;
int size;
Object name;
- NodeData nd = (NodeData) in.readObject();
+ NodeData nd = readNode(in);
while (nd != null && !nd.isMarker())
{
fqn = nd.getFqn();
@@ -322,76 +385,4 @@
integrateRetainedNode(child, descendant);
}
}
-
- protected TreeCache getCache()
- {
- return cache;
- }
-
- protected NodeFactory getFactory()
- {
- return factory;
- }
-
- protected byte getNodeType()
- {
- return nodeType;
- }
-
- protected Fqn getTargetFqn()
- {
- return targetFqn;
- }
-
- public void integrateState(ObjectInputStream ois, DataNode target, ClassLoader cl) throws Exception
- {
- Throwable cause=null;
- //first try integrating transient state
- try
- {
- integrateTransientState(ois, target, cl);
- }
- catch(Throwable t)
- {
- cause = t;
- log.error("Failed integrating transient state.",t);
- }
-
- //then try integrating associated state
- try
- {
- integrateAssociatedState(ois);
- }
- catch(Throwable t)
- {
- cause = t;
- log.error("Failed integrating associated state.",t);
- }
-
- //finally try integrating persistent
- try
- {
- integratePersistentState(ois);
- }
- catch (ClassCastException cce)
- {
- cause = cce;
- log.error("Failed integrating persistent state. One of cacheloaders is not"
- + " adhering to state stream format [JBCACHE-738].");
- }
- catch(Throwable t)
- {
- cause = t;
- log.error("Failed integrating persistent state.", t);
- }
- finally
- {
- ois.close();
- if(cause!=null)
- {
- throw new Exception("State transfer failed ");
- }
- }
- }
-
}
More information about the jboss-cvs-commits
mailing list