[infinispan-commits] Infinispan SVN: r2217 - in trunk/core/src: main/java/org/infinispan/remoting/rpc and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Aug 13 10:37:11 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-08-13 10:37:11 -0400 (Fri, 13 Aug 2010)
New Revision: 2217

Added:
   trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferReplicationQueueTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseMode.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
Log:
[ISPN-577] (State transfer and replication queue ordering issue) Fixed.

Modified: trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java	2010-08-13 14:28:54 UTC (rev 2216)
+++ trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java	2010-08-13 14:37:11 UTC (rev 2217)
@@ -144,7 +144,7 @@
             log.trace("Flushing {0} elements", toReplicateSize);
             MultipleRpcCommand multipleRpcCommand = commandsFactory.buildReplicateCommand(toReplicate);
             // send to all live caches in the cluster
-            rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout());
+            rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.getAsyncResponseMode(configuration), configuration.getSyncReplTimeout());
          }
          catch (Throwable t) {
             log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseMode.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseMode.java	2010-08-13 14:28:54 UTC (rev 2216)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseMode.java	2010-08-13 14:37:11 UTC (rev 2217)
@@ -1,5 +1,7 @@
 package org.infinispan.remoting.rpc;
 
+import org.infinispan.config.Configuration;
+
 /**
  * Represents different handling mechanisms when dealing with remote command responses. 
  * These include waiting for responses from all nodes in the cluster ({@link ResponseMode#SYNCHRONOUS}}),
@@ -19,4 +21,9 @@
    public boolean isAsynchronous() {
       return this == ASYNCHRONOUS || this == ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
    }
+
+   public static ResponseMode getAsyncResponseMode(Configuration c) {
+      return c.isUseAsyncMarshalling() ? ResponseMode.ASYNCHRONOUS : ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
+   }
+
 }

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2010-08-13 14:28:54 UTC (rev 2216)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2010-08-13 14:37:11 UTC (rev 2217)
@@ -261,7 +261,7 @@
    }
 
    private ResponseMode getResponseMode(boolean sync) {
-      return sync ? ResponseMode.SYNCHRONOUS : configuration.isUseAsyncMarshalling() ? ResponseMode.ASYNCHRONOUS : ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
+      return sync ? ResponseMode.SYNCHRONOUS : ResponseMode.getAsyncResponseMode(configuration);
    }
 
    /**

Added: trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferReplicationQueueTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferReplicationQueueTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferReplicationQueueTest.java	2010-08-13 14:37:11 UTC (rev 2217)
@@ -0,0 +1,218 @@
+package org.infinispan.statetransfer;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.statetransfer.StateTransferFunctionalTest.*;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.Method;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * State transfer and replication queue test veryfying that the interaction between them two works in fine.
+ *
+ * @author Galder Zamarre�o
+ * @since 4.1
+ */
+ at Test(groups = "functional", testName = "statetransfer.StateTransferReplicationQueueTest", enabled = true)
+public class StateTransferReplicationQueueTest extends MultipleCacheManagersTest {
+
+   public static final String A_B_NAME = "a_b_name";
+   public static final String A_C_NAME = "a_c_name";
+   public static final String A_B_AGE = "a_b_age";
+   public static final String A_C_AGE = "a_c_age";
+   public static final String JOE = "JOE";
+   public static final String BOB = "BOB";
+   public static final Integer TWENTY = 20;
+   public static final Integer FORTY = 40;
+   
+   private final String cacheName = "nbst-replqueue";
+
+   Configuration config;
+
+   protected void createCacheManagers() throws Throwable {
+      // This impl only really sets up a configuration for use later.
+      config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_ASYNC, true);
+      config.setUseReplQueue(true);
+      config.setReplQueueInterval(100, TimeUnit.MILLISECONDS);
+      config.setReplQueueMaxElements(100);
+      config.setUseAsyncMarshalling(false);
+      config.setFetchInMemoryState(true);
+      config.setUseLockStriping(false); // reduces the odd chance of a key collision and deadlock
+   }
+
+   protected EmbeddedCacheManager createCacheManager() {
+      EmbeddedCacheManager cm = addClusterEnabledCacheManager();
+      GlobalConfiguration gc = cm.getGlobalConfiguration();
+      Properties p = new Properties();
+      p.setProperty("maxThreads", "25");
+      gc.setAsyncTransportExecutorProperties(p);
+      cm.defineConfiguration(cacheName, config.clone());
+      return cm;
+   }
+
+   protected void writeInitialData(final Cache<Object, Object> c) {
+      c.put(A_B_NAME, JOE);
+      c.put(A_B_AGE, TWENTY);
+      c.put(A_C_NAME, BOB);
+      c.put(A_C_AGE, FORTY);
+   }
+
+   protected void verifyInitialData(Cache<Object, Object> c) {
+      assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME;
+      assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE;
+      assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME;
+      assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE;
+   }
+
+   /**
+    * In particular, this test focuses on checking that ordering is maintained when multiple operations are executed
+    * on the same key in a asynchronous environment with async marshalling turned off.
+    */
+   public void testStateTransferWithNodeRestartedAndBusy(Method m) throws Exception {
+      log.info(m.getName() + " start");
+      thirdWritingCacheTest(false);
+      log.info(m.getName() + "end");
+   }
+
+   private void thirdWritingCacheTest(boolean tx) throws InterruptedException {
+      Cache<Object, Object> cache1, cache3;
+      cache1 = createCacheManager().getCache(cacheName);
+      EmbeddedCacheManager manager3 = createCacheManager();
+      cache3 = manager3.getCache(cacheName);
+
+      writeInitialData(cache1);
+
+      WritingThread writerThread = new WritingThread(cache1, tx);
+      writerThread.start();
+
+      manager3.stop();
+
+      // Pause for view to update
+      TestingUtil.blockUntilViewsReceived(60000, cache1);
+
+      cache3 = createCacheManager().getCache(cacheName);
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache3);
+
+      writerThread.stopThread();
+      writerThread.join(60000);
+
+      verifyInitialData(cache3);
+
+      int count = writerThread.result();
+
+      // Since this is async, sleep a bit to allow any ongoing repls to go through
+      TestingUtil.sleepThread(5000);
+
+      for (int c = 0; c < count; c++) {
+         Object o = cache3.get("test" + c);
+         // Nothing should be left after a put/remove on a key
+         assert o == null;
+      }
+   }
+
+
+   private static class WritingThread extends Thread {
+      private final Cache<Object, Object> cache;
+      private final boolean tx;
+      private volatile boolean stop;
+      private volatile int result;
+      private TransactionManager tm;
+
+      WritingThread(Cache<Object, Object> cache, boolean tx) {
+         super("WriterThread");
+         this.cache = cache;
+         this.tx = tx;
+         if (tx) tm = TestingUtil.getTransactionManager(cache);
+         setDaemon(true);
+      }
+
+      public int result() {
+         return result;
+      }
+
+      public void run() {
+         int c = 0;
+         while (!stop) {
+            try {
+               if (tx) tm.begin();
+               cache.put("test" + c, new PojoValue(c));
+               cache.remove("test" + c);
+               c++;
+               if (tx) tm.commit();
+               if (c % 1000 == 0) TestingUtil.sleepThread(1); // Slow it down a bit
+            }
+            catch (Exception e) {
+               stopThread();
+            }
+         }
+         result = c;
+      }
+
+      public void stopThread() {
+         stop = true;
+      }
+   }
+
+   private static class PojoValue implements Externalizable {
+      Log log = LogFactory.getLog(PojoValue.class);
+      static AtomicBoolean holdUp = new AtomicBoolean();
+      volatile int value;
+
+      public PojoValue(int value) {
+         this.value = value;
+      }
+
+      @Override
+      public void writeExternal(ObjectOutput out) throws IOException {
+         String threadName = Thread.currentThread().getName();
+         if (!holdUp.get() && threadName.contains("STREAMING_STATE_TRANSFER-sender")) {
+            log.debug("In streaming...");
+            holdUp.compareAndSet(false, true);
+            log.debug("Holding up...");
+            TestingUtil.sleepThread(2000); // Sleep for 2 seconds to hold up state transfer
+         }
+
+         out.writeInt(value);
+      }
+
+      @Override
+      public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+         value = in.readInt();
+      }
+
+      @Override
+      public int hashCode() {
+         return value + 31;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+         if (this == o) return true;
+         if (o == null || getClass() != o.getClass()) return false;
+
+         PojoValue pojo = (PojoValue) o;
+         if (value != pojo.value) return false;
+         return true;
+      }
+   }
+   
+
+}



More information about the infinispan-commits mailing list