[jboss-cvs] JBossCache/src/org/jboss/cache ...
Vladimir Blagojevic
vladimir.blagojevic at jboss.com
Wed Dec 20 17:28:13 EST 2006
User: vblagojevic
Date: 06/12/20 17:28:13
Modified: src/org/jboss/cache TreeCache.java
Log:
final state transfer refactoring
Revision Changes Path
1.298 +127 -72 JBossCache/src/org/jboss/cache/TreeCache.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TreeCache.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/TreeCache.java,v
retrieving revision 1.297
retrieving revision 1.298
diff -u -b -r1.297 -r1.298
--- TreeCache.java 20 Dec 2006 16:04:02 -0000 1.297
+++ TreeCache.java 20 Dec 2006 22:28:13 -0000 1.298
@@ -39,6 +39,7 @@
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.optimistic.DataVersion;
import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.cache.util.ExposedByteArrayOutputStream;
import org.jboss.cache.util.MapCopy;
import org.jboss.util.stream.MarshalledValueInputStream;
import org.jboss.util.stream.MarshalledValueOutputStream;
@@ -58,6 +59,7 @@
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
+import org.jgroups.util.Util;
import org.w3c.dom.Element;
import javax.management.MBeanServer;
@@ -66,6 +68,8 @@
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -95,7 +99,7 @@
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
* @author Brian Stansberry
* @author Daniel Huang (dhuang at jboss.org)
- * @version $Id: TreeCache.java,v 1.297 2006/12/20 16:04:02 msurtani Exp $
+ * @version $Id: TreeCache.java,v 1.298 2006/12/20 22:28:13 vblagojevic Exp $
* <p/>
* @see <a href="http://labs.jboss.com/portal/jbosscache/docs">JBossCache doc</a>
*/
@@ -1136,15 +1140,30 @@
* @param suppressErrors should any Throwable thrown be suppressed?
* @return a serialized byte[][], element 0 is the transient state
* (or null), and element 1 is the persistent state (or null).
+ *
+ *
+ *TODO here only because of BuddyManager state transfer
+ *Consider for removal if BuddyManager transfer changes
+ *
*/
- public byte[] _getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
+ public byte[] generateState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
{
- return getStateTransferManager().getState(fqn, timeout, force, suppressErrors);
- }
- public void _getState(OutputStream os, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
+ MarshalledValueOutputStream out = null;
+ byte[] result = null;
+ try
+ {
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ out = new MarshalledValueOutputStream(baos);
+ getStateTransferManager().getState(out, fqn, timeout, force, suppressErrors);
+ result = baos.getRawBuffer();
+ }
+ finally
{
- getStateTransferManager().getState(os, fqn, timeout, force, suppressErrors);
+ Util.close(out);
+ }
+
+ return result;
}
private void removeLocksForDeadMembers(Node node,
@@ -3248,25 +3267,17 @@
}
}
- /**
- * Returns a copy of the current cache (tree). It actually returns a 2
- * element array of byte[], element 0 being the transient state (or null)
- * and element 1 being the persistent state (or null)
- */
public byte[] getState()
{
+ MarshalledValueOutputStream out = null;
+ byte[] result = null;
try
{
- // // We use the lock acquisition timeout rather than the
- // // state transfer timeout, otherwise we'd never try
- // // to break locks before the requesting node gives up
- // return cache._getState(Fqn.fromString(SEPARATOR),
- // cache.getLockAcquisitionTimeout(),
- // true,
- // true);
- // Until flush is in place, use the old mechanism
- // where we wait the full state retrieval timeout
- return _getState(Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true);
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ out = new MarshalledValueOutputStream(baos);
+
+ getStateTransferManager().getState(out,Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true);
+ result = baos.getRawBuffer();
}
catch (Throwable t)
{
@@ -3275,8 +3286,12 @@
my_log.error("Caught " + t.getClass().getName() +
" while responding to initial state transfer request;" +
" returning null", t);
- return null;
}
+ finally
+ {
+ Util.close(out);
+ }
+ return result;
}
public void setState(byte[] new_state)
@@ -3286,9 +3301,16 @@
my_log.debug("transferred state is null (may be first member in cluster)");
return;
}
+ ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
+ MarshalledValueInputStream in = null;
try
{
- getStateTransferManager().setState(new_state, Fqn.ROOT, null);
+ in = new MarshalledValueInputStream(bais);
+ boolean hasState = in.readBoolean();
+ if(hasState)
+ {
+ getStateTransferManager().setState(in, Fqn.ROOT, null);
+ }
isStateSet = true;
}
catch (Throwable t)
@@ -3305,6 +3327,7 @@
}
finally
{
+ Util.close(in);
synchronized (stateLock)
{
// Notify wait that state has been set.
@@ -3315,15 +3338,24 @@
public byte[] getState(String state_id)
{
+ MarshalledValueOutputStream out = null;
String sourceRoot = state_id;
+ byte[] result = null;
+
boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0;
if (hasDifferentSourceAndIntegrationRoots)
{
sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[0];
}
+
try
{
- return _getState(Fqn.fromString(sourceRoot), configuration.getInitialStateRetrievalTimeout(), true, true);
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ out = new MarshalledValueOutputStream(baos);
+
+ getStateTransferManager().getState(out, Fqn.fromString(sourceRoot),
+ configuration.getInitialStateRetrievalTimeout(), true, true);
+ result = baos.getRawBuffer();
}
catch (Throwable t)
{
@@ -3332,15 +3364,21 @@
my_log.error("Caught " + t.getClass().getName() +
" while responding to partial state transfer request;" +
" returning null", t);
- return null;
}
+ finally
+ {
+ Util.close(out);
+ }
+ return result;
}
public void getState(OutputStream ostream)
{
+ MarshalledValueOutputStream out = null;
try
{
- _getState(ostream, Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true);
+ out = new MarshalledValueOutputStream(ostream);
+ getStateTransferManager().getState(out, Fqn.ROOT, configuration.getInitialStateRetrievalTimeout(), true, true);
}
catch (Throwable t)
{
@@ -3350,11 +3388,16 @@
" while responding to initial state transfer request;" +
" returning null", t);
}
+ finally
+ {
+ Util.close(out);
+ }
}
public void getState(String state_id, OutputStream ostream)
{
String sourceRoot = state_id;
+ MarshalledValueOutputStream out = null;
boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0;
if (hasDifferentSourceAndIntegrationRoots)
{
@@ -3362,7 +3405,8 @@
}
try
{
- _getState(ostream, Fqn.fromString(sourceRoot), configuration.getInitialStateRetrievalTimeout(), true, true);
+ out = new MarshalledValueOutputStream(ostream);
+ getStateTransferManager().getState(out, Fqn.fromString(sourceRoot), configuration.getInitialStateRetrievalTimeout(), true, true);
}
catch (Throwable t)
{
@@ -3372,6 +3416,10 @@
" while responding to partial state transfer request;" +
" returning null", t);
}
+ finally
+ {
+ Util.close(out);
+ }
}
public void setState(InputStream istream)
@@ -3381,19 +3429,16 @@
my_log.debug("stream is null (may be first member in cluster)");
return;
}
+ MarshalledValueInputStream in = null;
try
{
- MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
+ in = new MarshalledValueInputStream(istream);
boolean hasState = in.readBoolean();
- if (!hasState)
- {
- in.close();
- }
- else
+ if(hasState)
{
getStateTransferManager().setState(in, Fqn.ROOT, null);
- isStateSet = true;
}
+ isStateSet = true;
}
catch (Throwable t)
{
@@ -3409,6 +3454,7 @@
}
finally
{
+ Util.close(in);
synchronized (stateLock)
{
// Notify wait that state has been set.
@@ -3419,6 +3465,13 @@
public void setState(String state_id, byte[] state)
{
+ if (state == null)
+ {
+ my_log.debug("partial transferred state is null");
+ return;
+ }
+
+ MarshalledValueInputStream in = null;
String targetRoot = state_id;
boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0;
if (hasDifferentSourceAndIntegrationRoots)
@@ -3428,8 +3481,6 @@
try
{
- if (state != null)
- {
my_log.debug("Setting received partial state for subroot " + state_id);
Fqn subroot = Fqn.fromString(targetRoot);
Region region = regionManager.getRegion(subroot, false);
@@ -3439,9 +3490,14 @@
// If a classloader is registered for the node's region, use it
cl = region.getClassLoader();
}
- getStateTransferManager().setState(state, subroot, cl);
- isStateSet = true;
+ ByteArrayInputStream bais = new ByteArrayInputStream(state);
+ in = new MarshalledValueInputStream(bais);
+ boolean hasState = in.readBoolean();
+ if(hasState)
+ {
+ getStateTransferManager().setState(in, subroot, cl);
}
+ isStateSet = true;
}
catch (Throwable t)
{
@@ -3457,6 +3513,7 @@
}
finally
{
+ Util.close(in);
synchronized (stateLock)
{
// Notify wait that state has been set.
@@ -3468,6 +3525,7 @@
public void setState(String state_id, InputStream istream)
{
String targetRoot = state_id;
+ MarshalledValueInputStream in = null;
boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER) > 0;
if (hasDifferentSourceAndIntegrationRoots)
{
@@ -3481,15 +3539,8 @@
try
{
- MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
- boolean hasState = in.readBoolean();
- if (!hasState)
- {
- in.close();
- }
- else
- {
my_log.debug("Setting received partial state for subroot " + state_id);
+ in = new MarshalledValueInputStream(istream);
Fqn subroot = Fqn.fromString(targetRoot);
Region region = regionManager.getRegion(subroot, false);
ClassLoader cl = null;
@@ -3498,9 +3549,12 @@
// If a classloader is registered for the node's region, use it
cl = region.getClassLoader();
}
- getStateTransferManager().setState(in, Fqn.fromString(state_id), cl);
- isStateSet = true;
+ boolean hasState = in.readBoolean();
+ if(hasState)
+ {
+ getStateTransferManager().setState(in,subroot, cl);
}
+ isStateSet = true;
}
catch (Throwable t)
{
@@ -3516,6 +3570,7 @@
}
finally
{
+ Util.close(in);
synchronized (stateLock)
{
// Notify wait that state has been set.
More information about the jboss-cvs-commits
mailing list