[jbosscache-commits] JBoss Cache SVN: r7123 - in core/trunk/src:	test/java/org/jboss/cache/api/pfer and 9 other directories.
    jbosscache-commits at lists.jboss.org 
    jbosscache-commits at lists.jboss.org
       
    Wed Nov 12 12:47:13 EST 2008
    
    
  
Author: mircea.markus
Date: 2008-11-12 12:47:12 -0500 (Wed, 12 Nov 2008)
New Revision: 7123
Modified:
   core/trunk/src/main/java/org/jboss/cache/commands/remote/ReplicateCommand.java
   core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java
   core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java
   core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
   core/trunk/src/test/java/org/jboss/cache/eviction/ElementSizePolicyTest.java
   core/trunk/src/test/java/org/jboss/cache/lock/ReadWriteLockWithUpgradeTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/CacheLoaderMarshallingJDBCTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/CustomCollectionTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/RegionBasedMarshallingTestBase.java
   core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java
   core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java
   core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java
   core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
   core/trunk/src/test/java/org/jboss/cache/passivation/ConcurrentPassivationTest.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
Log:
fixing tests
Modified: core/trunk/src/main/java/org/jboss/cache/commands/remote/ReplicateCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/remote/ReplicateCommand.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/main/java/org/jboss/cache/commands/remote/ReplicateCommand.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -1,24 +1,24 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 package org.jboss.cache.commands.remote;
 
 import org.apache.commons.logging.Log;
@@ -264,15 +264,29 @@
    {
       if (isSingleCommand())
       {
-         return getSingleModification().getClass().equals(aClass);
+
+         return isCommandWithType(getSingleModification(), aClass);
       }
       else
       {
          for (ReplicableCommand command : getModifications())
          {
-            if (command.getClass().equals(aClass)) return true;
+            if (isCommandWithType(command, aClass)) return true;
          }
       }
       return false;
    }
+
+   private boolean isCommandWithType(ReplicableCommand command, Class<? extends ReplicableCommand> aClass)
+   {
+      if (command.getClass().equals(aClass)) return true;
+      if (command instanceof ReplicateCommand)
+      {
+         return ((ReplicateCommand) command).containsCommandType(aClass);
+      }
+      else
+      {
+         return false;
+      }
+   }
 }
\ No newline at end of file
Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PFERPessimisticTestBase.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -2,6 +2,8 @@
 
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.util.internals.ReplicationListener;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
 import org.jboss.cache.config.Configuration.NodeLockingScheme;
 import org.jboss.cache.lock.NodeLock;
 import static org.testng.AssertJUnit.*;
@@ -38,8 +40,12 @@
    {
       PutForExternalReadTestBaseTL threadCfg = threadLocal.get();
       // create the parent node first ...
+      threadCfg.replListener2 = new ReplicationListener(threadCfg.cache2);
+      threadCfg.replListener2.smartExpect(PutKeyValueCommand.class, false);
       threadCfg.cache1.put(parentFqn, key, value);
+      threadCfg.replListener2.waitForReplicationToOccur(10000);
 
+      threadCfg.replListener2.smartExpect(PutKeyValueCommand.class, true);
       threadCfg.tm1.begin();
       threadCfg.cache1.put(parentFqn, key, value2);
 
@@ -59,7 +65,7 @@
       threadCfg.tm1.resume(t);
       threadCfg.tm1.commit();
 
-      asyncWait();
+      threadCfg.replListener2.waitForReplicationToOccur(1000);
 
       assertEquals("Parent node write should have succeeded", value2, threadCfg.cache1.get(parentFqn, key));
       if (isUsingInvalidation())
Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -65,6 +65,7 @@
 
       tl.cache1 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
       tl.cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+      tl.cache1.getConfiguration().setSerializationExecutorPoolSize(0);//this is very important for async tests!
       tl.cache1.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
 
       tl.cache1.start();
@@ -72,6 +73,7 @@
 
       tl.cache2 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
       tl.cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+      tl.cache2.getConfiguration().setSerializationExecutorPoolSize(0); //this is very important for async tests!
       tl.cache2.getConfiguration().setNodeLockingScheme(nodeLockingScheme);
 
       tl.cache2.start();
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -319,6 +319,39 @@
       assertTrue(buddyLocalAddress + " should be a buddy to " + group.getGroupName(), group.getBuddies().contains(buddyLocalAddress));
    }
 
