[jbosscache-commits] JBoss Cache SVN: r7608 - in core/branches/flat/src: main/java/org/horizon/remoting and 6 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Jan 28 18:43:57 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-01-28 18:43:56 -0500 (Wed, 28 Jan 2009)
New Revision: 7608

Added:
   core/branches/flat/src/test/java/org/horizon/atomic/AtomicHashMapTestAssertions.java
   core/branches/flat/src/test/java/org/horizon/atomic/ClusteredAPITest.java
Modified:
   core/branches/flat/src/main/java/org/horizon/batch/BatchContainer.java
   core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
   core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
   core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
   core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java
   core/branches/flat/src/test/java/org/horizon/atomic/APITest.java
   core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
   core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java
   core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java
   core/branches/flat/src/test/java/org/horizon/replication/SyncCacheListenerTest.java
   core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
Log:


Modified: core/branches/flat/src/main/java/org/horizon/batch/BatchContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/batch/BatchContainer.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/main/java/org/horizon/batch/BatchContainer.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -61,9 +61,9 @@
    public boolean startBatch(boolean autoBatch) throws CacheException {
       BatchDetails bd = batchDetails.get();
       try {
-         bd.invocationCount++;
          if (transactionManager.getTransaction() == null && bd.tx == null) {
             transactionManager.begin();
+            bd.invocationCount = 1;
             bd.suspendTxAfterInvocation = !autoBatch;
 
             // do not suspend if this is from an AutoBatch!
@@ -73,8 +73,10 @@
                bd.tx = transactionManager.suspend();
 
             return true;
-         }
-         return false;
+         } else {
+            bd.invocationCount++;
+            return false;
+         }         
       }
       catch (Exception e) {
          throw new CacheException("Unable to start batch", e);

Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -121,7 +121,7 @@
    }
 
    // mainly for unit testing
