[jbosscache-commits] JBoss Cache SVN: r5623 - in core/trunk/src: main/java/org/jboss/cache/factories and 4 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Apr 22 12:09:06 EDT 2008


Author: mircea.markus
Date: 2008-04-22 12:09:06 -0400 (Tue, 22 Apr 2008)
New Revision: 5623

Added:
   core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
Removed:
   core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
   core/trunk/src/test/java/org/jboss/cache/marshall/ActiveInactiveTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
   core/trunk/src/test/java/org/jboss/cache/statetransfer/FailedStateTransferTest.java
Log:
JBCACHE-1222 - bug fixing - marshalling

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-04-22 16:09:06 UTC (rev 5623)
@@ -22,7 +22,7 @@
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.MethodCallWrapper;
 import org.jboss.cache.notifications.Notifier;
-import org.jboss.cache.remoting.jgroups.CacheMessageListener;
+import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
 import org.jboss.cache.statetransfer.StateTransferManager;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionTable;
@@ -81,7 +81,7 @@
    /**
     * JGroups message listener.
     */
-   private CacheMessageListener messageListener;
+   private ChannelMessageListener messageListener;
    private Configuration configuration;
    private Notifier notifier;
    private CacheSPI spi;
@@ -97,7 +97,7 @@
    private boolean isInLocalMode;
 
    @Inject
-   private void setupDependencies(CacheMessageListener messageListener, Configuration configuration, Notifier notifier,
+   private void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, Notifier notifier,
                                   CacheSPI spi, Marshaller marshaller, TransactionTable txTable,
                                   TransactionManager txManager, InvocationContextContainer container, InterceptorChain interceptorChain,
                                     CacheLifecycleManager lifecycleManager)
@@ -439,6 +439,7 @@
 
       useOutOfBandMessage = false;
 
+      //todo check whether we can get rid of the MethodCallWrapper and use the command directly
       rsps = responseFilter == null
             ? disp.callRemoteMethods(validMembers, new MethodCallWrapper(command), modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage)
             : disp.callRemoteMethods(validMembers, new MethodCallWrapper(command), modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);

Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-04-22 16:09:06 UTC (rev 5623)
@@ -13,7 +13,7 @@
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.VersionAwareMarshaller;
 import org.jboss.cache.notifications.Notifier;
-import org.jboss.cache.remoting.jgroups.CacheMessageListener;
+import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
 import org.jboss.cache.statetransfer.StateTransferManager;
 import org.jboss.cache.transaction.TransactionTable;
 
@@ -24,7 +24,7 @@
  * @since 2.1.0
  */
 @DefaultFactoryFor(classes = {StateTransferManager.class, TransactionTable.class, RegionManager.class, Notifier.class,
-      CacheMessageListener.class, CacheLoaderManager.class, Marshaller.class,
+      ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class,
       InvocationContextContainer.class, CacheInvocationDelegate.class,
       CacheTransactionHelper.class, CacheData.class, CommandsFactory.class, LockManager.class})
 public class EmptyConstructorFactory extends ComponentFactory

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java	2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java	2008-04-22 16:09:06 UTC (rev 5623)
@@ -92,9 +92,9 @@
             region = rrv.region;
             o = rrv.returnValue;
          }
-         else if (o instanceof MarshallableCommand)
+         else if (o instanceof MethodCallWrapper)
          {
-            MarshallableCommand marshallableCommand = (MarshallableCommand) o;
+            MarshallableCommand marshallableCommand = ((MethodCallWrapper) o).getCommand();
             region = extractFqnRegion(marshallableCommand);
          }
 

