[jbosscache-commits] JBoss Cache SVN: r5623 - in core/trunk/src: main/java/org/jboss/cache/factories and 4 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Tue Apr 22 12:09:06 EDT 2008
Author: mircea.markus
Date: 2008-04-22 12:09:06 -0400 (Tue, 22 Apr 2008)
New Revision: 5623
Added:
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
Removed:
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
core/trunk/src/test/java/org/jboss/cache/marshall/ActiveInactiveTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/FailedStateTransferTest.java
Log:
JBCACHE-1222 - bug fixing - marshalling
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-04-22 16:09:06 UTC (rev 5623)
@@ -22,7 +22,7 @@
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.MethodCallWrapper;
import org.jboss.cache.notifications.Notifier;
-import org.jboss.cache.remoting.jgroups.CacheMessageListener;
+import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
import org.jboss.cache.statetransfer.StateTransferManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionTable;
@@ -81,7 +81,7 @@
/**
* JGroups message listener.
*/
- private CacheMessageListener messageListener;
+ private ChannelMessageListener messageListener;
private Configuration configuration;
private Notifier notifier;
private CacheSPI spi;
@@ -97,7 +97,7 @@
private boolean isInLocalMode;
@Inject
- private void setupDependencies(CacheMessageListener messageListener, Configuration configuration, Notifier notifier,
+ private void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, Notifier notifier,
CacheSPI spi, Marshaller marshaller, TransactionTable txTable,
TransactionManager txManager, InvocationContextContainer container, InterceptorChain interceptorChain,
CacheLifecycleManager lifecycleManager)
@@ -439,6 +439,7 @@
useOutOfBandMessage = false;
+ //todo check whether we can get rid of the MethodCallWrapper and use the command directly
rsps = responseFilter == null
? disp.callRemoteMethods(validMembers, new MethodCallWrapper(command), modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage)
: disp.callRemoteMethods(validMembers, new MethodCallWrapper(command), modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-04-22 16:09:06 UTC (rev 5623)
@@ -13,7 +13,7 @@
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.VersionAwareMarshaller;
import org.jboss.cache.notifications.Notifier;
-import org.jboss.cache.remoting.jgroups.CacheMessageListener;
+import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
import org.jboss.cache.statetransfer.StateTransferManager;
import org.jboss.cache.transaction.TransactionTable;
@@ -24,7 +24,7 @@
* @since 2.1.0
*/
@DefaultFactoryFor(classes = {StateTransferManager.class, TransactionTable.class, RegionManager.class, Notifier.class,
- CacheMessageListener.class, CacheLoaderManager.class, Marshaller.class,
+ ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class,
InvocationContextContainer.class, CacheInvocationDelegate.class,
CacheTransactionHelper.class, CacheData.class, CommandsFactory.class, LockManager.class})
public class EmptyConstructorFactory extends ComponentFactory
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2008-04-22 16:09:06 UTC (rev 5623)
@@ -92,9 +92,9 @@
region = rrv.region;
o = rrv.returnValue;
}
- else if (o instanceof MarshallableCommand)
+ else if (o instanceof MethodCallWrapper)
{
- MarshallableCommand marshallableCommand = (MarshallableCommand) o;
+ MarshallableCommand marshallableCommand = ((MethodCallWrapper) o).getCommand();
region = extractFqnRegion(marshallableCommand);
}
Deleted: core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java 2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java 2008-04-22 16:09:06 UTC (rev 5623)
@@ -1,376 +0,0 @@
-package org.jboss.cache.remoting.jgroups;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheException;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.config.Configuration;
-import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.statetransfer.StateTransferManager;
-import org.jboss.cache.util.ExposedByteArrayOutputStream;
-import org.jboss.util.stream.MarshalledValueInputStream;
-import org.jboss.util.stream.MarshalledValueOutputStream;
-import org.jgroups.ExtendedMessageListener;
-import org.jgroups.Message;
-import org.jgroups.util.Util;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * JGroups MessageListener
- *
- * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
- * @since 2.1.0
- */
-public class CacheMessageListener implements ExtendedMessageListener
-{
- /**
- * Reference to an exception that was raised during
- * state installation on this node.
- */
- protected volatile Exception setStateException;
- private final Object stateLock = new Object();
- private Log log = LogFactory.getLog(CacheMessageListener.class);
- private StateTransferManager stateTransferManager;
- private Configuration configuration;
- /**
- * True if state was initialized during start-up.
- */
- private volatile boolean isStateSet = false;
-
- @Inject
- private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
- {
- this.stateTransferManager = stateTransferManager;
- this.configuration = configuration;
- }
-
- public boolean isStateSet()
- {
- return isStateSet;
- }
-
- public void setStateSet(boolean stateSet)
- {
- isStateSet = stateSet;
- }
-
- public void waitForState() throws Exception
- {
- synchronized (stateLock)
- {
- while (!isStateSet)
- {
- if (setStateException != null)
- {
- throw setStateException;
- }
-
- try
- {
- stateLock.wait();
- }
- catch (InterruptedException iex)
- {
- }
- }
- }
- }
-
- protected void stateReceivedSuccess()
- {
- isStateSet = true;
- setStateException = null;
- }
-
- protected void stateReceivingFailed(Throwable t)
- {
- if (t instanceof CacheException)
- {
- log.debug(t);
- }
- else
- {
- log.error("failed setting state", t);
- }
- if (t instanceof Exception)
- {
- setStateException = (Exception) t;
- }
- else
- {
- setStateException = new Exception(t);
- }
- }
-
- protected void stateProducingFailed(Throwable t)
- {
- if (t instanceof CacheException)
- {
- log.debug(t);
- }
- else
- {
- log.error("Caught " + t.getClass().getName()
- + " while responding to state transfer request", t);
- }
- }
-
- /**
- * Callback, does nothing.
- */
- public void receive(Message msg)
- {
- }
-
- public byte[] getState()
- {
- MarshalledValueOutputStream out = null;
- byte[] result = null;
- ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
- try
- {
- out = new MarshalledValueOutputStream(baos);
-
- stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
- }
- catch (Throwable t)
- {
- stateProducingFailed(t);
- }
- finally
- {
- result = baos.getRawBuffer();
- Util.close(out);
- }
- return result;
- }
-
- public void setState(byte[] new_state)
- {
- if (new_state == null)
- {
- log.debug("transferred state is null (may be first member in cluster)");
- return;
- }
- ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
- MarshalledValueInputStream in = null;
- try
- {
- in = new MarshalledValueInputStream(bais);
- stateTransferManager.setState(in, Fqn.ROOT);
- stateReceivedSuccess();
- }
- catch (Throwable t)
- {
- stateReceivingFailed(t);
- }
- finally
- {
- Util.close(in);
- synchronized (stateLock)
- {
- // Notify wait that state has been set.
- stateLock.notifyAll();
- }
- }
- }
-
- public byte[] getState(String state_id)
- {
- MarshalledValueOutputStream out = null;
- String sourceRoot = state_id;
- byte[] result = null;
-
- boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
- if (hasDifferentSourceAndIntegrationRoots)
- {
- sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
- }
-
- ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
- try
- {
- out = new MarshalledValueOutputStream(baos);
-
- stateTransferManager.getState(out, Fqn.fromString(sourceRoot),
- configuration.getStateRetrievalTimeout(), true, true);
- }
- catch (Throwable t)
- {
- stateProducingFailed(t);
- }
- finally
- {
- result = baos.getRawBuffer();
- Util.close(out);
- }
- return result;
- }
-
- public void getState(OutputStream ostream)
- {
- MarshalledValueOutputStream out = null;
- try
- {
- out = new MarshalledValueOutputStream(ostream);
- stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
- }
- catch (Throwable t)
- {
- stateProducingFailed(t);
- }
- finally
- {
- Util.close(out);
- }
- }
-
- public void getState(String state_id, OutputStream ostream)
- {
- String sourceRoot = state_id;
- MarshalledValueOutputStream out = null;
- boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
- if (hasDifferentSourceAndIntegrationRoots)
- {
- sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
- }
- try
- {
- out = new MarshalledValueOutputStream(ostream);
- stateTransferManager.getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
- }
- catch (Throwable t)
- {
- stateProducingFailed(t);
- }
- finally
- {
- Util.close(out);
- }
- }
-
- public void setState(InputStream istream)
- {
- if (istream == null)
- {
- log.debug("stream is null (may be first member in cluster)");
- return;
- }
- MarshalledValueInputStream in = null;
- try
- {
- in = new MarshalledValueInputStream(istream);
- stateTransferManager.setState(in, Fqn.ROOT);
- stateReceivedSuccess();
- }
- catch (Throwable t)
- {
- stateReceivingFailed(t);
- }
- finally
- {
- Util.close(in);
- synchronized (stateLock)
- {
- // Notify wait that state has been set.
- stateLock.notifyAll();
- }
- }
- }
-
- public void setState(String state_id, byte[] state)
- {
- if (state == null)
- {
- log.debug("partial transferred state is null");
- return;
- }
-
- MarshalledValueInputStream in = null;
- String targetRoot = state_id;
- boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
- if (hasDifferentSourceAndIntegrationRoots)
- {
- targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
- }
- try
- {
- log.debug("Setting received partial state for subroot " + state_id);
- Fqn subroot = Fqn.fromString(targetRoot);
-// Region region = regionManager.getRegion(subroot, false);
-// ClassLoader cl = null;
-// if (region != null)
-// {
-// // If a classloader is registered for the node's region, use it
-// cl = region.getClassLoader();
-// }
- ByteArrayInputStream bais = new ByteArrayInputStream(state);
- in = new MarshalledValueInputStream(bais);
- //getStateTransferManager().setState(in, subroot, cl);
- stateTransferManager.setState(in, subroot);
- stateReceivedSuccess();
- }
- catch (Throwable t)
- {
- stateReceivingFailed(t);
- }
- finally
- {
- Util.close(in);
- synchronized (stateLock)
- {
- // Notify wait that state has been set.
- stateLock.notifyAll();
- }
- }
- }
-
- public void setState(String state_id, InputStream istream)
- {
- String targetRoot = state_id;
- MarshalledValueInputStream in = null;
- boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
- if (hasDifferentSourceAndIntegrationRoots)
- {
- targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
- }
- if (istream == null)
- {
- log.debug("stream is null (may be first member in cluster). State is not set");
- return;
- }
-
- try
- {
- log.debug("Setting received partial state for subroot " + state_id);
- in = new MarshalledValueInputStream(istream);
- Fqn subroot = Fqn.fromString(targetRoot);
-// Region region = regionManager.getRegion(subroot, false);
-// ClassLoader cl = null;
-// if (region != null)
-// {
-// // If a classloader is registered for the node's region, use it
-// cl = region.getClassLoader();
-// }
- //getStateTransferManager().setState(in, subroot, cl);
- stateTransferManager.setState(in, subroot);
- stateReceivedSuccess();
- }
- catch (Throwable t)
- {
- stateReceivingFailed(t);
- }
- finally
- {
- Util.close(in);
- synchronized (stateLock)
- {
- // Notify wait that state has been set.
- stateLock.notifyAll();
- }
- }
- }
-}
Copied: core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java (from rev 5617, core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java 2008-04-22 16:09:06 UTC (rev 5623)
@@ -0,0 +1,377 @@
+package org.jboss.cache.remoting.jgroups;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.cache.util.ExposedByteArrayOutputStream;
+import org.jboss.util.stream.MarshalledValueInputStream;
+import org.jboss.util.stream.MarshalledValueOutputStream;
+import org.jgroups.ExtendedMessageListener;
+import org.jgroups.Message;
+import org.jgroups.util.Util;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * JGroups MessageListener
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ChannelMessageListener implements ExtendedMessageListener
+{
+ /**
+ * Reference to an exception that was raised during
+ * state installation on this node.
+ */
+ protected volatile Exception setStateException;
+ private final Object stateLock = new Object();
+ private Log log = LogFactory.getLog(ChannelMessageListener.class);
+ private StateTransferManager stateTransferManager;
+ private Configuration configuration;
+ /**
+ * True if state was initialized during start-up.
+ */
+ private volatile boolean isStateSet = false;
+
+ @Inject
+ private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
+ {
+ this.stateTransferManager = stateTransferManager;
+ this.configuration = configuration;
+ }
+
+ public boolean isStateSet()
+ {
+ return isStateSet;
+ }
+
+ public void setStateSet(boolean stateSet)
+ {
+ isStateSet = stateSet;
+ }
+
+ public void waitForState() throws Exception
+ {
+ synchronized (stateLock)
+ {
+ while (!isStateSet)
+ {
+ if (setStateException != null)
+ {
+ throw setStateException;
+ }
+
+ try
+ {
+ stateLock.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ }
+ }
+
+ protected void stateReceivedSuccess()
+ {
+ isStateSet = true;
+ setStateException = null;
+ }
+
+ protected void stateReceivingFailed(Throwable t)
+ {
+ if (t instanceof CacheException)
+ {
+ log.debug(t);
+ }
+ else
+ {
+ log.error("failed setting state", t);
+ }
+ if (t instanceof Exception)
+ {
+ setStateException = (Exception) t;
+ }
+ else
+ {
+ setStateException = new Exception(t);
+ }
+ }
+
+ protected void stateProducingFailed(Throwable t)
+ {
+ if (t instanceof CacheException)
+ {
+ log.debug(t);
+ }
+ else
+ {
+ log.error("Caught " + t.getClass().getName()
+ + " while responding to state transfer request", t);
+ }
+ }
+
+ /**
+ * Callback, does nothing.
+ */
+ public void receive(Message msg)
+ {
+ }
+
+ public byte[] getState()
+ {
+ MarshalledValueOutputStream out = null;
+ byte[] result = null;
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ try
+ {
+ out = new MarshalledValueOutputStream(baos);
+
+ stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ result = baos.getRawBuffer();
+ Util.close(out);
+ }
+ return result;
+ }
+
+ public void setState(byte[] new_state)
+ {
+ if (new_state == null)
+ {
+ log.debug("transferred state is null (may be first member in cluster)");
+ return;
+ }
+ ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
+ MarshalledValueInputStream in = null;
+ try
+ {
+ in = new MarshalledValueInputStream(bais);
+ stateTransferManager.setState(in, Fqn.ROOT);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public byte[] getState(String state_id)
+ {
+ MarshalledValueOutputStream out = null;
+ String sourceRoot = state_id;
+ byte[] result = null;
+
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+ }
+
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ try
+ {
+ out = new MarshalledValueOutputStream(baos);
+
+ stateTransferManager.getState(out, Fqn.fromString(sourceRoot),
+ configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ result = baos.getRawBuffer();
+ Util.close(out);
+ }
+ return result;
+ }
+
+ public void getState(OutputStream ostream)
+ {
+ MarshalledValueOutputStream out = null;
+ try
+ {
+ out = new MarshalledValueOutputStream(ostream);
+ stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ Util.close(out);
+ }
+ }
+
+ public void getState(String state_id, OutputStream ostream)
+ {
+ String sourceRoot = state_id;
+ MarshalledValueOutputStream out = null;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+ }
+ try
+ {
+ out = new MarshalledValueOutputStream(ostream);
+ stateTransferManager.getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ Util.close(out);
+ }
+ }
+
+ public void setState(InputStream istream)
+ {
+ if (istream == null)
+ {
+ log.debug("stream is null (may be first member in cluster)");
+ return;
+ }
+ MarshalledValueInputStream in = null;
+ try
+ {
+ in = new MarshalledValueInputStream(istream);
+ stateTransferManager.setState(in, Fqn.ROOT);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public void setState(String state_id, byte[] state)
+ {
+ if (state == null)
+ {
+ log.debug("partial transferred state is null");
+ return;
+ }
+
+ MarshalledValueInputStream in = null;
+ String targetRoot = state_id;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+ }
+ try
+ {
+ log.debug("Setting received partial state for subroot " + state_id);
+ Fqn subroot = Fqn.fromString(targetRoot);
+// Region region = regionManager.getRegion(subroot, false);
+// ClassLoader cl = null;
+// if (region != null)
+// {
+// // If a classloader is registered for the node's region, use it
+// cl = region.getClassLoader();
+// }
+ ByteArrayInputStream bais = new ByteArrayInputStream(state);
+ in = new MarshalledValueInputStream(bais);
+ //getStateTransferManager().setState(in, subroot, cl);
+ stateTransferManager.setState(in, subroot);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public void setState(String stateId, InputStream istream)
+ {
+ if (log.isTraceEnabled()) log.trace("**** Receiving state for " + stateId);
+ String targetRoot = stateId;
+ MarshalledValueInputStream in = null;
+ boolean hasDifferentSourceAndIntegrationRoots = stateId.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = stateId.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+ }
+ if (istream == null)
+ {
+ log.debug("stream is null (may be first member in cluster). State is not set");
+ return;
+ }
+
+ try
+ {
+ log.debug("Setting received partial state for subroot " + stateId);
+ in = new MarshalledValueInputStream(istream);
+ Fqn subroot = Fqn.fromString(targetRoot);
+// Region region = regionManager.getRegion(subroot, false);
+// ClassLoader cl = null;
+// if (region != null)
+// {
+// // If a classloader is registered for the node's region, use it
+// cl = region.getClassLoader();
+// }
+ //getStateTransferManager().setState(in, subroot, cl);
+ stateTransferManager.setState(in, subroot);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+}
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/ActiveInactiveTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/ActiveInactiveTest.java 2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/ActiveInactiveTest.java 2008-04-22 16:09:06 UTC (rev 5623)
@@ -153,7 +153,7 @@
public void testObjectFromByteBuffer() throws Exception
{
PutKeyValueCommand put = new PutKeyValueCommand(null, A_B, "name", "Joe", false, false);
- ReplicateCommand replicate = new ReplicateCommand(put);
+ MethodCallWrapper replicate = new MethodCallWrapper(new ReplicateCommand(put));
rman.setDefaultInactive(true);
// register A as an inactive marshalling region
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java 2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java 2008-04-22 16:09:06 UTC (rev 5623)
@@ -43,9 +43,9 @@
{
CacheSPI<Object, Object> cache1, cache2;
String props = null;
- Person ben_;
- Address addr_;
- Throwable ex_;
+ Person ben;
+ Address addr;
+ Throwable ex;
private Fqn<String> aop = Fqn.fromString("/aop");
protected boolean useMarshalledValues = false;
@@ -60,11 +60,11 @@
cache2 = createCache("TestCache");
- addr_ = new Address();
- addr_.setCity("San Jose");
- ben_ = new Person();
- ben_.setName("Ben");
- ben_.setAddress(addr_);
+ addr = new Address();
+ addr.setCity("San Jose");
+ ben = new Person();
+ ben.setName("Ben");
+ ben.setAddress(addr);
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
@@ -109,8 +109,8 @@
}
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
- cache1.put(aop, "person", ben_);
- cache1.put(Fqn.fromString("/alias"), "person", ben_);
+ cache1.put(aop, "person", ben);
+ cache1.put(Fqn.fromString("/alias"), "person", ben);
if (useMarshalledValues) resetContextClassLoader();
TestingUtil.sleepThread(1000);
@@ -120,7 +120,7 @@
ben2 = cache2.get(aop, "person");
if (useMarshalledValues) resetContextClassLoader();
assertNotNull(ben2);
- assertEquals(ben_.toString(), ben2.toString());
+ assertEquals(ben.toString(), ben2.toString());
Class<?> claz = clb.loadClass(ADDRESS_CLASSNAME);
Object add = claz.newInstance();
@@ -161,7 +161,7 @@
Object scopedBen2 = getPersonFromClassloader(clb);
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
- cache1.put(Fqn.fromString("/aop/1"), "person", ben_);
+ cache1.put(Fqn.fromString("/aop/1"), "person", ben);
cache1.put(Fqn.fromString("/aop/2"), "person", scopedBen1);
if (useMarshalledValues) resetContextClassLoader();
TestingUtil.sleepThread(1000);
@@ -170,7 +170,7 @@
// Can't cast it to Person. CCE will resutl.
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
ben2 = cache2.get(Fqn.fromString("/aop/1"), "person");
- assertEquals(ben_.toString(), ben2.toString());
+ assertEquals(ben.toString(), ben2.toString());
ben2 = cache2.get(Fqn.fromString("/aop/2"), "person");
assertFalse("cache2 deserialized with scoped classloader", ben2 instanceof Person);
@@ -181,13 +181,13 @@
public void testTxPut() throws Exception
{
beginTransaction();
- cache1.put(aop, "person", ben_);
- cache1.put(aop, "person1", ben_);
+ cache1.put(aop, "person", ben);
+// cache1.put(aop, "person1", ben);
commit();
TestingUtil.sleepThread(1000);
Person ben2 = (Person) cache2.get(aop, "person");
assertNotNull("Person from 2nd cache should not be null ", ben2);
- assertEquals(ben_.toString(), ben2.toString());
+ assertEquals(ben.toString(), ben2.toString());
}
public void testTxCLSet2() throws Exception
@@ -205,7 +205,7 @@
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
beginTransaction();
- cache1.put(aop, "person", ben_);
+ cache1.put(aop, "person", ben);
commit();
if (useMarshalledValues) resetContextClassLoader();
@@ -216,7 +216,7 @@
if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
ben2 = cache2.get(aop, "person");
if (useMarshalledValues) resetContextClassLoader();
- assertEquals(ben_.toString(), ben2.toString());
+ assertEquals(ben.toString(), ben2.toString());
Class<?> claz = clb.loadClass(ADDRESS_CLASSNAME);
Object add = claz.newInstance();
Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/FailedStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/FailedStateTransferTest.java 2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/FailedStateTransferTest.java 2008-04-22 16:09:06 UTC (rev 5623)
@@ -31,7 +31,7 @@
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.misc.TestingUtil;
-import org.jboss.cache.remoting.jgroups.CacheMessageListener;
+import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
import static org.testng.AssertJUnit.fail;
import org.testng.annotations.Test;
@@ -61,8 +61,8 @@
// inject our own message listener and re-wire deps
ComponentRegistry cr = TestingUtil.extractComponentRegistry(cache);
-// cr.unregisterComponent(CacheMessageListener.class);
- cr.registerComponent(CacheMessageListener.class.getName(), new SecretiveStateCacheMessageListener(), CacheMessageListener.class);
+// cr.unregisterComponent(ChannelMessageListener.class);
+ cr.registerComponent(ChannelMessageListener.class.getName(), new SecretiveStateCacheMessageListener(), ChannelMessageListener.class);
// cr.updateDependencies();
cache.start();
@@ -80,8 +80,8 @@
// inject our own message listener and re-wire deps
cr = TestingUtil.extractComponentRegistry(recipient);
- //cr.unregisterComponent(CacheMessageListener.class);
- cr.registerComponent(CacheMessageListener.class.getName(), new SecretiveStateCacheMessageListener(), CacheMessageListener.class);
+ //cr.unregisterComponent(ChannelMessageListener.class);
+ cr.registerComponent(ChannelMessageListener.class.getName(), new SecretiveStateCacheMessageListener(), ChannelMessageListener.class);
//cr.updateDependencies();
try
@@ -100,7 +100,7 @@
return Version.version;
}
- private static class SecretiveStateCacheMessageListener extends CacheMessageListener
+ private static class SecretiveStateCacheMessageListener extends ChannelMessageListener
{
@Override
public void setState(byte[] new_state)
More information about the jbosscache-commits
mailing list