+   public void waitForBuddy(Cache dataOwner, Cache buddy, boolean onlyBuddy, long timeout) throws Exception
+   {
+      long start = System.currentTimeMillis();
+      while ((System.currentTimeMillis() - start) < timeout)
+      {
+         if (isBuddy(dataOwner, buddy, onlyBuddy)) return;
+         Thread.sleep(50);
+      }
+      //give it a last chance, just to have a nice printed message
+      assertIsBuddy(dataOwner, buddy, onlyBuddy);
+   }
+
+
+   private boolean isBuddy(Cache dataOwner, Cache buddy, boolean onlyBuddy)
+   {
+      Address dataOwnerLocalAddress = dataOwner.getLocalAddress();
+      Address buddyLocalAddress = buddy.getLocalAddress();
+      printBuddyGroup(dataOwner);
+      BuddyManager dataOwnerBuddyManager = ((CacheSPI) dataOwner).getBuddyManager();
+      BuddyManager buddyBuddyManager = ((CacheSPI) buddy).getBuddyManager();
+      boolean result = true;
+      // lets test things on the data owner's side of things
+      if (onlyBuddy) result = result && (1 == dataOwnerBuddyManager.getBuddyAddresses().size());
+      result = result && dataOwnerBuddyManager.getBuddyAddresses().contains(buddyLocalAddress);
+
+      // and now on the buddy end
+      BuddyGroup group = buddyBuddyManager.buddyGroupsIParticipateIn.get(dataOwnerLocalAddress);
+      result = result & buddyBuddyManager.buddyGroupsIParticipateIn.containsKey(dataOwnerLocalAddress);
+      if (onlyBuddy) result = result && group.getBuddies().size() == 1;
+      result = result & group!= null && group.getBuddies() != null && group.getBuddies().contains(buddyLocalAddress);
+      return result;
+   }
+
    protected void assertNoLocks(List<CacheSPI<Object, Object>> caches)
    {
       for (Cache cache : caches)
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/EmptyRegionTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -28,7 +28,7 @@
    Fqn regionFqn = Fqn.fromString("/a/b/c");
    Fqn region2Fqn = Fqn.fromString("/d/e/f");
    Region region, region2;
-   CountDownLatch buddyJoinLatch = new CountDownLatch(1);
+   CountDownLatch buddyJoinLatch = new CountDownLatch(2);
 
    @BeforeMethod
    public void setUp() throws Exception
@@ -62,6 +62,7 @@
       assert c1.getNode(regionFqn) == null : "Node should not exist";
       assert c1.getRegion(regionFqn, false) != null : "Region should exist";
       assert c1.getRegion(regionFqn, false).isActive() : "Region should be active";
+      c1.addCacheListener(new BuddyJoinListener());
 
       // now start c2
       c2.start();
@@ -84,6 +85,7 @@
       @BuddyGroupChanged
       public void buddyJoined(Event e)
       {
+         System.out.println("e = " + e);
          buddyJoinLatch.countDown();
       }
    }
Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTxTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -5,14 +5,17 @@
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.util.TestingUtil;
 import org.jboss.cache.util.internals.ReplicationQueueNotifier;
+import org.jboss.cache.util.internals.ReplicationListener;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import javax.transaction.TransactionManager;
 import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
 