-   public void setTransport(Transport transport) {
-      this.t = transport;
+   public void setTransport(Transport t) {
+      this.t = t;
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -289,7 +289,7 @@
          boolean needNotification = false;
          if (newMembers != null) {
 
-            // TODO: Implement breaking stale locks for dead members
+            // TODO: Implement breaking stale locks for dead members.  This should be in the TxINterceptor or TransactionTable, with a listener on the cache manager.
 //            if (members != null) {
             // we had a membership list before this event.  Check to make sure we haven't lost any members,
             // and if so, determine what members have been removed

Modified: core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -18,6 +18,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -64,6 +65,16 @@
       }
    }
 
+   protected List<Cache> createClusteredCaches(int numMembersInCluster, String cacheName, Configuration c) {
+      List<Cache> caches = new ArrayList<Cache>(numMembersInCluster);
+      for (int i=0; i<numMembersInCluster; i++) {
+         CacheManager cm = addCacheManager();
+         cm.defineCache(cacheName, c);
+         caches.add(cm.getCache(cacheName));
+      }
+      return caches;
+   }
+
    protected void assertClusterSize(String message, int size) {
       for (CacheManager cm : cacheManagerThreadLocal.get()) {
          assert cm.getMembers() != null && cm.getMembers().size() == size : message;
@@ -115,7 +126,7 @@
       }
 
       public void waitForReplication() {
-         waitForReplication(600, TimeUnit.SECONDS);
+         waitForReplication(120, TimeUnit.SECONDS);
       }
 
       public void waitForReplication(long time, TimeUnit unit) {

Modified: core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -8,11 +8,13 @@
 import org.horizon.commands.write.PutKeyValueCommand;
 import org.horizon.commands.write.RemoveCommand;
 import org.horizon.config.Configuration;
-import org.horizon.factories.ComponentRegistry;
 import org.horizon.manager.CacheManager;
 import org.horizon.remoting.RPCManager;
+import org.horizon.remoting.RPCManagerImpl;
+import org.horizon.remoting.ResponseFilter;
 import org.horizon.remoting.ResponseMode;
 import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.transaction.TransactionTable;
 import org.horizon.util.TestingUtil;
@@ -22,6 +24,7 @@
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import java.util.ArrayList;
 import java.util.List;
 
 @Test(groups = "functional", sequential = true)
@@ -90,28 +93,34 @@
    }
 
    public void testAsyncForce() throws Exception {
-      RPCManager rpcManager = EasyMock.createNiceMock(RPCManager.class);
-      RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
-      List<Address> memberList = originalRpcManager.getMembers();
-      expect(rpcManager.getMembers()).andReturn(memberList).anyTimes();
-      // inject a mock RPC manager so that we can test whether calls made are sync or async.
-      ComponentRegistry cr = TestingUtil.extractComponentRegistry(cache1);
-      cr.registerComponent(rpcManager, RPCManager.class);
-      cr.rewire();
+      Transport mockTransport = createNiceMock(Transport.class);
+      RPCManagerImpl rpcManager = (RPCManagerImpl) TestingUtil.extractComponent(cache1, RPCManager.class);
+      Transport originalTransport = TestingUtil.extractComponent(cache1, Transport.class);
+      try {
 
-      // specify what we expectWithTx called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
-      // setting the mock object to expectWithTx the "sync" param to be false.
-      expect(rpcManager.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean())).andReturn(null);
+         Address mockAddress1 = createNiceMock(Address.class);
+         Address mockAddress2 = createNiceMock(Address.class);
 
-      replay(rpcManager);
+         List<Address> memberList = new ArrayList<Address>(2);
+         memberList.add(mockAddress1);
+         memberList.add(mockAddress2);
 
-      // now try a simple replication.  Since the RPCManager is a mock object it will not actually replicate anything.
-      cache1.putForExternalRead(key, value);
-      verify(rpcManager);
+         expect(mockTransport.getMembers()).andReturn(memberList).anyTimes();
+         rpcManager.setTransport(mockTransport);
+         // specify what we expectWithTx called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
+         // setting the mock object to expectWithTx the "sync" param to be false.
+         expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(),
+                                             eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean(), (ResponseFilter) isNull())).andReturn(null);
 
-      // cleanup
-      TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager, RPCManager.class);
-      cache1.remove(key);
+         replay(mockAddress1, mockAddress2, mockTransport);
+
+         // now try a simple replication.  Since the RPCManager is a mock object it will not actually replicate anything.
+         cache1.putForExternalRead(key, value);
+         verify(mockTransport);
+
+      } finally {
+         if (rpcManager != null) rpcManager.setTransport(originalTransport);
+      }
    }
 
    public void testTxSuspension() throws Exception {
@@ -139,19 +148,28 @@
    }
 
    public void testExceptionSuppression() throws Exception {
-      RPCManager barfingRpcManager = EasyMock.createNiceMock(RPCManager.class);
-      RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+      Transport mockTransport = createNiceMock(Transport.class);
+      RPCManagerImpl rpcManager = (RPCManagerImpl) TestingUtil.extractComponent(cache1, RPCManager.class);
+      Transport originalTransport = TestingUtil.extractComponent(cache1, Transport.class);
       try {
-         List<Address> memberList = originalRpcManager.getMembers();
-         expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
-         expect(barfingRpcManager.getAddress()).andReturn(originalRpcManager.getAddress()).anyTimes();
-         expect(barfingRpcManager.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(), anyResponseMode(), anyLong(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
-         replay(barfingRpcManager);
 
-         TestingUtil.extractComponentRegistry(cache1).registerComponent(barfingRpcManager, RPCManager.class);
-         cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
-         TestingUtil.extractComponentRegistry(cache1).rewire();
+         Address mockAddress1 = createNiceMock(Address.class);
+         Address mockAddress2 = createNiceMock(Address.class);
 
+         List<Address> memberList = new ArrayList<Address>(2);
+         memberList.add(mockAddress1);
+         memberList.add(mockAddress2);
+
+         expect(mockTransport.getMembers()).andReturn(memberList).anyTimes();
+         rpcManager.setTransport(mockTransport);
+
+
+         expect(mockTransport.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(), anyResponseMode(),
+                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+               .andThrow(new RuntimeException("Barf!")).anyTimes();
+         
+         replay(mockTransport);
+
          try {
             cache1.put(key, value);
             fail("Should have barfed");
@@ -173,7 +191,7 @@
          cache1.putForExternalRead(key, value);
       }
       finally {
-         TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager, RPCManager.class);
+         if (rpcManager != null) rpcManager.setTransport(originalTransport);
       }
    }
 

Modified: core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -3,7 +3,6 @@
 import org.horizon.config.Configuration;
 import org.horizon.manager.CacheManager;
 import org.horizon.manager.DefaultCacheManager;
-import org.horizon.transaction.DummyTransactionManager;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.tree.Fqn;
 import org.horizon.tree.Node;
@@ -11,6 +10,7 @@
 import org.horizon.tree.TreeCacheImpl;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -44,6 +44,11 @@
       tm = TestingUtil.getTransactionManager(cache.getCache());
    }
 
+   @AfterMethod
+   public void tearDown() {
+      TestingUtil.killCaches(cache.getCache());
+   }
+
    public void testAddingData() {
       Node<Object, Object> rootNode = cache.getRoot();
       Node<Object, Object> nodeA = rootNode.addChild(A);
@@ -322,11 +327,9 @@
    }
 
    public void testDoubleRemovalOfData() throws Exception {
-
-
-      assert DummyTransactionManager.getInstance().getTransaction() == null;
+      assert tm.getTransaction() == null;
       cache.put("/foo/1/2/3", "item", 1);
-      assert DummyTransactionManager.getInstance().getTransaction() == null;
+      assert tm.getTransaction() == null;
       assert 1 == (Integer) cache.get("/foo/1/2/3", "item");
       tm.begin();
       assert 1 == (Integer) cache.get("/foo/1/2/3", "item");

Modified: core/branches/flat/src/test/java/org/horizon/atomic/APITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/atomic/APITest.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/atomic/APITest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -21,213 +21,144 @@
  */
 package org.horizon.atomic;
 
+import static org.horizon.atomic.AtomicHashMapTestAssertions.assertIsEmpty;
+import static org.horizon.atomic.AtomicHashMapTestAssertions.assertIsEmptyMap;
 import org.horizon.config.Configuration;
-import org.horizon.config.Configuration.CacheMode;
 import org.horizon.manager.DefaultCacheManager;
-import org.horizon.transaction.DummyTransactionManager;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import javax.transaction.SystemException;
 import javax.transaction.Transaction;
-import java.util.Map;
+import javax.transaction.TransactionManager;
 
- at Test(groups = "functional")
+ at Test(groups = "functional", sequential = true)
 public class APITest {
 
-   private void assertIsEmpty(Map map) {
-      assert map.size() == 0;
-      assert map.get("blah") == null;
-      assert !map.containsKey("blah");
+   AtomicMapCache cache;
+   TransactionManager tm;
+
+   @BeforeTest
+   private void setUp() {
+      Configuration c = new Configuration();
+      c.setInvocationBatchingEnabled(true);
+      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
+      tm = TestingUtil.getTransactionManager(cache);
    }
 
-   private void assertIsEmptyMap(AtomicMapCache cache, Object key) {
-      assertIsEmpty(cache.getAtomicMap(key));
+   @AfterTest
+   private void tearDown() {
+      cache.getCacheManager().stop();
    }
 
+   @AfterMethod
+   private void clearUp() throws SystemException {
+      if (tm.getTransaction() != null) {
+         try {
+            tm.rollback();
+         } catch (Exception ignored) {
+            // try to suspend?
+            tm.suspend();
+         }
+      }
+      cache.clear();
+   }
+
    public void testAtomicMap() {
-      Configuration c = new Configuration();
-      c.setInvocationBatchingEnabled(true);
-      AtomicMapCache cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-      try {
-         AtomicMap map = cache.getAtomicMap("map");
+      AtomicMap map = cache.getAtomicMap("map");
 
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache, "map");
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache, "map");
 
-         map.put("blah", "blah");
-         assert map.size() == 1;
-         assert map.get("blah").equals("blah");
-         assert map.containsKey("blah");
+      map.put("blah", "blah");
+      assert map.size() == 1;
+      assert map.get("blah").equals("blah");
+      assert map.containsKey("blah");
 
-         map.clear();
+      map.clear();
 
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache, "map");
-      }
-      finally {
-         cache.stop();
-      }
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache, "map");
    }
 
 
    public void testReadSafetyEmptyCache() throws Exception {
-      Configuration c = new Configuration();
-      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      c.setInvocationBatchingEnabled(true);
-      AtomicMapCache cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-      try {
-         AtomicMap map = cache.getAtomicMap("map");
+      AtomicMap map = cache.getAtomicMap("map");
 
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache, "map");
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache, "map");
 
-         DummyTransactionManager.getInstance().begin();
-         map.put("blah", "blah");
-         assert map.size() == 1;
-         assert map.get("blah").equals("blah");
-         assert map.containsKey("blah");
-         Transaction t = DummyTransactionManager.getInstance().suspend();
+      tm.begin();
+      map.put("blah", "blah");
+      assert map.size() == 1;
+      assert map.get("blah").equals("blah");
+      assert map.containsKey("blah");
+      Transaction t = tm.suspend();
 
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache, "map");
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache, "map");
 
