Author: manik.surtani(a)jboss.com
Date: 2009-01-28 18:43:56 -0500 (Wed, 28 Jan 2009)
New Revision: 7608
Added:
core/branches/flat/src/test/java/org/horizon/atomic/AtomicHashMapTestAssertions.java
core/branches/flat/src/test/java/org/horizon/atomic/ClusteredAPITest.java
Modified:
core/branches/flat/src/main/java/org/horizon/batch/BatchContainer.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java
core/branches/flat/src/test/java/org/horizon/atomic/APITest.java
core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java
core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java
core/branches/flat/src/test/java/org/horizon/replication/SyncCacheListenerTest.java
core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
Log:
Modified: core/branches/flat/src/main/java/org/horizon/batch/BatchContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/batch/BatchContainer.java 2009-01-28
18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/main/java/org/horizon/batch/BatchContainer.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -61,9 +61,9 @@
public boolean startBatch(boolean autoBatch) throws CacheException {
BatchDetails bd = batchDetails.get();
try {
- bd.invocationCount++;
if (transactionManager.getTransaction() == null && bd.tx == null) {
transactionManager.begin();
+ bd.invocationCount = 1;
bd.suspendTxAfterInvocation = !autoBatch;
// do not suspend if this is from an AutoBatch!
@@ -73,8 +73,10 @@
bd.tx = transactionManager.suspend();
return true;
- }
- return false;
+ } else {
+ bd.invocationCount++;
+ return false;
+ }
}
catch (Exception e) {
throw new CacheException("Unable to start batch", e);
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-01-28
18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -121,7 +121,7 @@
}
// mainly for unit testing
- public void setTransport(Transport transport) {
- this.t = transport;
+ public void setTransport(Transport t) {
+ this.t = t;
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-01-28
18:27:32 UTC (rev 7607)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -289,7 +289,7 @@
boolean needNotification = false;
if (newMembers != null) {
- // TODO: Implement breaking stale locks for dead members
+ // TODO: Implement breaking stale locks for dead members. This should be in
the TxInterceptor or TransactionTable, with a listener on the cache manager.
// if (members != null) {
// we had a membership list before this event. Check to make sure we
haven't lost any members,
// and if so, determine what members have been removed
Modified: core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java 2009-01-28
18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -18,6 +18,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -64,6 +65,16 @@
}
}
+ protected List<Cache> createClusteredCaches(int numMembersInCluster, String
cacheName, Configuration c) {
+ List<Cache> caches = new ArrayList<Cache>(numMembersInCluster);
+ for (int i=0; i<numMembersInCluster; i++) {
+ CacheManager cm = addCacheManager();
+ cm.defineCache(cacheName, c);
+ caches.add(cm.getCache(cacheName));
+ }
+ return caches;
+ }
+
protected void assertClusterSize(String message, int size) {
for (CacheManager cm : cacheManagerThreadLocal.get()) {
assert cm.getMembers() != null && cm.getMembers().size() == size :
message;
@@ -115,7 +126,7 @@
}
public void waitForReplication() {
- waitForReplication(600, TimeUnit.SECONDS);
+ waitForReplication(120, TimeUnit.SECONDS);
}
public void waitForReplication(long time, TimeUnit unit) {
Modified:
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-01-28
18:27:32 UTC (rev 7607)
+++
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -8,11 +8,13 @@
import org.horizon.commands.write.PutKeyValueCommand;
import org.horizon.commands.write.RemoveCommand;
import org.horizon.config.Configuration;
-import org.horizon.factories.ComponentRegistry;
import org.horizon.manager.CacheManager;
import org.horizon.remoting.RPCManager;
+import org.horizon.remoting.RPCManagerImpl;
+import org.horizon.remoting.ResponseFilter;
import org.horizon.remoting.ResponseMode;
import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.transaction.TransactionTable;
import org.horizon.util.TestingUtil;
@@ -22,6 +24,7 @@
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import java.util.ArrayList;
import java.util.List;
@Test(groups = "functional", sequential = true)
@@ -90,28 +93,34 @@
}
public void testAsyncForce() throws Exception {
- RPCManager rpcManager = EasyMock.createNiceMock(RPCManager.class);
- RPCManager originalRpcManager =
cache1.getConfiguration().getRuntimeConfig().getRPCManager();
- List<Address> memberList = originalRpcManager.getMembers();
- expect(rpcManager.getMembers()).andReturn(memberList).anyTimes();
- // inject a mock RPC manager so that we can test whether calls made are sync or
async.
- ComponentRegistry cr = TestingUtil.extractComponentRegistry(cache1);
- cr.registerComponent(rpcManager, RPCManager.class);
- cr.rewire();
+ Transport mockTransport = createNiceMock(Transport.class);
+ RPCManagerImpl rpcManager = (RPCManagerImpl) TestingUtil.extractComponent(cache1,
RPCManager.class);
+ Transport originalTransport = TestingUtil.extractComponent(cache1,
Transport.class);
+ try {
- // specify what we expectWithTx called on the mock Rpc Manager. For params we
don't care about, just use ANYTHING.
- // setting the mock object to expectWithTx the "sync" param to be false.
- expect(rpcManager.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(),
eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean())).andReturn(null);
+ Address mockAddress1 = createNiceMock(Address.class);
+ Address mockAddress2 = createNiceMock(Address.class);
- replay(rpcManager);
+ List<Address> memberList = new ArrayList<Address>(2);
+ memberList.add(mockAddress1);
+ memberList.add(mockAddress2);
- // now try a simple replication. Since the RPCManager is a mock object it will not
actually replicate anything.
- cache1.putForExternalRead(key, value);
- verify(rpcManager);
+ expect(mockTransport.getMembers()).andReturn(memberList).anyTimes();
+ rpcManager.setTransport(mockTransport);
+ // specify what we expect to be called on the mock Transport. For params we
don't care about, just use ANYTHING.
+ // setting the mock object to expect the "sync" param to be
false.
+ expect(mockTransport.invokeRemotely((List<Address>) anyObject(),
(RPCCommand) anyObject(),
+ eq(ResponseMode.ASYNCHRONOUS), anyLong(),
anyBoolean(), (ResponseFilter) isNull())).andReturn(null);
- // cleanup
- TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager,
RPCManager.class);
- cache1.remove(key);
+ replay(mockAddress1, mockAddress2, mockTransport);
+
+ // now try a simple replication. Since the Transport is a mock object it will
not actually replicate anything.
+ cache1.putForExternalRead(key, value);
+ verify(mockTransport);
+
+ } finally {
+ if (rpcManager != null) rpcManager.setTransport(originalTransport);
+ }
}
public void testTxSuspension() throws Exception {
@@ -139,19 +148,28 @@
}
public void testExceptionSuppression() throws Exception {
- RPCManager barfingRpcManager = EasyMock.createNiceMock(RPCManager.class);
- RPCManager originalRpcManager =
cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+ Transport mockTransport = createNiceMock(Transport.class);
+ RPCManagerImpl rpcManager = (RPCManagerImpl) TestingUtil.extractComponent(cache1,
RPCManager.class);
+ Transport originalTransport = TestingUtil.extractComponent(cache1,
Transport.class);
try {
- List<Address> memberList = originalRpcManager.getMembers();
- expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
-
expect(barfingRpcManager.getAddress()).andReturn(originalRpcManager.getAddress()).anyTimes();
- expect(barfingRpcManager.invokeRemotely(anyAddresses(), (RPCCommand)
anyObject(), anyResponseMode(), anyLong(), anyBoolean())).andThrow(new
RuntimeException("Barf!")).anyTimes();
- replay(barfingRpcManager);
-
TestingUtil.extractComponentRegistry(cache1).registerComponent(barfingRpcManager,
RPCManager.class);
- cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
- TestingUtil.extractComponentRegistry(cache1).rewire();
+ Address mockAddress1 = createNiceMock(Address.class);
+ Address mockAddress2 = createNiceMock(Address.class);
+ List<Address> memberList = new ArrayList<Address>(2);
+ memberList.add(mockAddress1);
+ memberList.add(mockAddress2);
+
+ expect(mockTransport.getMembers()).andReturn(memberList).anyTimes();
+ rpcManager.setTransport(mockTransport);
+
+
+ expect(mockTransport.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(),
anyResponseMode(),
+ anyLong(), anyBoolean(), (ResponseFilter)
anyObject()))
+ .andThrow(new RuntimeException("Barf!")).anyTimes();
+
+ replay(mockTransport);
+
try {
cache1.put(key, value);
fail("Should have barfed");
@@ -173,7 +191,7 @@
cache1.putForExternalRead(key, value);
}
finally {
-
TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager,
RPCManager.class);
+ if (rpcManager != null) rpcManager.setTransport(originalTransport);
}
}
Modified: core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java 2009-01-28
18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -3,7 +3,6 @@
import org.horizon.config.Configuration;
import org.horizon.manager.CacheManager;
import org.horizon.manager.DefaultCacheManager;
-import org.horizon.transaction.DummyTransactionManager;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.tree.Fqn;
import org.horizon.tree.Node;
@@ -11,6 +10,7 @@
import org.horizon.tree.TreeCacheImpl;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -44,6 +44,11 @@
tm = TestingUtil.getTransactionManager(cache.getCache());
}
+ @AfterMethod
+ public void tearDown() {
+ TestingUtil.killCaches(cache.getCache());
+ }
+
public void testAddingData() {
Node<Object, Object> rootNode = cache.getRoot();
Node<Object, Object> nodeA = rootNode.addChild(A);
@@ -322,11 +327,9 @@
}
public void testDoubleRemovalOfData() throws Exception {
-
-
- assert DummyTransactionManager.getInstance().getTransaction() == null;
+ assert tm.getTransaction() == null;
cache.put("/foo/1/2/3", "item", 1);
- assert DummyTransactionManager.getInstance().getTransaction() == null;
+ assert tm.getTransaction() == null;
assert 1 == (Integer) cache.get("/foo/1/2/3", "item");
tm.begin();
assert 1 == (Integer) cache.get("/foo/1/2/3", "item");
Modified: core/branches/flat/src/test/java/org/horizon/atomic/APITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/atomic/APITest.java 2009-01-28 18:27:32
UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/atomic/APITest.java 2009-01-28 23:43:56
UTC (rev 7608)
@@ -21,213 +21,144 @@
*/
package org.horizon.atomic;
+import static org.horizon.atomic.AtomicHashMapTestAssertions.assertIsEmpty;
+import static org.horizon.atomic.AtomicHashMapTestAssertions.assertIsEmptyMap;
import org.horizon.config.Configuration;
-import org.horizon.config.Configuration.CacheMode;
import org.horizon.manager.DefaultCacheManager;
-import org.horizon.transaction.DummyTransactionManager;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import javax.transaction.SystemException;
import javax.transaction.Transaction;
-import java.util.Map;
+import javax.transaction.TransactionManager;
-@Test(groups = "functional")
+@Test(groups = "functional", sequential = true)
public class APITest {
- private void assertIsEmpty(Map map) {
- assert map.size() == 0;
- assert map.get("blah") == null;
- assert !map.containsKey("blah");
+ AtomicMapCache cache;
+ TransactionManager tm;
+
+ @BeforeTest
+ private void setUp() {
+ Configuration c = new Configuration();
+ c.setInvocationBatchingEnabled(true);
+ c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
+ tm = TestingUtil.getTransactionManager(cache);
}
- private void assertIsEmptyMap(AtomicMapCache cache, Object key) {
- assertIsEmpty(cache.getAtomicMap(key));
+ @AfterTest
+ private void tearDown() {
+ cache.getCacheManager().stop();
}
+ @AfterMethod
+ private void clearUp() throws SystemException {
+ if (tm.getTransaction() != null) {
+ try {
+ tm.rollback();
+ } catch (Exception ignored) {
+ // try to suspend?
+ tm.suspend();
+ }
+ }
+ cache.clear();
+ }
+
public void testAtomicMap() {
- Configuration c = new Configuration();
- c.setInvocationBatchingEnabled(true);
- AtomicMapCache cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
- try {
- AtomicMap map = cache.getAtomicMap("map");
+ AtomicMap map = cache.getAtomicMap("map");
- assertIsEmpty(map);
- assertIsEmptyMap(cache, "map");
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache, "map");
- map.put("blah", "blah");
- assert map.size() == 1;
- assert map.get("blah").equals("blah");
- assert map.containsKey("blah");
+ map.put("blah", "blah");
+ assert map.size() == 1;
+ assert map.get("blah").equals("blah");
+ assert map.containsKey("blah");
- map.clear();
+ map.clear();
- assertIsEmpty(map);
- assertIsEmptyMap(cache, "map");
- }
- finally {
- cache.stop();
- }
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache, "map");
}
public void testReadSafetyEmptyCache() throws Exception {
- Configuration c = new Configuration();
- c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- c.setInvocationBatchingEnabled(true);
- AtomicMapCache cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
- try {
- AtomicMap map = cache.getAtomicMap("map");
+ AtomicMap map = cache.getAtomicMap("map");
- assertIsEmpty(map);
- assertIsEmptyMap(cache, "map");
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache, "map");
- DummyTransactionManager.getInstance().begin();
- map.put("blah", "blah");
- assert map.size() == 1;
- assert map.get("blah").equals("blah");
- assert map.containsKey("blah");
- Transaction t = DummyTransactionManager.getInstance().suspend();
+ tm.begin();
+ map.put("blah", "blah");
+ assert map.size() == 1;
+ assert map.get("blah").equals("blah");
+ assert map.containsKey("blah");
+ Transaction t = tm.suspend();
- assertIsEmpty(map);
- assertIsEmptyMap(cache, "map");
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache, "map");
- DummyTransactionManager.getInstance().resume(t);
- DummyTransactionManager.getInstance().commit();
+ tm.resume(t);
+ tm.commit();
- assert map.size() == 1;
- assert map.get("blah").equals("blah");
- assert map.containsKey("blah");
+ assert map.size() == 1;
+ assert map.get("blah").equals("blah");
+ assert map.containsKey("blah");
- map.clear();
+ map.clear();
- assertIsEmpty(map);
- assertIsEmptyMap(cache, "map");
- }
- finally {
- cache.stop();
- }
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache, "map");
}
public void testReadSafetyNotEmptyCache() throws Exception {
- Configuration c = new Configuration();
- c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- c.setInvocationBatchingEnabled(true);
- AtomicMapCache cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
- try {
- AtomicMap map = cache.getAtomicMap("map");
+ AtomicMap map = cache.getAtomicMap("map");
- DummyTransactionManager.getInstance().begin();
- map.put("blah", "blah");
- assert map.size() == 1;
- assert map.get("blah").equals("blah");
- assert map.containsKey("blah");
- Transaction t = DummyTransactionManager.getInstance().suspend();
+ tm.begin();
+ map.put("blah", "blah");
+ assert map.size() == 1;
+ assert map.get("blah").equals("blah");
+ assert map.containsKey("blah");
+ Transaction t = tm.suspend();
- assertIsEmpty(map);
- assertIsEmptyMap(cache, "map");
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache, "map");
- DummyTransactionManager.getInstance().resume(t);
- DummyTransactionManager.getInstance().commit();
+ tm.resume(t);
+ tm.commit();
- assert map.size() == 1;
- assert map.get("blah").equals("blah");
- assert map.containsKey("blah");
+ assert map.size() == 1;
+ assert map.get("blah").equals("blah");
+ assert map.containsKey("blah");
- map.clear();
+ map.clear();
- assertIsEmpty(map);
- assertIsEmptyMap(cache, "map");
- }
- finally {
- cache.stop();
- }
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache, "map");
}
public void testReadSafetyRollback() throws Exception {
- Configuration c = new Configuration();
- c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- c.setInvocationBatchingEnabled(true);
- AtomicMapCache cache = (AtomicMapCache) new DefaultCacheManager(c).getCache();
- try {
- AtomicMap map = cache.getAtomicMap("map");
+ AtomicMap map = cache.getAtomicMap("map");
- DummyTransactionManager.getInstance().begin();
- map.put("blah", "blah");
- assert map.size() == 1;
- assert map.get("blah").equals("blah");
- assert map.containsKey("blah");
- Transaction t = DummyTransactionManager.getInstance().suspend();
+ tm.begin();
+ map.put("blah", "blah");
+ assert map.size() == 1;
+ assert map.get("blah").equals("blah");
+ assert map.containsKey("blah");
+ Transaction t = tm.suspend();
- assertIsEmpty(map);
- assertIsEmptyMap(cache, "map");
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache, "map");
- DummyTransactionManager.getInstance().resume(t);
- DummyTransactionManager.getInstance().rollback();
+ tm.resume(t);
+ tm.rollback();
- assertIsEmpty(map);
- assertIsEmptyMap(cache, "map");
- }
- finally {
- cache.stop();
- }
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache, "map");
}
-
- public void testReplicationCommit() throws Exception {
- Configuration c = new Configuration();
- c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- c.setCacheMode(CacheMode.REPL_SYNC);
- c.setInvocationBatchingEnabled(true);
- AtomicMapCache cache1 = (AtomicMapCache) new DefaultCacheManager(c).getCache();
- AtomicMapCache cache2 = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-
- try {
- TestingUtil.blockUntilViewsReceived(20000, cache1, cache2);
- AtomicMap map = cache1.getAtomicMap("map");
-
- DummyTransactionManager.getInstance().begin();
- map.put("existing", "existing");
- map.put("blah", "blah");
- DummyTransactionManager.getInstance().commit();
-
- assert map.size() == 2;
- assert map.get("blah").equals("blah");
- assert map.containsKey("blah");
-
- System.out.println("Map on cache 2 is " +
cache2.getAtomicMap("map"));
-
- assert cache2.getAtomicMap("map").size() == 2;
- assert
cache2.getAtomicMap("map").get("blah").equals("blah");
- assert cache2.getAtomicMap("map").containsKey("blah");
- }
- finally {
- TestingUtil.killCaches(cache1, cache2);
- }
- }
-
- public void testReplicationRollback() throws Exception {
- Configuration c = new Configuration();
- c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- c.setCacheMode(CacheMode.REPL_SYNC);
- c.setInvocationBatchingEnabled(true);
- AtomicMapCache cache1 = (AtomicMapCache) new DefaultCacheManager(c).getCache();
- AtomicMapCache cache2 = (AtomicMapCache) new DefaultCacheManager(c).getCache();
-
- try {
- TestingUtil.blockUntilViewsReceived(20000, cache1, cache2);
- AtomicMap map = cache1.getAtomicMap("map");
-
- DummyTransactionManager.getInstance().begin();
- map.put("existing", "existing");
- map.put("blah", "blah");
- DummyTransactionManager.getInstance().rollback();
-
- assertIsEmpty(map);
- assertIsEmptyMap(cache1, "map");
- assertIsEmptyMap(cache2, "map");
- }
- finally {
- TestingUtil.killCaches(cache1, cache2);
- }
- }
}
Added:
core/branches/flat/src/test/java/org/horizon/atomic/AtomicHashMapTestAssertions.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/atomic/AtomicHashMapTestAssertions.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/atomic/AtomicHashMapTestAssertions.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -0,0 +1,16 @@
+package org.horizon.atomic;
+
+import java.util.Map;
+
+public class AtomicHashMapTestAssertions {
+
+ public static void assertIsEmpty(Map map) {
+ assert map.size() == 0;
+ assert map.get("blah") == null;
+ assert !map.containsKey("blah");
+ }
+
+ public static void assertIsEmptyMap(AtomicMapCache cache, Object key) {
+ assertIsEmpty(cache.getAtomicMap(key));
+ }
+}
Added: core/branches/flat/src/test/java/org/horizon/atomic/ClusteredAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/atomic/ClusteredAPITest.java
(rev 0)
+++ core/branches/flat/src/test/java/org/horizon/atomic/ClusteredAPITest.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -0,0 +1,65 @@
+package org.horizon.atomic;
+
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
+import static org.horizon.atomic.AtomicHashMapTestAssertions.assertIsEmpty;
+import static org.horizon.atomic.AtomicHashMapTestAssertions.assertIsEmptyMap;
+import org.horizon.config.Configuration;
+import org.horizon.transaction.DummyTransactionManager;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+
+@Test(groups = "functional", sequential = true)
+public class ClusteredAPITest extends BaseClusteredTest {
+ AtomicMapCache cache1, cache2;
+
+ @BeforeMethod
+ public void setUp() {
+ Configuration c = new Configuration();
+ c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ c.setInvocationBatchingEnabled(true);
+
+ List<Cache> caches = createClusteredCaches(2, "atomic", c);
+ cache1 = (AtomicMapCache) caches.get(0);
+ cache2 = (AtomicMapCache) caches.get(1);
+ TestingUtil.blockUntilViewsReceived(20000, cache1, cache2);
+ }
+
+ public void testReplicationCommit() throws Exception {
+ AtomicMap map = cache1.getAtomicMap("map");
+
+ DummyTransactionManager.getInstance().begin();
+ map.put("existing", "existing");
+ map.put("blah", "blah");
+ DummyTransactionManager.getInstance().commit();
+
+ assert map.size() == 2;
+ assert map.get("blah").equals("blah");
+ assert map.containsKey("blah");
+
+ System.out.println("Map on cache 2 is " +
cache2.getAtomicMap("map"));
+
+ assert cache2.getAtomicMap("map").size() == 2;
+ assert
cache2.getAtomicMap("map").get("blah").equals("blah");
+ assert cache2.getAtomicMap("map").containsKey("blah");
+ }
+
+ public void testReplicationRollback() throws Exception {
+ AtomicMap map = cache1.getAtomicMap("map");
+
+ DummyTransactionManager.getInstance().begin();
+ map.put("existing", "existing");
+ map.put("blah", "blah");
+ DummyTransactionManager.getInstance().rollback();
+
+ assertIsEmpty(map);
+ assertIsEmptyMap(cache1, "map");
+ assertIsEmptyMap(cache2, "map");
+ }
+}
Modified: core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java 2009-01-28
18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -11,7 +11,6 @@
import org.horizon.BaseClusteredTest;
import org.horizon.Cache;
import org.horizon.config.Configuration;
-import org.horizon.manager.CacheManager;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.assertEquals;
@@ -19,6 +18,7 @@
import org.testng.annotations.Test;
import javax.transaction.TransactionManager;
+import java.util.List;
import java.util.concurrent.TimeUnit;
/**
@@ -37,13 +37,10 @@
asyncConfiguration.setSyncRollbackPhase(true);
asyncConfiguration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- CacheManager cm1 = addCacheManager();
- CacheManager cm2 = addCacheManager();
+ List<Cache> caches = createClusteredCaches(2, "asyncRepl",
asyncConfiguration);
- defineCacheOnAllManagers("asyncRepl", asyncConfiguration);
-
- cache1 = cm1.getCache("asyncRepl");
- cache2 = cm2.getCache("asyncRepl");
+ cache1 = caches.get(0);
+ cache2 = caches.get(1);
}
public void testWithNoTx() throws Exception {
Modified: core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java 2009-01-28
18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -1,66 +1,53 @@
package org.horizon.replication;
+import org.horizon.BaseClusteredTest;
import org.horizon.Cache;
-import org.horizon.UnitTestCacheManager;
+import org.horizon.commands.VisitableCommand;
import org.horizon.config.Configuration;
+import org.horizon.context.InvocationContext;
+import org.horizon.interceptors.base.CommandInterceptor;
import org.horizon.lock.TimeoutException;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.util.TestingUtil;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import java.util.List;
/**
* Tests the type of exceptions thrown for Lock Acquisition Timeouts versus Sync Repl
Timeouts
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
*/
-@Test(groups = {"functional"}, sequential = true)
-public class ExceptionTest {
+@Test(groups = "functional", sequential = true)
+public class ExceptionTest extends BaseClusteredTest {
private Cache cache1;
private Cache cache2;
- private static String DELAYED_CLUSTER_CONFIG =
- "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
- " mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
- "PING(timeout=1000;num_initial_members=2):" +
- "MERGE2(min_interval=5000;max_interval=10000):" +
- "FD_SOCK:" +
- "VERIFY_SUSPECT(timeout=1500):" +
-
"pbcast.NAKACK(gc_lag=50;max_xmit_size=8192;retransmit_timeout=600,1200,2400,4800):"
+
- "UNICAST(timeout=600,1200,2400,4800):" +
- "pbcast.STABLE(desired_avg_gossip=20000):" +
- "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
- "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
- "shun=false;print_local_addr=true):" +
- "pbcast.STATE_TRANSFER:" +
- "DELAY(in_delay=100;out_delay=100)";
-
- private Cache createCache(String jgroupsConfig) {
+ @BeforeMethod
+ public void setUp() {
Configuration c = new Configuration();
c.setSyncCommitPhase(true);
c.setSyncRollbackPhase(true);
c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
- // TODO fix this
-// if (jgroupsConfig != null) c.setClusterConfig(jgroupsConfig);
c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- Cache cache = new UnitTestCacheManager(c).createCache("testCache");
- return cache;
- }
- @AfterMethod
- public void tearDown() {
- TestingUtil.killCaches(cache1, cache2);
- cache1 = null;
- cache2 = null;
+ List<Cache> caches = createClusteredCaches(2, "ExceptionTestCache",
c);
+ cache1 = caches.get(0);
+ cache2 = caches.get(1);
}
- @Test(groups = {"functional"}, expectedExceptions =
{TimeoutException.class})
+ @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
public void testSyncReplTimeout() {
- cache1 = createCache(DELAYED_CLUSTER_CONFIG);
- cache2 = createCache(DELAYED_CLUSTER_CONFIG);
+ cache2.addInterceptor(new CommandInterceptor() {
+ @Override
+ protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd)
throws Throwable {
+ // Add a delay
+ Thread.sleep(100);
+ return super.handleDefault(ctx, cmd);
+ }
+ }, 0);
cache1.getConfiguration().setSyncReplTimeout(1); // 1ms. this is *bound* to fail.
cache2.getConfiguration().setSyncReplTimeout(1);
@@ -73,10 +60,8 @@
cache1.put("k", "v");
}
- @Test(groups = {"functional"}, expectedExceptions =
{TimeoutException.class})
+ @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
public void testLockAcquisitionTimeout() throws Exception {
- cache1 = createCache(null);
- cache2 = createCache(null);
cache2.getConfiguration().setLockAcquisitionTimeout(1);
cache1.start();
@@ -88,7 +73,7 @@
TransactionManager tm =
cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
tm.begin();
cache2.put("block", "block");
- Transaction t = tm.suspend();
+ tm.suspend();
cache1.put("block", "v");
}
}
Modified:
core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java 2009-01-28
18:27:32 UTC (rev 7607)
+++
core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java 2009-01-28
23:43:56 UTC (rev 7608)
@@ -7,16 +7,15 @@
*/
package org.horizon.replication;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheManager;
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
import org.horizon.config.Configuration;
import org.horizon.lock.IsolationLevel;
-import org.horizon.transaction.DummyTransactionManager;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.fail;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.transaction.NotSupportedException;
@@ -25,68 +24,36 @@
import javax.transaction.TransactionManager;
import java.io.NotSerializableException;
import java.io.Serializable;
+import java.util.List;
-/**
- * Teting of replication exception for a Nonerislizable object
- *
- * @author Ben Wang
- */
-@Test(groups = {"functional"}, sequential = true)
-public class ReplicationExceptionTest {
- private CacheSPI<String, ContainerData> cache1, cache2;
+@Test(groups = "functional", sequential = true)
+public class ReplicationExceptionTest extends BaseClusteredTest {
+ private Cache<String, ContainerData> cache1, cache2;
- @AfterMethod(alwaysRun = true)
- public void tearDown() throws Exception {
- // We just can't kill DummyTransactionManager. We are sharing single instance in more tests.
- TestingUtil.killTransaction(DummyTransactionManager.getInstance());
- destroyCaches();
- /*
- if (old_factory != null)
- {
- System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
- old_factory = null;
- }
- */
- }
+ @BeforeMethod
+ public void setUp() {
+ Configuration configuration = new Configuration();
- private TransactionManager beginTransaction() throws SystemException, NotSupportedException {
- TransactionManager mgr = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
- mgr.begin();
- return mgr;
- }
+ configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ configuration.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
- private void initCaches(Configuration.CacheMode cachingMode) {
- Configuration conf1 = new Configuration();
- Configuration conf2 = new Configuration();
+ configuration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ configuration.setLockAcquisitionTimeout(5000);
- conf1.setCacheMode(cachingMode);
- conf2.setCacheMode(cachingMode);
- conf1.setIsolationLevel(IsolationLevel.SERIALIZABLE);
- conf2.setIsolationLevel(IsolationLevel.SERIALIZABLE);
+ List<Cache> caches = createClusteredCaches(2, "replicatinExceptionTest", configuration);
- conf1.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- conf2.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- conf1.setLockAcquisitionTimeout(5000);
- conf2.setLockAcquisitionTimeout(5000);
-
- cache1 = (CacheSPI) new UnitTestCacheManager(conf1).createCache("testCache");
- cache2 = (CacheSPI) new UnitTestCacheManager(conf2).createCache("testCache");
-
-
- cache1.start();
- cache2.start();
+ cache1 = caches.get(0);
+ cache2 = caches.get(1);
}
- void destroyCaches() throws Exception {
- TestingUtil.killCaches(cache1, cache2);
- cache1 = null;
- cache2 = null;
+ private TransactionManager beginTransaction() throws SystemException, NotSupportedException {
+ TransactionManager mgr = TestingUtil.getTransactionManager(cache1);
+ mgr.begin();
+ return mgr;
}
public void testNonSerializableRepl() throws Exception {
try {
- initCaches(Configuration.CacheMode.REPL_SYNC);
-
cache1.put("test", new ContainerData());
// We should not come here.
@@ -106,8 +73,6 @@
TransactionManager tm;
try {
- initCaches(Configuration.CacheMode.REPL_SYNC);
-
tm = beginTransaction();
cache1.put("test", new ContainerData());
tm.commit();
Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncCacheListenerTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncCacheListenerTest.java 2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncCacheListenerTest.java 2009-01-28 23:43:56 UTC (rev 7608)
@@ -7,62 +7,40 @@
*/
package org.horizon.replication;
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
import org.horizon.CacheException;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheManager;
import org.horizon.config.Configuration;
import org.horizon.lock.IsolationLevel;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.notifications.annotation.CacheEntryModified;
import org.horizon.notifications.annotation.CacheEntryRemoved;
+import org.horizon.notifications.annotation.Listener;
import org.horizon.notifications.event.Event;
import org.horizon.notifications.event.TransactionalEvent;
-import org.horizon.transaction.DummyTransactionManager;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import java.util.List;
import java.util.Map;
import java.util.Set;
/**
- * Test out the TreeCacheListener
+ * Test out the CacheListener
*/
-@Test(groups = {"functional"}, sequential = true)
-public class SyncCacheListenerTest {
- private CacheSPI<Object, Object> cache1, cache2;
- private final static Log log_ = LogFactory.getLog(SyncCacheListenerTest.class);
+@Test(groups = "functional", sequential = true)
+public class SyncCacheListenerTest extends BaseClusteredTest {
+ private Cache<Object, Object> cache1, cache2;
+ private final static Log log = LogFactory.getLog(SyncCacheListenerTest.class);
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
- System.out.println("*** starting setUp()");
- initCaches();
- System.out.println("*** finished setUp()");
- }
-
- @AfterMethod(alwaysRun = true)
- public void tearDown() throws Exception {
- System.out.println("*** starting tearDown()");
- // We just can't kill DummyTransactionManager. We are sharing single instance in more tests.
- TestingUtil.killTransaction(DummyTransactionManager.getInstance());
- destroyCaches();
- /*
- if (old_factory != null)
- {
- System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
- old_factory = null;
- }
- */
- System.out.println("*** finished tearDown()");
- }
-
- private void initCaches() {
Configuration conf = new Configuration();
conf.setSyncCommitPhase(true);
conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
@@ -70,25 +48,21 @@
conf.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
conf.setLockAcquisitionTimeout(5000);
- cache1 = (CacheSPI) new UnitTestCacheManager(conf).createCache("firstCache");
- cache2 = (CacheSPI) new UnitTestCacheManager(conf).createCache("firstCache");
- TestingUtil.blockUntilViewReceived(cache2, 2, 1000);
- }
+ List<Cache> caches = createClusteredCaches(2, "cache", conf);
- private void destroyCaches() {
- TestingUtil.killCaches(cache1, cache2);
- cache1 = null;
- cache2 = null;
+ cache1 = caches.get(0);
+ cache2 = caches.get(1);
+ TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
}
public void testSyncTxRepl() throws Exception {
Integer age;
- TransactionManager tm = cache1.getTransactionManager();
+ TransactionManager tm = TestingUtil.getTransactionManager(cache1);
tm.begin();
Transaction tx = tm.getTransaction();
- Listener lis = new Listener();
- cache1.getNotifier().addListener(lis);
+ LocalListener lis = new LocalListener();
+ cache1.addListener(lis);
lis.put("age", 38);
tm.suspend();
@@ -105,7 +79,7 @@
public void testRemoteCacheListener() throws Exception {
Integer age;
RemoteListener lis = new RemoteListener();
- cache2.getNotifier().addListener(lis);
+ cache2.addListener(lis);
cache1.put("age", 38);
// value on cache2 must be 38
@@ -117,7 +91,7 @@
public void testSyncRepl() throws Exception {
Integer age;
- Listener lis = new Listener();
+ LocalListener lis = new LocalListener();
cache1.addListener(lis);
lis.put("age", 38);
@@ -143,7 +117,7 @@
TransactionManager tm = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
tm.begin();
Transaction tx = tm.getTransaction();
- Listener lis = new Listener();
+ LocalListener lis = new LocalListener();
cache1.put("age", 38);
lis.put("name", "Ben");
@@ -167,8 +141,8 @@
public void testSyncReplMap() throws Exception {
Integer age;
- Listener lis = new Listener();
- cache1.getNotifier().addListener(lis);
+ LocalListener lis = new LocalListener();
+ cache1.addListener(lis);
lis.put("age", 38);
cache1.put("name", "Ben");
@@ -178,8 +152,8 @@
assertTrue("\"age\" must be 38", age == 38);
}
- @org.horizon.notifications.annotation.Listener
- public class Listener {
+ @Listener
+ public class LocalListener {
Object key = null;
public void put(Object key, Object val) {
@@ -198,7 +172,7 @@
@CacheEntryModified
public void nodeModified(Event ne) {
if (!ne.isPre()) {
- log_.debug("nodeModified visited with fqn: " + key);
+ log.debug("nodeModified visited with fqn: " + key);
try {
// test out if we can get the read lock since there is a write lock going as well.
cache1.get(key);
@@ -212,14 +186,14 @@
}
- @org.horizon.notifications.annotation.Listener
+ @Listener
public class RemoteListener {
@CacheEntryRemoved
@CacheEntryModified
public void callback(TransactionalEvent e) {
System.out.println("Callback got event " + e);
- log_.debug("Callback got event " + e);
+ log.debug("Callback got event " + e);
assertFalse("node was removed on remote cache so isLocal should be false", e.isOriginLocal());
}
}
Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-01-28 18:27:32 UTC (rev 7607)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-01-28 23:43:56 UTC (rev 7608)
@@ -44,8 +44,7 @@
Configuration replSync = new Configuration();
replSync.setCacheMode(Configuration.CacheMode.REPL_SYNC);
- cm1.defineCache("replSync", replSync);
- cm2.defineCache("replSync", replSync);
+ defineCacheOnAllManagers("replSync", replSync);
cache1 = cm1.getCache("replSync");
cache2 = cm2.getCache("replSync");