-@Test(groups = {"functional", "transaction"}, sequential = true, testName = "cluster.ReplicationQueueTxTest")
+@Test(groups = {"functional", "transaction"}, testName = "cluster.ReplicationQueueTxTest")
 public class ReplicationQueueTxTest
 {
    Cache cache, cache2;
@@ -43,23 +46,23 @@
 
    public void testTransactionalReplication() throws Exception
    {
+      ReplicationListener cache1Listener = new ReplicationListener(cache);
+      ReplicationListener cache2Listener = new ReplicationListener(cache2);
+
+      cache2Listener.expect(PutKeyValueCommand.class);
       // outside of tx scope
       cache.put("/a", "k", "v");
+      cache2Listener.waitForReplicationToOccur(5000);
 
-      ReplicationQueueNotifier replicationQueueNotifier = new ReplicationQueueNotifier(cache);
-      replicationQueueNotifier.waitUntillAllReplicated(200);
-
       assert cache2.get("/a", "k").equals("v");
 
       // now, a transactional call
+      cache1Listener.expect(PrepareCommand.class);
       txManager.begin();
       cache2.put("/a", "k", "v2");
       txManager.commit();
+      cache1Listener.waitForReplicationToOccur(5000);
 
-      ReplicationQueueNotifier replicationQueueNotifier2 = new ReplicationQueueNotifier(cache2);
-      replicationQueueNotifier2.waitUntillAllReplicated(200);
-
-
       assert cache.get("/a", "k").equals("v2");
    }
 }
Modified: core/trunk/src/test/java/org/jboss/cache/eviction/ElementSizePolicyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/eviction/ElementSizePolicyTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/eviction/ElementSizePolicyTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -18,6 +18,7 @@
 import org.jboss.cache.transaction.DummyTransactionManagerLookup;
 import org.jboss.cache.util.TestingUtil;
 import org.jboss.cache.util.internals.EvictionWatcher;
+import org.jboss.cache.util.internals.EvictionController;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -103,7 +104,9 @@
       }
 
       System.out.println(cache);
-      assert waitForEviction(cache, 30, TimeUnit.SECONDS, Fqn.fromString("/org/jboss/test/data/8")) : "Eviction event not received!";
+      EvictionController evController = new EvictionController(cache);
+      evController.startEviction();
+
       TestingUtil.sleepThread(200); // small grace period
       System.out.println(cache);
 
@@ -136,7 +139,7 @@
       List<Fqn> fqnsThatShouldBeEvicted = new ArrayList<Fqn>();
       for (int i = 10; i < 20; i++) fqnsThatShouldBeEvicted.add(Fqn.fromString("/org/jboss/data/" + i));
 
-      EvictionWatcher watcher = new EvictionWatcher(cache, fqnsThatShouldBeEvicted);
+      EvictionController evictionController = new EvictionController(cache);
       String rootStr = "/org/jboss/data/";
       for (int i = 0; i < 20; i++)
       {
@@ -149,7 +152,7 @@
          }
       }
 