-         DummyTransactionManager.getInstance().resume(t);
-         DummyTransactionManager.getInstance().commit();
+      tm.resume(t);
+      tm.commit();
 
-         assert map.size() == 1;
-         assert map.get("blah").equals("blah");
-         assert map.containsKey("blah");
+      assert map.size() == 1;
+      assert map.get("blah").equals("blah");
+      assert map.containsKey("blah");
 
-         map.clear();
+      map.clear();
 
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache, "map");
-      }
-      finally {
-         cache.stop();
-      }
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache, "map");
    }
 
    public void testReadSafetyNotEmptyCache() throws Exception {
-      Configuration c = new Configuration();
-      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      c.setInvocationBatchingEnabled(true);
-      AtomicMapCache cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-      try {
-         AtomicMap map = cache.getAtomicMap("map");
+      AtomicMap map = cache.getAtomicMap("map");
 
-         DummyTransactionManager.getInstance().begin();
-         map.put("blah", "blah");
-         assert map.size() == 1;
-         assert map.get("blah").equals("blah");
-         assert map.containsKey("blah");
-         Transaction t = DummyTransactionManager.getInstance().suspend();
+      tm.begin();
+      map.put("blah", "blah");
+      assert map.size() == 1;
+      assert map.get("blah").equals("blah");
+      assert map.containsKey("blah");
+      Transaction t = tm.suspend();
 
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache, "map");
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache, "map");
 
