Author: manik.surtani(a)jboss.com
Date: 2007-09-19 13:05:59 -0400 (Wed, 19 Sep 2007)
New Revision: 4488
Modified:
core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
core/trunk/src/test/java/org/jboss/cache/LifeCycleTest.java
Log:
JBCACHE-1179
Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-09-19 15:23:19 UTC (rev
4487)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-09-19 17:05:59 UTC (rev
4488)
@@ -3387,8 +3387,9 @@
{
if (channel != null)
{
+ channel.disconnect();
channel.close();
- channel.disconnect();
+// channel.disconnect();
}
}
@@ -3952,6 +3953,20 @@
}
+ private void blockUntilCacheStarts() throws InterruptedException
+ {
+ int pollFrequencyMS = 100;
+ long startupWaitTime = configuration.getStateRetrievalTimeout();
+ long giveUpTime = System.currentTimeMillis() + startupWaitTime;
+
+ while (System.currentTimeMillis() < giveUpTime)
+ {
+ if (cacheStatus.allowInvocations()) break;
+ Thread.sleep(pollFrequencyMS);
+ }
+
+ }
+
/**
* Invokes a method against this object. Contains the logger_ic for handling
* the various use cases, e.g. mode (local, repl_async, repl_sync),
@@ -3976,8 +3991,25 @@
}
else
{
- log.warn("Received a remote call but the cache is not in STARTED state -
ignoring call.");
- return null;
+ if (getCacheStatus() == CacheStatus.STARTING)
+ {
+ try
+ {
+ blockUntilCacheStarts();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+
+ // if the cache STILL can't take invocations...
+ if (!cacheStatus.allowInvocations()) throw new
IllegalStateException("Cache not in STARTED state!");
+ }
+ else
+ {
+ log.warn("Received a remote call but the cache is not in STARTED
state - ignoring call.");
+ return null;
+ }
}
}
Modified: core/trunk/src/test/java/org/jboss/cache/LifeCycleTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/LifeCycleTest.java 2007-09-19 15:23:19 UTC
(rev 4487)
+++ core/trunk/src/test/java/org/jboss/cache/LifeCycleTest.java 2007-09-19 17:05:59 UTC
(rev 4488)
@@ -19,6 +19,7 @@
import org.jboss.cache.transaction.DummyTransactionManager;
import org.jboss.cache.misc.TestingUtil;
import org.testng.annotations.Test;
+import org.testng.annotations.AfterMethod;
import java.util.List;
import java.util.LinkedList;
@@ -29,294 +30,338 @@
* @author Bela Ban
* @version $Id$
*/
+@Test(groups = {"functional"})
public class LifeCycleTest
{
-
private static Log log = LogFactory.getLog(LifeCycleTest.class);
+ private CacheImpl[] c;
- @Test(groups = {"functional"})
+ @AfterMethod
+ public void tearDown()
+ {
+ if (c != null)
+ {
+ for (Cache cache : c)
+ {
+ if (cache != null)
+ {
+ try
+ {
+ cache.stop();
+ }
+ catch (Exception e)
+ {
+ // do nothing
+ }
+ }
+ }
+ }
+ c = null;
+ }
+
+ private void createAndRegisterCache(Configuration.CacheMode mode, boolean start)
throws Exception
+ {
+ Cache cache = createCache(mode);
+ List<Cache> caches = new LinkedList<Cache>();
+ if (c != null) for (Cache existingCache : c) caches.add(existingCache);
+ caches.add(cache);
+ c = caches.toArray(new CacheImpl[]{});
+ if (start)
+ {
+ cache.start();
+ if (c.length > 1) TestingUtil.blockUntilViewsReceived(c, 10000);
+ }
+ }
+
+
public void testLocalRestartNoTransactions() throws Exception
{
- CacheImpl cache = createCache(Configuration.CacheMode.LOCAL);
- cache.create();
- cache.start();
+ createAndRegisterCache(Configuration.CacheMode.LOCAL, true);
- cache.put("/a/b/c", null);
- assertTrue(cache.getNumberOfNodes() > 0);
- assertEquals(0, cache.getNumberOfLocksHeld());
+ c[0].put("/a/b/c", null);
+ assertTrue(c[0].getNumberOfNodes() > 0);
+ assertEquals(0, c[0].getNumberOfLocksHeld());
- System.out.println("cache locks before restart:\n" +
cache.printLockInfo());
- restartCache(cache);
- System.out.println("cache locks after restart:\n" +
cache.printLockInfo());
+ System.out.println("cache locks before restart:\n" +
c[0].printLockInfo());
+ restartCache(c[0]);
+ System.out.println("cache locks after restart:\n" +
c[0].printLockInfo());
- assertEquals(0, cache.getNumberOfNodes());
- assertEquals(0, cache.getNumberOfLocksHeld());
+ assertEquals(0, c[0].getNumberOfNodes());
+ assertEquals(0, c[0].getNumberOfLocksHeld());
}
- @Test(groups = {"functional"})
public void testLocalRestartWithTransactions() throws Exception
{
- CacheImpl cache = createCache(Configuration.CacheMode.LOCAL);
- cache.create();
- cache.start();
+ createAndRegisterCache(Configuration.CacheMode.LOCAL, true);
Transaction tx = beginTransaction();
- cache.put("/a/b/c", null);
- log.debug("cache locks before restart:\n" + cache.printLockInfo());
- assertTrue(cache.getNumberOfNodes() > 0);
- assertEquals(4, cache.getNumberOfLocksHeld());
+ c[0].put("/a/b/c", null);
+ log.debug("cache locks before restart:\n" + c[0].printLockInfo());
+ assertTrue(c[0].getNumberOfNodes() > 0);
+ assertEquals(4, c[0].getNumberOfLocksHeld());
- restartCache(cache);
- log.debug("cache locks after restart:\n" + cache.printLockInfo());
+ restartCache(c[0]);
+ log.debug("cache locks after restart:\n" + c[0].printLockInfo());
//assertEquals(4, cache.getNumberOfLocksHeld());
- assertEquals(0, cache.getNumberOfNodes());
+ assertEquals(0, c[0].getNumberOfNodes());
tx.rollback();
- assertEquals(0, cache.getNumberOfLocksHeld());
+ assertEquals(0, c[0].getNumberOfLocksHeld());
}
- @Test(groups = {"functional"})
public void testStartNoCreate() throws Exception
{
- CacheImpl cache = createCache(Configuration.CacheMode.LOCAL);
- cache.start();
+ createAndRegisterCache(Configuration.CacheMode.LOCAL, false);
+ c[0].start();
- cache.put("/a/b/c", null);
- assertTrue(cache.getNumberOfNodes() > 0);
- assertEquals(0, cache.getNumberOfLocksHeld());
+ c[0].put("/a/b/c", null);
+ assertTrue(c[0].getNumberOfNodes() > 0);
+ assertEquals(0, c[0].getNumberOfLocksHeld());
- System.out.println("cache locks before restart:\n" +
cache.printLockInfo());
- restartCache(cache);
- System.out.println("cache locks after restart:\n" +
cache.printLockInfo());
+ System.out.println("cache locks before restart:\n" +
c[0].printLockInfo());
+ restartCache(c[0]);
+ System.out.println("cache locks after restart:\n" +
c[0].printLockInfo());
- assertEquals(0, cache.getNumberOfNodes());
- assertEquals(0, cache.getNumberOfLocksHeld());
+ assertEquals(0, c[0].getNumberOfNodes());
+ assertEquals(0, c[0].getNumberOfLocksHeld());
}
- @Test(groups = {"functional"})
public void testReStartNoCreate() throws Exception
{
- CacheImpl cache = createCache(Configuration.CacheMode.LOCAL);
- cache.start();
- cache.stop();
- cache.start();
+ createAndRegisterCache(Configuration.CacheMode.LOCAL, false);
+ c[0].start();
+ c[0].stop();
+ c[0].start();
- cache.put("/a/b/c", null);
- assertTrue(cache.getNumberOfNodes() > 0);
- assertEquals(0, cache.getNumberOfLocksHeld());
+ c[0].put("/a/b/c", null);
+ assertTrue(c[0].getNumberOfNodes() > 0);
+ assertEquals(0, c[0].getNumberOfLocksHeld());
- System.out.println("cache locks before restart:\n" +
cache.printLockInfo());
- restartCache(cache);
- System.out.println("cache locks after restart:\n" +
cache.printLockInfo());
+ System.out.println("cache locks before restart:\n" +
c[0].printLockInfo());
+ restartCache(c[0]);
+ System.out.println("cache locks after restart:\n" +
c[0].printLockInfo());
- assertEquals(0, cache.getNumberOfNodes());
- assertEquals(0, cache.getNumberOfLocksHeld());
+ assertEquals(0, c[0].getNumberOfNodes());
+ assertEquals(0, c[0].getNumberOfLocksHeld());
}
- @Test(groups = {"functional"})
public void testDuplicateInvocation() throws Exception
{
- CacheImpl cache = createCache(Configuration.CacheMode.LOCAL);
- cache.create();
- cache.start();
- cache.create();
- cache.start();
+ createAndRegisterCache(Configuration.CacheMode.LOCAL, false);
+ c[0].create();
+ c[0].start();
+ c[0].create();
+ c[0].start();
- cache.put("/a/b/c", null);
- assertTrue(cache.getNumberOfNodes() > 0);
- assertEquals(0, cache.getNumberOfLocksHeld());
+ c[0].put("/a/b/c", null);
+ assertTrue(c[0].getNumberOfNodes() > 0);
+ assertEquals(0, c[0].getNumberOfLocksHeld());
- System.out.println("cache locks before restart:\n" +
cache.printLockInfo());
- restartCache(cache);
- System.out.println("cache locks after restart:\n" +
cache.printLockInfo());
+ System.out.println("cache locks before restart:\n" +
c[0].printLockInfo());
+ restartCache(c[0]);
+ System.out.println("cache locks after restart:\n" +
c[0].printLockInfo());
- assertEquals(0, cache.getNumberOfNodes());
- assertEquals(0, cache.getNumberOfLocksHeld());
+ assertEquals(0, c[0].getNumberOfNodes());
+ assertEquals(0, c[0].getNumberOfLocksHeld());
- cache.stop();
- cache.destroy();
- cache.stop();
- cache.destroy();
+ c[0].stop();
+ c[0].destroy();
+ c[0].stop();
+ c[0].destroy();
}
- @Test(groups = {"functional"})
public void testFailedStart() throws Exception
{
- CacheImpl cache = createCache(Configuration.CacheMode.LOCAL);
- assertEquals("Correct state", CacheStatus.INSTANTIATED,
cache.getCacheStatus());
+ createAndRegisterCache(Configuration.CacheMode.LOCAL, false);
+ assertEquals("Correct state", CacheStatus.INSTANTIATED,
c[0].getCacheStatus());
DisruptLifecycleListener listener = new DisruptLifecycleListener();
- cache.addCacheListener(listener);
+ c[0].addCacheListener(listener);
- cache.create();
+ c[0].create();
listener.disrupt = true;
- assertEquals("Correct state", CacheStatus.CREATED,
cache.getCacheStatus());
+ assertEquals("Correct state", CacheStatus.CREATED,
c[0].getCacheStatus());
try
{
- cache.start();
+ c[0].start();
fail("Listener did not prevent start");
}
catch (CacheException good)
{
}
- assertEquals("Correct state", CacheStatus.FAILED,
cache.getCacheStatus());
+ assertEquals("Correct state", CacheStatus.FAILED,
c[0].getCacheStatus());
- cache.addCacheListener(listener);
+ c[0].addCacheListener(listener);
listener.disrupt = false;
- cache.start();
+ c[0].start();
- assertEquals("Correct state", CacheStatus.STARTED,
cache.getCacheStatus());
+ assertEquals("Correct state", CacheStatus.STARTED,
c[0].getCacheStatus());
- cache.put("/a/b/c", null);
- assertTrue(cache.getNumberOfNodes() > 0);
- assertEquals(0, cache.getNumberOfLocksHeld());
+ c[0].put("/a/b/c", null);
+ assertTrue(c[0].getNumberOfNodes() > 0);
+ assertEquals(0, c[0].getNumberOfLocksHeld());
listener.disrupt = true;
- cache.addCacheListener(listener);
+ c[0].addCacheListener(listener);
try
{
- cache.stop();
+ c[0].stop();
fail("Listener did not prevent stop");
}
catch (CacheException good)
{
}
- assertEquals("Correct state", CacheStatus.FAILED,
cache.getCacheStatus());
+ assertEquals("Correct state", CacheStatus.FAILED,
c[0].getCacheStatus());
listener.disrupt = false;
- cache.stop();
- assertEquals("Correct state", CacheStatus.STOPPED,
cache.getCacheStatus());
- cache.destroy();
- assertEquals("Correct state", CacheStatus.DESTROYED,
cache.getCacheStatus());
+ c[0].stop();
+ assertEquals("Correct state", CacheStatus.STOPPED,
c[0].getCacheStatus());
+ c[0].destroy();
+ assertEquals("Correct state", CacheStatus.DESTROYED,
c[0].getCacheStatus());
}
- @Test(groups = {"functional"})
public void testInvalidStateInvocations() throws Exception
{
- Cache<Object, Object> c = createCache(Configuration.CacheMode.LOCAL);
+ createAndRegisterCache(Configuration.CacheMode.LOCAL, false);
try
{
- try
- {
- c.get(Fqn.ROOT, "k");
- fail("Cache isn't ready!");
- }
- catch (IllegalStateException good)
- {
- }
+ c[0].get(Fqn.ROOT, "k");
+ fail("Cache isn't ready!");
+ }
+ catch (IllegalStateException good)
+ {
+ }
- c.create();
- try
- {
- c.get(Fqn.ROOT, "k");
- fail("Cache isn't ready!");
- }
- catch (IllegalStateException good)
- {
- }
+ c[0].create();
+ try
+ {
+ c[0].get(Fqn.ROOT, "k");
+ fail("Cache isn't ready!");
+ }
+ catch (IllegalStateException good)
+ {
+ }
- c.start();
- c.get(Fqn.ROOT, "k"); // should work
+ c[0].start();
+ c[0].get(Fqn.ROOT, "k"); // should work
- c.stop();
+ c[0].stop();
- try
- {
- c.get(Fqn.ROOT, "k");
- fail("Cache isn't ready!");
- }
- catch (IllegalStateException good)
- {
- }
+ try
+ {
+ c[0].get(Fqn.ROOT, "k");
+ fail("Cache isn't ready!");
+ }
+ catch (IllegalStateException good)
+ {
+ }
- c.destroy();
- try
- {
- c.get(Fqn.ROOT, "k");
- fail("Cache isn't ready!");
- }
- catch (IllegalStateException good)
- {
- }
+ c[0].destroy();
+ try
+ {
+ c[0].get(Fqn.ROOT, "k");
+ fail("Cache isn't ready!");
}
- finally
+ catch (IllegalStateException good)
{
- c.stop();
}
}
- @Test(groups = {"functional"})
public void testRemoteInvalidStateInvocations() throws Exception
{
- CacheImpl<Object, Object> c1 =
createCache(Configuration.CacheMode.REPL_SYNC);
- CacheImpl<Object, Object> c2 =
createCache(Configuration.CacheMode.REPL_SYNC);
+ createAndRegisterCache(Configuration.CacheMode.REPL_SYNC, true);
+ createAndRegisterCache(Configuration.CacheMode.REPL_SYNC, true);
try
{
- // need to start them both first to ensure they see each other
- c1.start();
- c2.start();
+ // now DIRECTLY change the status of c2.
+ // emulate the race condition where the remote cache is stopping but hasn't
disconnected from the channel.
+ c[1].cacheStatus = CacheStatus.STOPPING;
+ // Thanks to JBCACHE-1179, this should only log a warning and not throw an
exception
+ c[0].put(Fqn.ROOT, "k", "v");
+ }
+ finally
+ {
+ // reset c[1] to running so the tearDown method can clean it up
+ c[1].cacheStatus = CacheStatus.STARTED;
+ }
+ }
+
+ public void testRemoteInvalidStateInvocations2() throws Exception
+ {
+ createAndRegisterCache(Configuration.CacheMode.REPL_SYNC, true);
+ createAndRegisterCache(Configuration.CacheMode.REPL_SYNC, true);
+ try
+ {
// now DIRECTLY change the status of c2.
// emulate the race condition where the remote cache is stopping but hasn't
disconnected from the channel.
- c2.cacheStatus = CacheStatus.STOPPING;
+ c[1].cacheStatus = CacheStatus.STARTING;
try
{
- c1.put(Fqn.ROOT, "k", "v");
- fail("Cache isn't ready!");
+ // This call should wait for up to StateRetrievalTimeout secs or until c[1]
has entered the STARTED state, and then barf.
+ c[0].put(Fqn.ROOT, "k", "v");
+ fail("Should barf!");
}
catch (IllegalStateException good)
{
+
}
+
+ // now kick off another thread to sleep for a few secs and then set c[1] to
STARTED
+ final int sleepTime = 500;
+ new Thread()
+ {
+ public void run()
+ {
+ TestingUtil.sleepThread(sleepTime);
+ c[1].cacheStatus = CacheStatus.STARTED;
+ }
+ }.start();
+
+ // should succeed but should take at least 1000ms.
+ long startTime = System.currentTimeMillis();
+ c[0].put(Fqn.ROOT, "k", "v");
+ assert System.currentTimeMillis() > (startTime + sleepTime) : "Should
wait till c[1] has STARTED state";
+
}
finally
{
- c1.stop();
- c2.stop();
+ // reset c[1] to running so the tearDown method can clean it up
+ c[1].cacheStatus = CacheStatus.STARTED;
}
}
- @Test(groups = {"functional"})
public void testInvalidStateTxCommit() throws Exception
{
- CacheImpl<Object, Object> c = createCache(Configuration.CacheMode.LOCAL);
+ createAndRegisterCache(Configuration.CacheMode.LOCAL, true);
+ c[0].getTransactionManager().begin();
+ c[0].put(Fqn.ROOT, "k1", "v1");
+ c[0].put(Fqn.ROOT, "k2", "v2");
+
+ // now DIRECTLY change the status of c.
+ c[0].cacheStatus = CacheStatus.STOPPING;
+
try
{
- c.start();
-
- c.getTransactionManager().begin();
- c.put(Fqn.ROOT, "k1", "v1");
- c.put(Fqn.ROOT, "k2", "v2");
-
- // now DIRECTLY change the status of c.
- c.cacheStatus = CacheStatus.STOPPING;
-
- try
- {
- c.getTransactionManager().commit();
- fail("Cache isn't STARTED!");
- }
- catch (RollbackException good)
- {
- }
+ c[0].getTransactionManager().commit();
+ fail("Cache isn't STARTED!");
}
- finally
+ catch (RollbackException good)
{
- c.stop();
}
}
- @Test(groups = {"functional"})
@SuppressWarnings("unchecked")
public void testStopInstanceWhileOtherInstanceSends() throws Exception
{
@@ -325,84 +370,64 @@
final List<Exception> exceptions = new LinkedList<Exception>();
running.add(true);
- final CacheImpl <Object, Object> c[] = new CacheImpl[2];
- try
- {
- c[0] = createCache(Configuration.CacheMode.REPL_SYNC);
- c[1] = createCache(Configuration.CacheMode.REPL_SYNC);
- c[0].start();
- c[1].start();
- TestingUtil.blockUntilViewsReceived(5000, c[0], c[1]);
+ createAndRegisterCache(Configuration.CacheMode.REPL_SYNC, true);
+ createAndRegisterCache(Configuration.CacheMode.REPL_SYNC, true);
- c[0].put(fqn, "k", "v");
+ c[0].put(fqn, "k", "v");
- assert "v".equals(c[0].get(fqn, "k"));
- assert "v".equals(c[1].get(fqn, "k"));
+ assert "v".equals(c[0].get(fqn, "k"));
+ assert "v".equals(c[1].get(fqn, "k"));
- // now kick start a thread on c[1] that will constantly update the fqn
+ // now kick start a thread on c[1] that will constantly update the fqn
- Thread updater = new Thread()
+ Thread updater = new Thread()
+ {
+ public void run()
{
- public void run()
+ int i=0;
+ while (running.get(0))
{
- int i=0;
- while (running.get(0))
+ try
{
- try
- {
- i++;
- c[1].put(fqn, "k", "v" + i);
- }
- catch (Exception e)
- {
- exceptions.add(e);
- }
- TestingUtil.sleepThread(20);
+ i++;
+ if (running.get(0)) c[1].put(fqn, "k", "v" + i);
+ }
+ catch (ReplicationException re)
+ {
+ // this sometimes happens when JGroups suspects the remote node. This
is ok, as long as we don't get an ISE.
+ }
+ catch (Exception e)
+ {
+ exceptions.add(e);
+ }
+ TestingUtil.sleepThread(20);
- }
}
- };
+ }
+ };
- updater.start();
+ updater.start();
- c[0].stop();
- running.add(false);
- running.remove(true);
- updater.join();
+ c[0].stop();
+ running.add(false);
+ running.remove(true);
+ updater.join();
- for (Exception e : exceptions) throw e;
- }
- finally
- {
- for (CacheImpl ci : c)
- {
- if (ci != null) ci.stop();
- }
- }
+ for (Exception e : exceptions) throw e;
}
- @Test(groups = {"functional"})
public void testInvalidStateTxRollback() throws Exception
{
- CacheImpl<Object, Object> c = createCache(Configuration.CacheMode.LOCAL);
- try
- {
- c.start();
+ createAndRegisterCache(Configuration.CacheMode.LOCAL, true);
+ c[0].getTransactionManager().begin();
+ c[0].put(Fqn.ROOT, "k1", "v1");
+ c[0].put(Fqn.ROOT, "k2", "v2");
- c.getTransactionManager().begin();
- c.put(Fqn.ROOT, "k1", "v1");
- c.put(Fqn.ROOT, "k2", "v2");
+ // now DIRECTLY change the status of c.
+ c[0].cacheStatus = CacheStatus.STOPPING;
- // now DIRECTLY change the status of c.
- c.cacheStatus = CacheStatus.STOPPING;
-
- // rollbacks should just log a message
- c.getTransactionManager().rollback();
- }
- finally
- {
- c.stop();
- }
+ // rollbacks should just log a message
+ c[0].getTransactionManager().rollback();
}
@@ -419,8 +444,7 @@
{
DummyTransactionManager mgr = DummyTransactionManager.getInstance();
mgr.begin();
- Transaction tx = mgr.getTransaction();
- return tx;
+ return mgr.getTransaction();
}