Deleted: core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java	2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java	2008-04-22 16:09:06 UTC (rev 5623)
@@ -1,376 +0,0 @@
-package org.jboss.cache.remoting.jgroups;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheException;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.config.Configuration;
-import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.statetransfer.StateTransferManager;
-import org.jboss.cache.util.ExposedByteArrayOutputStream;
-import org.jboss.util.stream.MarshalledValueInputStream;
-import org.jboss.util.stream.MarshalledValueOutputStream;
-import org.jgroups.ExtendedMessageListener;
-import org.jgroups.Message;
-import org.jgroups.util.Util;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * JGroups MessageListener
- *
- * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
- * @since 2.1.0
- */
-public class CacheMessageListener implements ExtendedMessageListener
-{
-   /**
-    * Reference to an exception that was raised during
-    * state installation on this node.
-    */
-   protected volatile Exception setStateException;
-   private final Object stateLock = new Object();
-   private Log log = LogFactory.getLog(CacheMessageListener.class);
-   private StateTransferManager stateTransferManager;
-   private Configuration configuration;
-   /**
-    * True if state was initialized during start-up.
-    */
-   private volatile boolean isStateSet = false;
-
-   @Inject
-   private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
-   {
-      this.stateTransferManager = stateTransferManager;
-      this.configuration = configuration;
-   }
-
-   public boolean isStateSet()
-   {
-      return isStateSet;
-   }
-
-   public void setStateSet(boolean stateSet)
-   {
-      isStateSet = stateSet;
-   }
-
-   public void waitForState() throws Exception
-   {
-      synchronized (stateLock)
-      {
-         while (!isStateSet)
-         {
-            if (setStateException != null)
-            {
-               throw setStateException;
-            }
-
-            try
-            {
-               stateLock.wait();
-            }
-            catch (InterruptedException iex)
-            {
-            }
-         }
-      }
-   }
-
-   protected void stateReceivedSuccess()
-   {
-      isStateSet = true;
-      setStateException = null;
-   }
-
-   protected void stateReceivingFailed(Throwable t)
-   {
-      if (t instanceof CacheException)
-      {
-         log.debug(t);
-      }
-      else
-      {
-         log.error("failed setting state", t);
-      }
-      if (t instanceof Exception)
-      {
-         setStateException = (Exception) t;
-      }
-      else
-      {
-         setStateException = new Exception(t);
-      }
-   }
-
-   protected void stateProducingFailed(Throwable t)
-   {
-      if (t instanceof CacheException)
-      {
-         log.debug(t);
-      }
-      else
-      {
-         log.error("Caught " + t.getClass().getName()
-               + " while responding to state transfer request", t);
-      }
-   }
-
-   /**
-    * Callback, does nothing.
-    */
-   public void receive(Message msg)
-   {
-   }
-
-   public byte[] getState()
-   {
-      MarshalledValueOutputStream out = null;
-      byte[] result = null;
-      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
-      try
-      {
-         out = new MarshalledValueOutputStream(baos);
-
-         stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
-      }
-      catch (Throwable t)
-      {
-         stateProducingFailed(t);
-      }
-      finally
-      {
-         result = baos.getRawBuffer();
-         Util.close(out);
-      }
-      return result;
-   }
-
-   public void setState(byte[] new_state)
-   {
-      if (new_state == null)
-      {
-         log.debug("transferred state is null (may be first member in cluster)");
-         return;
-      }
-      ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
-      MarshalledValueInputStream in = null;
-      try
-      {
-         in = new MarshalledValueInputStream(bais);
-         stateTransferManager.setState(in, Fqn.ROOT);
-         stateReceivedSuccess();
-      }
-      catch (Throwable t)
-      {
-         stateReceivingFailed(t);
-      }
-      finally
-      {
-         Util.close(in);
-         synchronized (stateLock)
-         {
-            // Notify wait that state has been set.
-            stateLock.notifyAll();
-         }
-      }
-   }
-
-   public byte[] getState(String state_id)
-   {
-      MarshalledValueOutputStream out = null;
-      String sourceRoot = state_id;
-      byte[] result = null;
-
-      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
-      if (hasDifferentSourceAndIntegrationRoots)
-      {
-         sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
-      }
-
-      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
-      try
-      {
-         out = new MarshalledValueOutputStream(baos);
-
-         stateTransferManager.getState(out, Fqn.fromString(sourceRoot),
-               configuration.getStateRetrievalTimeout(), true, true);
-      }
-      catch (Throwable t)
-      {
-         stateProducingFailed(t);
-      }
-      finally
-      {
-         result = baos.getRawBuffer();
-         Util.close(out);
-      }
-      return result;
-   }
-
-   public void getState(OutputStream ostream)
-   {
-      MarshalledValueOutputStream out = null;
-      try
-      {
-         out = new MarshalledValueOutputStream(ostream);
-         stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
-      }
-      catch (Throwable t)
-      {
-         stateProducingFailed(t);
-      }
-      finally
-      {
-         Util.close(out);
-      }
-   }
-
-   public void getState(String state_id, OutputStream ostream)
-   {
-      String sourceRoot = state_id;
-      MarshalledValueOutputStream out = null;
-      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
-      if (hasDifferentSourceAndIntegrationRoots)
-      {
-         sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
-      }
-      try
-      {
-         out = new MarshalledValueOutputStream(ostream);
-         stateTransferManager.getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
-      }
-      catch (Throwable t)
-      {
-         stateProducingFailed(t);
-      }
-      finally
-      {
-         Util.close(out);
-      }
-   }
-
-   public void setState(InputStream istream)
-   {
-      if (istream == null)
-      {
-         log.debug("stream is null (may be first member in cluster)");
-         return;
-      }
-      MarshalledValueInputStream in = null;
-      try
-      {
-         in = new MarshalledValueInputStream(istream);
-         stateTransferManager.setState(in, Fqn.ROOT);
-         stateReceivedSuccess();
-      }
-      catch (Throwable t)
-      {
-         stateReceivingFailed(t);
-      }
-      finally
-      {
-         Util.close(in);
-         synchronized (stateLock)
-         {
-            // Notify wait that state has been set.
-            stateLock.notifyAll();
-         }
-      }
-   }
-
-   public void setState(String state_id, byte[] state)
-   {
-      if (state == null)
-      {
-         log.debug("partial transferred state is null");
-         return;
-      }
-
-      MarshalledValueInputStream in = null;
-      String targetRoot = state_id;
-      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
-      if (hasDifferentSourceAndIntegrationRoots)
-      {
-         targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
-      }
-      try
-      {
-         log.debug("Setting received partial state for subroot " + state_id);
-         Fqn subroot = Fqn.fromString(targetRoot);
-//            Region region = regionManager.getRegion(subroot, false);
-//            ClassLoader cl = null;
-//            if (region != null)
-//            {
-//               // If a classloader is registered for the node's region, use it
-//               cl = region.getClassLoader();
-//            }
-         ByteArrayInputStream bais = new ByteArrayInputStream(state);
-         in = new MarshalledValueInputStream(bais);
-         //getStateTransferManager().setState(in, subroot, cl);
-         stateTransferManager.setState(in, subroot);
-         stateReceivedSuccess();
-      }
-      catch (Throwable t)
-      {
-         stateReceivingFailed(t);
-      }
-      finally
-      {
-         Util.close(in);
-         synchronized (stateLock)
-         {
-            // Notify wait that state has been set.
-            stateLock.notifyAll();
-         }
-      }
-   }
-
-   public void setState(String state_id, InputStream istream)
-   {
-      String targetRoot = state_id;
-      MarshalledValueInputStream in = null;
-      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
-      if (hasDifferentSourceAndIntegrationRoots)
-      {
-         targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
-      }
-      if (istream == null)
-      {
-         log.debug("stream is null (may be first member in cluster). State is not set");
-         return;
-      }
-
-      try
-      {
-         log.debug("Setting received partial state for subroot " + state_id);
-         in = new MarshalledValueInputStream(istream);
-         Fqn subroot = Fqn.fromString(targetRoot);
-//            Region region = regionManager.getRegion(subroot, false);
-//            ClassLoader cl = null;
-//            if (region != null)
-//            {
-//               // If a classloader is registered for the node's region, use it
-//               cl = region.getClassLoader();
-//            }
-         //getStateTransferManager().setState(in, subroot, cl);
-         stateTransferManager.setState(in, subroot);
-         stateReceivedSuccess();
-      }
-      catch (Throwable t)
-      {
-         stateReceivingFailed(t);
-      }
-      finally
-      {
-         Util.close(in);
-         synchronized (stateLock)
-         {
-            // Notify wait that state has been set.
-            stateLock.notifyAll();
-         }
-      }
-   }
-}