-         DummyTransactionManager.getInstance().resume(t);
-         DummyTransactionManager.getInstance().commit();
+      tm.resume(t);
+      tm.commit();
 
-         assert map.size() == 1;
-         assert map.get("blah").equals("blah");
-         assert map.containsKey("blah");
+      assert map.size() == 1;
+      assert map.get("blah").equals("blah");
+      assert map.containsKey("blah");
 
-         map.clear();
+      map.clear();
 
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache, "map");
-      }
-      finally {
-         cache.stop();
-      }
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache, "map");
    }
 
    public void testReadSafetyRollback() throws Exception {
-      Configuration c = new Configuration();
-      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      c.setInvocationBatchingEnabled(true);
-      AtomicMapCache cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-      try {
-         AtomicMap map = cache.getAtomicMap("map");
+      AtomicMap map = cache.getAtomicMap("map");
 
-         DummyTransactionManager.getInstance().begin();
-         map.put("blah", "blah");
-         assert map.size() == 1;
-         assert map.get("blah").equals("blah");
-         assert map.containsKey("blah");
-         Transaction t = DummyTransactionManager.getInstance().suspend();
+      tm.begin();
+      map.put("blah", "blah");
+      assert map.size() == 1;
+      assert map.get("blah").equals("blah");
+      assert map.containsKey("blah");
+      Transaction t = tm.suspend();
 
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache, "map");
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache, "map");
 
-         DummyTransactionManager.getInstance().resume(t);
-         DummyTransactionManager.getInstance().rollback();
+      tm.resume(t);
+      tm.rollback();
 
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache, "map");
-      }
-      finally {
-         cache.stop();
-      }
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache, "map");
    }
-
-   public void testReplicationCommit() throws Exception {
-      Configuration c = new Configuration();
-      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      c.setCacheMode(CacheMode.REPL_SYNC);
-      c.setInvocationBatchingEnabled(true);
-      AtomicMapCache cache1 = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-      AtomicMapCache cache2 = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-
-      try {
-         TestingUtil.blockUntilViewsReceived(20000, cache1, cache2);
-         AtomicMap map = cache1.getAtomicMap("map");
-
-         DummyTransactionManager.getInstance().begin();
-         map.put("existing", "existing");
-         map.put("blah", "blah");
-         DummyTransactionManager.getInstance().commit();
-
-         assert map.size() == 2;
-         assert map.get("blah").equals("blah");
-         assert map.containsKey("blah");
-
-         System.out.println("Map on cache 2 is " + cache2.getAtomicMap("map"));
-
-         assert cache2.getAtomicMap("map").size() == 2;
-         assert cache2.getAtomicMap("map").get("blah").equals("blah");
-         assert cache2.getAtomicMap("map").containsKey("blah");
-      }
-      finally {
-         TestingUtil.killCaches(cache1, cache2);
-      }
-   }
-
-   public void testReplicationRollback() throws Exception {
-      Configuration c = new Configuration();
-      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      c.setCacheMode(CacheMode.REPL_SYNC);
-      c.setInvocationBatchingEnabled(true);
-      AtomicMapCache cache1 = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-      AtomicMapCache cache2 = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-
-      try {
-         TestingUtil.blockUntilViewsReceived(20000, cache1, cache2);
-         AtomicMap map = cache1.getAtomicMap("map");
-
-         DummyTransactionManager.getInstance().begin();
-         map.put("existing", "existing");
-         map.put("blah", "blah");
-         DummyTransactionManager.getInstance().rollback();
-
-         assertIsEmpty(map);
-         assertIsEmptyMap(cache1, "map");
-         assertIsEmptyMap(cache2, "map");
-      }
-      finally {
-         TestingUtil.killCaches(cache1, cache2);
-      }
-   }
 }