-      assert watcher.waitForEviction(30, TimeUnit.SECONDS) : "Eviction events never received!";
+      evictionController.startEviction();
 
       for (int i = 0; i < 20; i++)
       {
Modified: core/trunk/src/test/java/org/jboss/cache/lock/ReadWriteLockWithUpgradeTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/lock/ReadWriteLockWithUpgradeTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/lock/ReadWriteLockWithUpgradeTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -25,7 +25,7 @@
  * @author <a href="mailto:cavin_song at yahoo.com">Cavin Song</a> April 22, 2004
  * @version 1.0
  */
-@Test(groups = { "functional" }, sequential = true, testName = "lock.ReadWriteLockWithUpgradeTest")
+@Test(groups = { "functional" }, testName = "lock.ReadWriteLockWithUpgradeTest")
 public class ReadWriteLockWithUpgradeTest
 {
    static final ReadWriteLockWithUpgrade lock_ = new ReadWriteLockWithUpgrade();
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/AsyncReplTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -42,7 +42,7 @@
  * @author Ben Wang
  * @version $Revision$
  */
-@Test(groups = {"functional", "jgroups"}, sequential = true, testName = "marshall.AsyncReplTest")
+@Test(groups = {"functional", "jgroups"}, testName = "marshall.AsyncReplTest")
 public class AsyncReplTest extends RegionBasedMarshallingTestBase
 {
    CacheSPI<Object, Object> cache1, cache2;
@@ -284,7 +284,7 @@
       Fqn fqn = Fqn.fromRelativeElements(base, custom1);
       replListener2.expectAny();
       cache1.put(fqn, "key", "value");
-      replListener2.waitForReplicationToOccur(1000);
+      replListener2.waitForReplicationToOccur(10000);
 
       Fqn fqn2 = Fqn.fromRelativeElements(base, custom2);
       Object val = cache2.get(fqn2, "key");
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/CacheLoaderMarshallingJDBCTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/CacheLoaderMarshallingJDBCTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/CacheLoaderMarshallingJDBCTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -27,7 +27,7 @@
  * @author <a href="mailto:brian.stansberry at jboss.org">Brian Stansberry</a>
  * @since 2.1.0
  */
-@Test(groups = "functional", sequential = true, testName = "marshall.CacheLoaderMarshallingJDBCTest")
+@Test(groups = "functional", testName = "marshall.CacheLoaderMarshallingJDBCTest")
 public class CacheLoaderMarshallingJDBCTest extends RegionBasedMarshallingTestBase
 {
    private static final String className = "org.jboss.cache.marshall.MyUUID";
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/CustomCollectionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/CustomCollectionTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/CustomCollectionTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -24,7 +24,7 @@
  * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
  * @author <a href="mailto:manik AT jboss DOT org">Manik Surtani (manik AT jboss DOT org)</a>
  */
-@Test(groups = {"functional"}, sequential = true, testName = "marshall.CustomCollectionTest")
+@Test(groups = {"functional"}, testName = "marshall.CustomCollectionTest")
 public class CustomCollectionTest extends RegionBasedMarshallingTestBase implements Serializable
 {
    private transient Cache<Object, Object> cache1 = null;
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/RegionBasedMarshallingTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/RegionBasedMarshallingTestBase.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/RegionBasedMarshallingTestBase.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -4,7 +4,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-@Test(groups = {"functional"}, testName = "marshall.RegionBasedMarshallingTestBase")
 public abstract class RegionBasedMarshallingTestBase
 {
    protected static final String ADDRESS_CLASSNAME = "org.jboss.cache.marshall.data.Address";
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -27,7 +27,7 @@
  * @author <a href="mailto:manik AT jboss DOT org">Manik Surtani</a>
  * @since 2.0.0
  */
-@Test(groups = {"functional", "jgroups"}, sequential = true, testName = "marshall.ReturnValueMarshallingTest")
+@Test(groups = {"functional", "jgroups"}, testName = "marshall.ReturnValueMarshallingTest")
 public class ReturnValueMarshallingTest extends RegionBasedMarshallingTestBase
 {
    protected boolean useMarshalledValues = false;
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/SyncReplTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -36,7 +36,7 @@
  * @author Ben Wang
  * @version $Revision$
  */
-@Test(groups = {"functional", "jgroups"}, sequential = true, testName = "marshall.SyncReplTest")
+@Test(groups = {"functional", "jgroups"}, testName = "marshall.SyncReplTest")
 public class SyncReplTest extends RegionBasedMarshallingTestBase
 {
    
Modified: core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -18,18 +18,16 @@
 import org.testng.annotations.Test;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
  * @since 2.1.0
  */
-@Test(groups = "functional", sequential = true, testName = "notifications.BuddyGroupChangeNotificationTest")
+@Test(groups = "functional", testName = "notifications.BuddyGroupChangeNotificationTest")
 public class BuddyGroupChangeNotificationTest extends BuddyReplicationTestsBase
 {
    Cache c1, c2, c3;
-   Listener listener;
-   static CountDownLatch latch1 = new CountDownLatch(1);
-   static CountDownLatch latch2 = new CountDownLatch(1);
    static boolean stage2 = false;
    static boolean notificationsReceived = true;
 
@@ -42,7 +40,7 @@
       BuddyReplicationConfig brc = new BuddyReplicationConfig();
       brc.setEnabled(true);
       conf.setBuddyReplicationConfig(brc);
-      
+
       c1 = cf.createCache(conf, false);
       c2 = cf.createCache(conf.clone(), false);
       c3 = cf.createCache(conf.clone(), false);
@@ -53,17 +51,12 @@
 
       // make sure views are received and groups are formed first
       TestingUtil.blockUntilViewsReceived(60000, c1, c2, c3);
-
-      Cache[] caches = new Cache[]{c1, c2, c3};
-
-      listener = new Listener(caches);
-
-      c2.addCacheListener(listener);
    }
 
    @AfterMethod
    public void tearDown()
    {
+      System.out.println("***** BuddyGroupChangeNotificationTest.tearDown");
       TestingUtil.killCaches(c1, c2, c3);
       c1 = null;
       c2 = null;
@@ -71,67 +64,26 @@
    }
 
    @Test(timeOut = 60000)
-   public void testChangingGroups() throws InterruptedException
+   public void testChangingGroups() throws Exception
    {
       // initial state
-      assertIsBuddy(c1, c2, true);
-      assertIsBuddy(c2, c3, true);
-      assertIsBuddy(c3, c1, true);
+      waitForBuddy(c1, c2, true, 60000);
+      waitForBuddy(c2, c3, true, 60000);
+      waitForBuddy(c3, c1, true, 60000);
 
       // kill c3
       c3.stop();
-      latch1.await();
 
-      assertIsBuddy(c1, c2, true);
-      assertIsBuddy(c2, c1, true);
+      waitForBuddy(c1, c2, true, 60000);
+      waitForBuddy(c2, c1, true, 60000);
 
       stage2 = true;
       c3.start();
-      latch2.await();
 
-      assertIsBuddy(c1, c2, true);
-      assertIsBuddy(c2, c3, true);
-      assertIsBuddy(c3, c1, true);
+      waitForBuddy(c1, c2, true, 60000);
+      waitForBuddy(c2, c3, true, 60000);
+      waitForBuddy(c3, c1, true, 60000);
 
       assert notificationsReceived;
    }
-
-   @CacheListener
-   public static class Listener
-   {
-      Cache[] caches;
-      int numActiveCaches;
-
-      public Listener(Cache[] caches)
-      {
-         this.caches = caches;
-      }
-
-      @ViewChanged
-      public void viewChanged(ViewChangedEvent e)
-      {
-         numActiveCaches = e.getNewView().getMembers().size();
-      }
-
-      @BuddyGroupChanged
-      public void buddyChanged(BuddyGroupChangedEvent e)
-      {
-         System.out.println("Received event " + e);
-         if (!e.isPre())
-         {
-            BuddyGroup bg = e.getBuddyGroup();
-
-            boolean passed = bg.getDataOwner().equals(caches[1].getLocalAddress()) &&
-                  bg.getBuddies().size() == 1 &&
-                  bg.getBuddies().contains(caches[(numActiveCaches == 3) ? 2 : 0].getLocalAddress());
-
-            notificationsReceived = notificationsReceived && passed;
-
-            if (stage2)
-               latch2.countDown();
-            else
-               latch1.countDown();
-         }
-      }
-   }
 }
Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AbstractOptimisticTestCase.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -194,6 +194,7 @@
       c.setTransactionManagerLookupClass(TransactionSetup.getManagerLookup());
 
       CacheSPI<Object, Object> cache = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(c, false);
+      cache.getConfiguration().setSerializationExecutorPoolSize(0);
       
       if (start)
       {
Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -23,7 +23,7 @@
 /**
  * @author xenephon
  */
-@Test(groups = {"functional", "transaction", "optimistic"}, sequential = true, testName = "optimistic.AsyncFullStackInterceptorTest")
+@Test(groups = {"functional", "transaction", "optimistic"}, testName = "optimistic.AsyncFullStackInterceptorTest")
 public class AsyncFullStackInterceptorTest extends AbstractOptimisticTestCase
 {
    private int groupIncreaser = 0;
Modified: core/trunk/src/test/java/org/jboss/cache/passivation/ConcurrentPassivationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/passivation/ConcurrentPassivationTest.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/passivation/ConcurrentPassivationTest.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -27,7 +27,7 @@
  * @version $Revision$
  */
 
-@Test(groups = {"functional"}, sequential = true, testName = "passivation.ConcurrentPassivationTest")
+@Test(groups = {"functional"}, testName = "passivation.ConcurrentPassivationTest")
 public class ConcurrentPassivationTest
 {
    private CacheSPI cache;
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java	2008-11-12 05:03:13 UTC (rev 7122)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java	2008-11-12 17:47:12 UTC (rev 7123)
@@ -4,9 +4,14 @@
 import org.jboss.cache.Fqn;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.RPCManager;
+import org.jboss.cache.config.Configuration;
 import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.legacy.write.*;
+import org.jboss.cache.commands.write.*;
 import org.jboss.cache.commands.remote.ReplicateCommand;
 import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.io.ByteBuffer;
 import org.jboss.cache.marshall.AbstractMarshaller;
@@ -21,10 +26,7 @@
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -52,6 +54,19 @@
 {
    private CountDownLatch latch = new CountDownLatch(1);
    private Set<Class<? extends ReplicableCommand>> expectedCommands;
+   private Configuration config;
+   private static Map <Class<? extends WriteCommand>, Class<? extends WriteCommand>> mvcc2PessMap =
+         new HashMap<Class<? extends WriteCommand>, Class<? extends WriteCommand>>();
+   static
+   {
+      mvcc2PessMap.put(ClearDataCommand.class, PessClearDataCommand.class);
+      mvcc2PessMap.put(MoveCommand.class, PessMoveCommand.class);
+      mvcc2PessMap.put(PutDataMapCommand.class, PessPutDataMapCommand.class);
+      mvcc2PessMap.put(PutForExternalReadCommand.class, PessPutForExternalReadCommand.class);
+      mvcc2PessMap.put(PutKeyValueCommand.class, PessPutKeyValueCommand.class);
+      mvcc2PessMap.put(RemoveKeyCommand.class, PessRemoveKeyCommand.class);
+      mvcc2PessMap.put(RemoveNodeCommand.class, PessRemoveNodeCommand.class);
+   }
 
    /**
     * Builds a listener that will observe the given cache for recieving replication commands.
@@ -63,7 +78,7 @@
       CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
       RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
       RpcDispatcher.Marshaller2 delegate = null;
-      if ( (realMarshaller instanceof RegionMarshallerDelegate) || (realMarshaller instanceof MarshallerDelegate) )
+      if ((realMarshaller instanceof RegionMarshallerDelegate) || (realMarshaller instanceof MarshallerDelegate))
       {
          throw new RuntimeException("Illegal state");
       }
@@ -74,8 +89,44 @@
       realDispatcher.setMarshaller(delegate);
       realDispatcher.setRequestMarshaller(delegate);
       realDispatcher.setResponseMarshaller(delegate);
+      this.config = cache.getConfiguration();
    }
 
+   /**
+    * Based on cache's configuration, will know for what specific commands to expect to be replicated.
+    * E.g. async replication with a tx, would expect only a PrepareCommand (async is 1PC). sync repl with tx would expect
+    * a prepare and a commit (sync is 2pc).
+    * @param inTx do you expect replication to occur as result of a tx.commit?
+    */
+   public void smartExpect(Class<? extends WriteCommand> writeCommand, boolean inTx)
+   {
+      if (config.getCacheMode().equals(Configuration.CacheMode.INVALIDATION_ASYNC) || config.getCacheMode().equals(Configuration.CacheMode.INVALIDATION_SYNC))
+      {
+         expect(InvalidateCommand.class);
+         return;
+      }
+      if (inTx)
+      {
+         expect(PrepareCommand.class);
+         if (config.getCacheMode().isSynchronous())
+         {
+            expect(CommitCommand.class);
+         }
+         return;
+      }
+      if (config.getNodeLockingScheme().equals(Configuration.NodeLockingScheme.PESSIMISTIC))
+      {
+         expect(getPessCommand(writeCommand));
+      }
+   }
+
+   private Class<? extends ReplicableCommand> getPessCommand(Class<? extends WriteCommand> writeCommand)
+   {
+      Class<? extends ReplicableCommand> result = mvcc2PessMap.get(writeCommand);
+      if (result == null) throw new IllegalStateException("Unknown command: " + writeCommand);
+      return result;
+   }
+
    private class MarshallerDelegate implements RpcDispatcher.Marshaller2
    {
       RpcDispatcher.Marshaller2 marshaller;
    
    
More information about the jbosscache-commits
mailing list