Copied: core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java (from rev 5617, core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java	2008-04-22 16:09:06 UTC (rev 5623)
@@ -0,0 +1,377 @@
+package org.jboss.cache.remoting.jgroups;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.cache.util.ExposedByteArrayOutputStream;
+import org.jboss.util.stream.MarshalledValueInputStream;
+import org.jboss.util.stream.MarshalledValueOutputStream;
+import org.jgroups.ExtendedMessageListener;
+import org.jgroups.Message;
+import org.jgroups.util.Util;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * JGroups MessageListener
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ChannelMessageListener implements ExtendedMessageListener
+{
+   /**
+    * Reference to an exception that was raised during
+    * state installation on this node.
+    */
+   protected volatile Exception setStateException;
+   private final Object stateLock = new Object();
+   private Log log = LogFactory.getLog(ChannelMessageListener.class);
+   private StateTransferManager stateTransferManager;
+   private Configuration configuration;
+   /**
+    * True if state was initialized during start-up.
+    */
+   private volatile boolean isStateSet = false;
+
+   @Inject
+   private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
+   {
+      this.stateTransferManager = stateTransferManager;
+      this.configuration = configuration;
+   }
+
+   public boolean isStateSet()
+   {
+      return isStateSet;
+   }
+
+   public void setStateSet(boolean stateSet)
+   {
+      isStateSet = stateSet;
+   }
+
+   public void waitForState() throws Exception
+   {
+      synchronized (stateLock)
+      {
+         while (!isStateSet)
+         {
+            if (setStateException != null)
+            {
+               throw setStateException;
+            }
+
+            try
+            {
+               stateLock.wait();
+            }
+            catch (InterruptedException iex)
+            {
+            }
+         }
+      }
+   }
+
+   protected void stateReceivedSuccess()
+   {
+      isStateSet = true;
+      setStateException = null;
+   }
+
+   protected void stateReceivingFailed(Throwable t)
+   {
+      if (t instanceof CacheException)
+      {
+         log.debug(t);
+      }
+      else
+      {
+         log.error("failed setting state", t);
+      }
+      if (t instanceof Exception)
+      {
+         setStateException = (Exception) t;
+      }
+      else
+      {
+         setStateException = new Exception(t);
+      }
+   }
+
+   protected void stateProducingFailed(Throwable t)
+   {
+      if (t instanceof CacheException)
+      {
+         log.debug(t);
+      }
+      else
+      {
+         log.error("Caught " + t.getClass().getName()
+               + " while responding to state transfer request", t);
+      }
+   }
+
+   /**
+    * Callback, does nothing.
+    */
+   public void receive(Message msg)
+   {
+   }
+
+   public byte[] getState()
+   {
+      MarshalledValueOutputStream out = null;
+      byte[] result = null;
+      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+      try
+      {
+         out = new MarshalledValueOutputStream(baos);
+
+         stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+      }
+      catch (Throwable t)
+      {
+         stateProducingFailed(t);
+      }
+      finally
+      {
+         result = baos.getRawBuffer();
+         Util.close(out);
+      }
+      return result;
+   }
+
+   public void setState(byte[] new_state)
+   {
+      if (new_state == null)
+      {
+         log.debug("transferred state is null (may be first member in cluster)");
+         return;
+      }
+      ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
+      MarshalledValueInputStream in = null;
+      try
+      {
+         in = new MarshalledValueInputStream(bais);
+         stateTransferManager.setState(in, Fqn.ROOT);
+         stateReceivedSuccess();
+      }
+      catch (Throwable t)
+      {
+         stateReceivingFailed(t);
+      }
+      finally
+      {
+         Util.close(in);
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
+   }
+
+   public byte[] getState(String state_id)
+   {
+      MarshalledValueOutputStream out = null;
+      String sourceRoot = state_id;
+      byte[] result = null;
+
+      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+      if (hasDifferentSourceAndIntegrationRoots)
+      {
+         sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+      }
+
+      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+      try
+      {
+         out = new MarshalledValueOutputStream(baos);
+
+         stateTransferManager.getState(out, Fqn.fromString(sourceRoot),
+               configuration.getStateRetrievalTimeout(), true, true);
+      }
+      catch (Throwable t)
+      {
+         stateProducingFailed(t);
+      }
+      finally
+      {
+         result = baos.getRawBuffer();
+         Util.close(out);
+      }
+      return result;
+   }
+
+   public void getState(OutputStream ostream)
+   {
+      MarshalledValueOutputStream out = null;
+      try
+      {
+         out = new MarshalledValueOutputStream(ostream);
+         stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+      }
+      catch (Throwable t)
+      {
+         stateProducingFailed(t);
+      }
+      finally
+      {
+         Util.close(out);
+      }
+   }
+
+   public void getState(String state_id, OutputStream ostream)
+   {
+      String sourceRoot = state_id;
+      MarshalledValueOutputStream out = null;
+      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+      if (hasDifferentSourceAndIntegrationRoots)
+      {
+         sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+      }
+      try
+      {
+         out = new MarshalledValueOutputStream(ostream);
+         stateTransferManager.getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
+      }
+      catch (Throwable t)
+      {
+         stateProducingFailed(t);
+      }
+      finally
+      {
+         Util.close(out);
+      }
+   }
+
+   public void setState(InputStream istream)
+   {
+      if (istream == null)
+      {
+         log.debug("stream is null (may be first member in cluster)");
+         return;
+      }
+      MarshalledValueInputStream in = null;
+      try
+      {
+         in = new MarshalledValueInputStream(istream);
+         stateTransferManager.setState(in, Fqn.ROOT);
+         stateReceivedSuccess();
+      }
+      catch (Throwable t)
+      {
+         stateReceivingFailed(t);
+      }
+      finally
+      {
+         Util.close(in);
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
+   }
+
+   public void setState(String state_id, byte[] state)
+   {
+      if (state == null)
+      {
+         log.debug("partial transferred state is null");
+         return;
+      }
+
+      MarshalledValueInputStream in = null;
+      String targetRoot = state_id;
+      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+      if (hasDifferentSourceAndIntegrationRoots)
+      {
+         targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+      }
+      try
+      {
+         log.debug("Setting received partial state for subroot " + state_id);
+         Fqn subroot = Fqn.fromString(targetRoot);
+//            Region region = regionManager.getRegion(subroot, false);
+//            ClassLoader cl = null;
+//            if (region != null)
+//            {
+//               // If a classloader is registered for the node's region, use it
+//               cl = region.getClassLoader();
+//            }
+         ByteArrayInputStream bais = new ByteArrayInputStream(state);
+         in = new MarshalledValueInputStream(bais);
+         //getStateTransferManager().setState(in, subroot, cl);
+         stateTransferManager.setState(in, subroot);
+         stateReceivedSuccess();
+      }
+      catch (Throwable t)
+      {
+         stateReceivingFailed(t);
+      }
+      finally
+      {
+         Util.close(in);
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
+   }
+
+   public void setState(String stateId, InputStream istream)
+   {
+      if (log.isTraceEnabled()) log.trace("**** Receiving state for " + stateId);
+      String targetRoot = stateId;
+      MarshalledValueInputStream in = null;
+      boolean hasDifferentSourceAndIntegrationRoots = stateId.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+      if (hasDifferentSourceAndIntegrationRoots)
+      {
+         targetRoot = stateId.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+      }
+      if (istream == null)
+      {
+         log.debug("stream is null (may be first member in cluster). State is not set");
+         return;
+      }
+
+      try
+      {
+         log.debug("Setting received partial state for subroot " + stateId);
+         in = new MarshalledValueInputStream(istream);
+         Fqn subroot = Fqn.fromString(targetRoot);
+//            Region region = regionManager.getRegion(subroot, false);
+//            ClassLoader cl = null;
+//            if (region != null)
+//            {
+//               // If a classloader is registered for the node's region, use it
+//               cl = region.getClassLoader();
+//            }
+         //getStateTransferManager().setState(in, subroot, cl);
+         stateTransferManager.setState(in, subroot);
+         stateReceivedSuccess();
+      }
+      catch (Throwable t)
+      {
+         stateReceivingFailed(t);
+      }
+      finally
+      {
+         Util.close(in);
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
+   }
+}

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/ActiveInactiveTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/ActiveInactiveTest.java	2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/ActiveInactiveTest.java	2008-04-22 16:09:06 UTC (rev 5623)
@@ -153,7 +153,7 @@
    public void testObjectFromByteBuffer() throws Exception
    {
       PutKeyValueCommand put = new PutKeyValueCommand(null, A_B, "name", "Joe", false, false);
-      ReplicateCommand replicate = new ReplicateCommand(put);
+      MethodCallWrapper replicate = new MethodCallWrapper(new ReplicateCommand(put));
 
       rman.setDefaultInactive(true);
       // register A as an inactive marshalling region

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java	2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java	2008-04-22 16:09:06 UTC (rev 5623)
@@ -43,9 +43,9 @@
 {
    CacheSPI<Object, Object> cache1, cache2;
    String props = null;
-   Person ben_;
-   Address addr_;
-   Throwable ex_;
+   Person ben;
+   Address addr;
+   Throwable ex;
    private Fqn<String> aop = Fqn.fromString("/aop");
    protected boolean useMarshalledValues = false;
 
@@ -60,11 +60,11 @@
 
       cache2 = createCache("TestCache");
 
-      addr_ = new Address();
-      addr_.setCity("San Jose");
-      ben_ = new Person();
-      ben_.setName("Ben");
-      ben_.setAddress(addr_);
+      addr = new Address();
+      addr.setCity("San Jose");
+      ben = new Person();
+      ben.setName("Ben");
+      ben.setAddress(addr);
 
       // Pause to give caches time to see each other
       TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
@@ -109,8 +109,8 @@
       }
 
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
-      cache1.put(aop, "person", ben_);
-      cache1.put(Fqn.fromString("/alias"), "person", ben_);
+      cache1.put(aop, "person", ben);
+      cache1.put(Fqn.fromString("/alias"), "person", ben);
       if (useMarshalledValues) resetContextClassLoader();
 
       TestingUtil.sleepThread(1000);
@@ -120,7 +120,7 @@
       ben2 = cache2.get(aop, "person");
       if (useMarshalledValues) resetContextClassLoader();
       assertNotNull(ben2);
-      assertEquals(ben_.toString(), ben2.toString());
+      assertEquals(ben.toString(), ben2.toString());
 
       Class<?> claz = clb.loadClass(ADDRESS_CLASSNAME);
       Object add = claz.newInstance();
@@ -161,7 +161,7 @@
       Object scopedBen2 = getPersonFromClassloader(clb);
 
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
-      cache1.put(Fqn.fromString("/aop/1"), "person", ben_);
+      cache1.put(Fqn.fromString("/aop/1"), "person", ben);
       cache1.put(Fqn.fromString("/aop/2"), "person", scopedBen1);
       if (useMarshalledValues) resetContextClassLoader();
       TestingUtil.sleepThread(1000);
@@ -170,7 +170,7 @@
       // Can't cast it to Person. CCE will resutl.
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
       ben2 = cache2.get(Fqn.fromString("/aop/1"), "person");
-      assertEquals(ben_.toString(), ben2.toString());
+      assertEquals(ben.toString(), ben2.toString());
 
       ben2 = cache2.get(Fqn.fromString("/aop/2"), "person");
       assertFalse("cache2 deserialized with scoped classloader", ben2 instanceof Person);
@@ -181,13 +181,13 @@
    public void testTxPut() throws Exception
    {
       beginTransaction();
-      cache1.put(aop, "person", ben_);
-      cache1.put(aop, "person1", ben_);
+      cache1.put(aop, "person", ben);
+//      cache1.put(aop, "person1", ben);
       commit();
       TestingUtil.sleepThread(1000);
       Person ben2 = (Person) cache2.get(aop, "person");
       assertNotNull("Person from 2nd cache should not be null ", ben2);
-      assertEquals(ben_.toString(), ben2.toString());
+      assertEquals(ben.toString(), ben2.toString());
    }
 
    public void testTxCLSet2() throws Exception
@@ -205,7 +205,7 @@
 
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(cla);
       beginTransaction();
-      cache1.put(aop, "person", ben_);
+      cache1.put(aop, "person", ben);
       commit();
       if (useMarshalledValues) resetContextClassLoader();
 
@@ -216,7 +216,7 @@
       if (useMarshalledValues) Thread.currentThread().setContextClassLoader(clb);
       ben2 = cache2.get(aop, "person");
       if (useMarshalledValues) resetContextClassLoader();
-      assertEquals(ben_.toString(), ben2.toString());
+      assertEquals(ben.toString(), ben2.toString());
 
       Class<?> claz = clb.loadClass(ADDRESS_CLASSNAME);
       Object add = claz.newInstance();

Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/FailedStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/FailedStateTransferTest.java	2008-04-22 16:03:18 UTC (rev 5622)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/FailedStateTransferTest.java	2008-04-22 16:09:06 UTC (rev 5623)
@@ -31,7 +31,7 @@
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.lock.TimeoutException;
 import org.jboss.cache.misc.TestingUtil;
-import org.jboss.cache.remoting.jgroups.CacheMessageListener;
+import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
 import static org.testng.AssertJUnit.fail;
 import org.testng.annotations.Test;
 
@@ -61,8 +61,8 @@
 
       // inject our own message listener and re-wire deps
       ComponentRegistry cr = TestingUtil.extractComponentRegistry(cache);
-//      cr.unregisterComponent(CacheMessageListener.class);
-      cr.registerComponent(CacheMessageListener.class.getName(), new SecretiveStateCacheMessageListener(), CacheMessageListener.class);
+//      cr.unregisterComponent(ChannelMessageListener.class);
+      cr.registerComponent(ChannelMessageListener.class.getName(), new SecretiveStateCacheMessageListener(), ChannelMessageListener.class);
 //      cr.updateDependencies();
 
       cache.start();
@@ -80,8 +80,8 @@
 
       // inject our own message listener and re-wire deps
       cr = TestingUtil.extractComponentRegistry(recipient);
-      //cr.unregisterComponent(CacheMessageListener.class);
-      cr.registerComponent(CacheMessageListener.class.getName(), new SecretiveStateCacheMessageListener(), CacheMessageListener.class);
+      //cr.unregisterComponent(ChannelMessageListener.class);
+      cr.registerComponent(ChannelMessageListener.class.getName(), new SecretiveStateCacheMessageListener(), ChannelMessageListener.class);
       //cr.updateDependencies();
 
       try
@@ -100,7 +100,7 @@
       return Version.version;
    }
 
-   private static class SecretiveStateCacheMessageListener extends CacheMessageListener
+   private static class SecretiveStateCacheMessageListener extends ChannelMessageListener
    {
       @Override
       public void setState(byte[] new_state)




More information about the jbosscache-commits mailing list