Added: core/branches/flat/src/test/java/org/horizon/atomic/AtomicHashMapTestAssertions.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/atomic/AtomicHashMapTestAssertions.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/atomic/AtomicHashMapTestAssertions.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -0,0 +1,16 @@
+package org.horizon.atomic;
+
+import java.util.Map;
+
+public class AtomicHashMapTestAssertions {
+
+   public static void assertIsEmpty(Map map) {
+      assert map.size() == 0;
+      assert map.get("blah") == null;
+      assert !map.containsKey("blah");
+   }
+
+   public static void assertIsEmptyMap(AtomicMapCache cache, Object key) {
+      assertIsEmpty(cache.getAtomicMap(key));
+   }
+}

Added: core/branches/flat/src/test/java/org/horizon/atomic/ClusteredAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/atomic/ClusteredAPITest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/atomic/ClusteredAPITest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -0,0 +1,65 @@
+package org.horizon.atomic;
+
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
+import static org.horizon.atomic.AtomicHashMapTestAssertions.assertIsEmpty;
+import static org.horizon.atomic.AtomicHashMapTestAssertions.assertIsEmptyMap;
+import org.horizon.config.Configuration;
+import org.horizon.transaction.DummyTransactionManager;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+
+ at Test(groups = "functional", sequential = true)
+public class ClusteredAPITest extends BaseClusteredTest {
+   AtomicMapCache cache1, cache2;
+
+   @BeforeMethod
+   public void setUp() {
+      Configuration c = new Configuration();
+      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      c.setInvocationBatchingEnabled(true);
+
+      List<Cache> caches = createClusteredCaches(2, "atomic", c);
+      cache1 = (AtomicMapCache) caches.get(0);
+      cache2 = (AtomicMapCache) caches.get(1);
+      TestingUtil.blockUntilViewsReceived(20000, cache1, cache2);
+   }
+
+   public void testReplicationCommit() throws Exception {
+      AtomicMap map = cache1.getAtomicMap("map");
+
+      DummyTransactionManager.getInstance().begin();
+      map.put("existing", "existing");
+      map.put("blah", "blah");
+      DummyTransactionManager.getInstance().commit();
+
+      assert map.size() == 2;
+      assert map.get("blah").equals("blah");
+      assert map.containsKey("blah");
+
+      System.out.println("Map on cache 2 is " + cache2.getAtomicMap("map"));
+
+      assert cache2.getAtomicMap("map").size() == 2;
+      assert cache2.getAtomicMap("map").get("blah").equals("blah");
+      assert cache2.getAtomicMap("map").containsKey("blah");
+   }
+
+   public void testReplicationRollback() throws Exception {
+      AtomicMap map = cache1.getAtomicMap("map");
+
+      DummyTransactionManager.getInstance().begin();
+      map.put("existing", "existing");
+      map.put("blah", "blah");
+      DummyTransactionManager.getInstance().rollback();
+
+      assertIsEmpty(map);
+      assertIsEmptyMap(cache1, "map");
+      assertIsEmptyMap(cache2, "map");
+   }
+}

Modified: core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -11,7 +11,6 @@
 import org.horizon.BaseClusteredTest;
 import org.horizon.Cache;
 import org.horizon.config.Configuration;
-import org.horizon.manager.CacheManager;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.assertEquals;
@@ -19,6 +18,7 @@
 import org.testng.annotations.Test;
 
 import javax.transaction.TransactionManager;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -37,13 +37,10 @@
       asyncConfiguration.setSyncRollbackPhase(true);
       asyncConfiguration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
 
-      CacheManager cm1 = addCacheManager();
-      CacheManager cm2 = addCacheManager();
+      List<Cache> caches = createClusteredCaches(2, "asyncRepl", asyncConfiguration);
 
-      defineCacheOnAllManagers("asyncRepl", asyncConfiguration);
-
-      cache1 = cm1.getCache("asyncRepl");
-      cache2 = cm2.getCache("asyncRepl");
+      cache1 = caches.get(0);
+      cache2 = caches.get(1);
    }
 
    public void testWithNoTx() throws Exception {

Modified: core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -1,66 +1,53 @@
 package org.horizon.replication;
 
+import org.horizon.BaseClusteredTest;
 import org.horizon.Cache;
-import org.horizon.UnitTestCacheManager;
+import org.horizon.commands.VisitableCommand;
 import org.horizon.config.Configuration;
+import org.horizon.context.InvocationContext;
+import org.horizon.interceptors.base.CommandInterceptor;
 import org.horizon.lock.TimeoutException;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.util.TestingUtil;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import java.util.List;
 
 /**
  * Tests the type of exceptions thrown for Lock Acquisition Timeouts versus Sync Repl Timeouts
  *
  * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
  */
- at Test(groups = {"functional"}, sequential = true)
-public class ExceptionTest {
+ at Test(groups = "functional", sequential = true)
+public class ExceptionTest extends BaseClusteredTest {
    private Cache cache1;
    private Cache cache2;
 
-   private static String DELAYED_CLUSTER_CONFIG =
-         "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
-               "    mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
-               "PING(timeout=1000;num_initial_members=2):" +
-               "MERGE2(min_interval=5000;max_interval=10000):" +
-               "FD_SOCK:" +
-               "VERIFY_SUSPECT(timeout=1500):" +
-               "pbcast.NAKACK(gc_lag=50;max_xmit_size=8192;retransmit_timeout=600,1200,2400,4800):" +
-               "UNICAST(timeout=600,1200,2400,4800):" +
-               "pbcast.STABLE(desired_avg_gossip=20000):" +
-               "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
-               "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
-               "shun=false;print_local_addr=true):" +
-               "pbcast.STATE_TRANSFER:" +
-               "DELAY(in_delay=100;out_delay=100)";
-
-   private Cache createCache(String jgroupsConfig) {
+   @BeforeMethod
+   public void setUp() {
       Configuration c = new Configuration();
       c.setSyncCommitPhase(true);
       c.setSyncRollbackPhase(true);
       c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
-      // TODO fix this
-//      if (jgroupsConfig != null) c.setClusterConfig(jgroupsConfig);
       c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      Cache cache = new UnitTestCacheManager(c).createCache("testCache");
-      return cache;
-   }
 
-   @AfterMethod
-   public void tearDown() {
-      TestingUtil.killCaches(cache1, cache2);
-      cache1 = null;
-      cache2 = null;
+      List<Cache> caches = createClusteredCaches(2, "ExceptionTestCache", c);
+      cache1 = caches.get(0);
+      cache2 = caches.get(1);
    }
 
-   @Test(groups = {"functional"}, expectedExceptions = {TimeoutException.class})
+   @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
    public void testSyncReplTimeout() {
-      cache1 = createCache(DELAYED_CLUSTER_CONFIG);
-      cache2 = createCache(DELAYED_CLUSTER_CONFIG);
+      cache2.addInterceptor(new CommandInterceptor() {
+         @Override
+         protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
+            // Add a delay
+            Thread.sleep(100);
+            return super.handleDefault(ctx, cmd);
+         }
+      }, 0);
 
       cache1.getConfiguration().setSyncReplTimeout(1); // 1ms.  this is *bound* to fail.
       cache2.getConfiguration().setSyncReplTimeout(1);
@@ -73,10 +60,8 @@
       cache1.put("k", "v");
    }
 
-   @Test(groups = {"functional"}, expectedExceptions = {TimeoutException.class})
+   @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
    public void testLockAcquisitionTimeout() throws Exception {
-      cache1 = createCache(null);
-      cache2 = createCache(null);
       cache2.getConfiguration().setLockAcquisitionTimeout(1);
 
       cache1.start();
@@ -88,7 +73,7 @@
       TransactionManager tm = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
       tm.begin();
       cache2.put("block", "block");
-      Transaction t = tm.suspend();
+      tm.suspend();
       cache1.put("block", "v");
    }
 }

Modified: core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -7,16 +7,15 @@
  */
 package org.horizon.replication;
 
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheManager;
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
 import org.horizon.config.Configuration;
 import org.horizon.lock.IsolationLevel;
-import org.horizon.transaction.DummyTransactionManager;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.fail;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import javax.transaction.NotSupportedException;
@@ -25,68 +24,36 @@
 import javax.transaction.TransactionManager;
 import java.io.NotSerializableException;
 import java.io.Serializable;
+import java.util.List;
 
-/**
- * Teting of replication exception for a Nonerislizable object
- *
- * @author Ben Wang
- */
- at Test(groups = {"functional"}, sequential = true)
-public class ReplicationExceptionTest {
-   private CacheSPI<String, ContainerData> cache1, cache2;
+ at Test(groups = "functional", sequential = true)
+public class ReplicationExceptionTest extends BaseClusteredTest {
+   private Cache<String, ContainerData> cache1, cache2;
 
-   @AfterMethod(alwaysRun = true)
-   public void tearDown() throws Exception {
-      // We just can't kill DummyTransactionManager. We are sharing single instance in more tests.
-      TestingUtil.killTransaction(DummyTransactionManager.getInstance());
-      destroyCaches();
-      /*
-      if (old_factory != null)
-      {
-         System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
-         old_factory = null;
-      }
-      */
-   }
+   @BeforeMethod
+   public void setUp() {
+      Configuration configuration = new Configuration();
 
-   private TransactionManager beginTransaction() throws SystemException, NotSupportedException {
-      TransactionManager mgr = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
-      mgr.begin();
-      return mgr;
-   }
+      configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      configuration.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
 
-   private void initCaches(Configuration.CacheMode cachingMode) {
-      Configuration conf1 = new Configuration();
-      Configuration conf2 = new Configuration();
+      configuration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      configuration.setLockAcquisitionTimeout(5000);
 
-      conf1.setCacheMode(cachingMode);
-      conf2.setCacheMode(cachingMode);
-      conf1.setIsolationLevel(IsolationLevel.SERIALIZABLE);
-      conf2.setIsolationLevel(IsolationLevel.SERIALIZABLE);
+      List<Cache> caches = createClusteredCaches(2, "replicatinExceptionTest", configuration);
 
-      conf1.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      conf2.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      conf1.setLockAcquisitionTimeout(5000);
-      conf2.setLockAcquisitionTimeout(5000);
-
-      cache1 = (CacheSPI) new UnitTestCacheManager(conf1).createCache("testCache");
-      cache2 = (CacheSPI) new UnitTestCacheManager(conf2).createCache("testCache");
-
-
-      cache1.start();
-      cache2.start();
+      cache1 = caches.get(0);
+      cache2 = caches.get(1);
    }
 
-   void destroyCaches() throws Exception {
-      TestingUtil.killCaches(cache1, cache2);
-      cache1 = null;
-      cache2 = null;
+   private TransactionManager beginTransaction() throws SystemException, NotSupportedException {
+      TransactionManager mgr = TestingUtil.getTransactionManager(cache1);
+      mgr.begin();
+      return mgr;
    }
 
    public void testNonSerializableRepl() throws Exception {
       try {
-         initCaches(Configuration.CacheMode.REPL_SYNC);
-
          cache1.put("test", new ContainerData());
 
          // We should not come here.
@@ -106,8 +73,6 @@
       TransactionManager tm;
 
       try {
-         initCaches(Configuration.CacheMode.REPL_SYNC);
-
          tm = beginTransaction();
          cache1.put("test", new ContainerData());
          tm.commit();

Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncCacheListenerTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncCacheListenerTest.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncCacheListenerTest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -7,62 +7,40 @@
  */
 package org.horizon.replication;
 
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
 import org.horizon.CacheException;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheManager;
 import org.horizon.config.Configuration;
 import org.horizon.lock.IsolationLevel;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
 import org.horizon.notifications.annotation.CacheEntryModified;
 import org.horizon.notifications.annotation.CacheEntryRemoved;
+import org.horizon.notifications.annotation.Listener;
 import org.horizon.notifications.event.Event;
 import org.horizon.notifications.event.TransactionalEvent;
-import org.horizon.transaction.DummyTransactionManager;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 /**
- * Test out the TreeCacheListener
+ * Test out the CacheListener
  */
- at Test(groups = {"functional"}, sequential = true)
-public class SyncCacheListenerTest {
-   private CacheSPI<Object, Object> cache1, cache2;
-   private final static Log log_ = LogFactory.getLog(SyncCacheListenerTest.class);
+ at Test(groups = "functional", sequential = true)
+public class SyncCacheListenerTest extends BaseClusteredTest {
+   private Cache<Object, Object> cache1, cache2;
+   private final static Log log = LogFactory.getLog(SyncCacheListenerTest.class);
 
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
-      System.out.println("*** starting setUp()");
-      initCaches();
-      System.out.println("*** finished setUp()");
-   }
-
-   @AfterMethod(alwaysRun = true)
-   public void tearDown() throws Exception {
-      System.out.println("*** starting tearDown()");
-      // We just can't kill DummyTransactionManager. We are sharing single instance in more tests.
-      TestingUtil.killTransaction(DummyTransactionManager.getInstance());
-      destroyCaches();
-      /*
-      if (old_factory != null)
-      {
-         System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
-         old_factory = null;
-      }
-       */
-      System.out.println("*** finished tearDown()");
-   }
-
-   private void initCaches() {
       Configuration conf = new Configuration();
       conf.setSyncCommitPhase(true);
       conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
@@ -70,25 +48,21 @@
       conf.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
       conf.setLockAcquisitionTimeout(5000);
 
-      cache1 = (CacheSPI) new UnitTestCacheManager(conf).createCache("firstCache");
-      cache2 = (CacheSPI) new UnitTestCacheManager(conf).createCache("firstCache");
-      TestingUtil.blockUntilViewReceived(cache2, 2, 1000);
-   }
+      List<Cache> caches = createClusteredCaches(2, "cache", conf);
 
-   private void destroyCaches() {
-      TestingUtil.killCaches(cache1, cache2);
-      cache1 = null;
-      cache2 = null;
+      cache1 = caches.get(0);
+      cache2 = caches.get(1);
+      TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
    }
 
    public void testSyncTxRepl() throws Exception {
       Integer age;
-      TransactionManager tm = cache1.getTransactionManager();
+      TransactionManager tm = TestingUtil.getTransactionManager(cache1);
 
       tm.begin();
       Transaction tx = tm.getTransaction();
-      Listener lis = new Listener();
-      cache1.getNotifier().addListener(lis);
+      LocalListener lis = new LocalListener();
+      cache1.addListener(lis);
       lis.put("age", 38);
 
       tm.suspend();
@@ -105,7 +79,7 @@
    public void testRemoteCacheListener() throws Exception {
       Integer age;
       RemoteListener lis = new RemoteListener();
-      cache2.getNotifier().addListener(lis);
+      cache2.addListener(lis);
       cache1.put("age", 38);
 
       // value on cache2 must be 38
@@ -117,7 +91,7 @@
 
    public void testSyncRepl() throws Exception {
       Integer age;
-      Listener lis = new Listener();
+      LocalListener lis = new LocalListener();
       cache1.addListener(lis);
       lis.put("age", 38);
 
@@ -143,7 +117,7 @@
       TransactionManager tm = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
       tm.begin();
       Transaction tx = tm.getTransaction();
-      Listener lis = new Listener();
+      LocalListener lis = new LocalListener();
 
       cache1.put("age", 38);
       lis.put("name", "Ben");
@@ -167,8 +141,8 @@
    public void testSyncReplMap() throws Exception {
       Integer age;
 
-      Listener lis = new Listener();
-      cache1.getNotifier().addListener(lis);
+      LocalListener lis = new LocalListener();
+      cache1.addListener(lis);
       lis.put("age", 38);
 
       cache1.put("name", "Ben");
@@ -178,8 +152,8 @@
       assertTrue("\"age\" must be 38", age == 38);
    }
 
-   @org.horizon.notifications.annotation.Listener
-   public class Listener {
+   @Listener
+   public class LocalListener {
       Object key = null;
 
       public void put(Object key, Object val) {
@@ -198,7 +172,7 @@
       @CacheEntryModified
       public void nodeModified(Event ne) {
          if (!ne.isPre()) {
-            log_.debug("nodeModified visited with fqn: " + key);
+            log.debug("nodeModified visited with fqn: " + key);
             try {
                // test out if we can get the read lock since there is a write lock going as well.
                cache1.get(key);
@@ -212,14 +186,14 @@
 
    }
 
-   @org.horizon.notifications.annotation.Listener
+   @Listener
    public class RemoteListener {
 
       @CacheEntryRemoved
       @CacheEntryModified
       public void callback(TransactionalEvent e) {
          System.out.println("Callback got event " + e);
-         log_.debug("Callback got event " + e);
+         log.debug("Callback got event " + e);
          assertFalse("node was removed on remote cache so isLocal should be false", e.isOriginLocal());
       }
    }

Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java	2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java	2009-01-28 23:43:56 UTC (rev 7608)
@@ -44,8 +44,7 @@
       Configuration replSync = new Configuration();
       replSync.setCacheMode(Configuration.CacheMode.REPL_SYNC);
 
-      cm1.defineCache("replSync", replSync);
-      cm2.defineCache("replSync", replSync);
+      defineCacheOnAllManagers("replSync", replSync);
 
       cache1 = cm1.getCache("replSync");
       cache2 = cm2.getCache("replSync");




More information about the jbosscache-commits mailing list