Author: manik.surtani(a)jboss.com
Date: 2009-08-06 07:21:03 -0400 (Thu, 06 Aug 2009)
New Revision: 8162
Added:
core/trunk/src/main/java/org/jboss/cache/invocation/CacheNotReadyException.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/GravitationFromDyingNodeTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java
core/trunk/src/test/java/org/jboss/cache/notifications/NotifierTest.java
Log:
JBCACHE-1528 - Data gravitation prefers exception response over valid gravitation
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-08-06 10:10:12 UTC (rev 8161)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-08-06 11:21:03 UTC (rev 8162)
@@ -872,16 +872,16 @@
* this method will block for up to {@link org.jboss.cache.config.Configuration#getStateRetrievalTimeout()} millis, checking
* for a valid state.
*
- * @param originLocal true if the call originates locally (i.e., from the {@link org.jboss.cache.invocation.CacheInvocationDelegate} or false if it originates remotely, i.e., from the {@link org.jboss.cache.marshall.CommandAwareRpcDispatcher}.
+ * @param SkipBlockUntilStart true if the call should not wait for the cache to start (typical local use) and false if we should try and wait (typical remote lookup use)
* @return true if invocations are allowed, false otherwise.
*/
- public boolean invocationsAllowed(boolean originLocal)
+ public boolean invocationsAllowed(boolean SkipBlockUntilStart)
{
if (trace) log.trace("Testing if invocations are allowed.");
if (state.allowInvocations()) return true;
// if this is a locally originating call and the cache is not in a valid state, return false.
- if (originLocal) return false;
+ if (SkipBlockUntilStart) return false;
if (trace) log.trace("Is remotely originating.");
Modified: core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2009-08-06 10:10:12 UTC (rev 8161)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2009-08-06 11:21:03 UTC (rev 8162)
@@ -690,9 +690,9 @@
protected void cacheStatusCheck(InvocationContext ctx)
{
assertIsConstructed();
- if (!ctx.getOptionOverrides().isSkipCacheStatusCheck() && !componentRegistry.invocationsAllowed(ctx.isOriginLocal()))
+ if (!ctx.getOptionOverrides().isSkipCacheStatusCheck() && !componentRegistry.invocationsAllowed(true)) // this should always be true since we should never wait for the cache to start at this stage
{
- throw new IllegalStateException("Cache not in STARTED state!");
+ throw new CacheNotReadyException("Cache not in a valid STARTED state! Cache state is " + componentRegistry.getState());
}
}
@@ -704,16 +704,4 @@
command.setErase(erase);
invoker.invoke(ctx, command);
}
-
-
- // TODO: Add these to the public interface in 3.1.0.
- public void setData(Fqn fqn, Map<? extends K, ? extends V> data)
- {
- invokePut(fqn, data, true);
- }
-
- public void setData(String fqn, Map<? extends K, ? extends V> data)
- {
- setData(Fqn.fromString(fqn), data);
- }
}
Added: core/trunk/src/main/java/org/jboss/cache/invocation/CacheNotReadyException.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/CacheNotReadyException.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/CacheNotReadyException.java 2009-08-06 11:21:03 UTC (rev 8162)
@@ -0,0 +1,24 @@
+package org.jboss.cache.invocation;
+
+/**
+ * Thrown when a cache is in an invalid state
+ *
+ * @author Manik Surtani
+ * @since 3.2
+ */
+public class CacheNotReadyException extends IllegalStateException {
+ public CacheNotReadyException() {
+ }
+
+ public CacheNotReadyException(String s) {
+ super(s);
+ }
+
+ public CacheNotReadyException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CacheNotReadyException(Throwable cause) {
+ super(cause);
+ }
+}
Property changes on: core/trunk/src/main/java/org/jboss/cache/invocation/CacheNotReadyException.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-08-06 10:10:12 UTC (rev 8161)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-08-06 11:21:03 UTC (rev 8162)
@@ -36,6 +36,7 @@
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.interceptors.InterceptorChain;
import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.invocation.CacheNotReadyException;
import org.jboss.cache.util.concurrent.BoundedExecutors;
import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jgroups.Address;
@@ -294,7 +295,17 @@
{
return new RequestIgnoredResponse();
}
- ret = interceptorChain.invoke(ctx, (VisitableCommand) cmd);
+ try
+ {
+ ret = interceptorChain.invoke(ctx, (VisitableCommand) cmd);
+ }
+ catch (CacheNotReadyException cnre)
+ {
+ // this could happen, even though we check the state earlier on. There is still a window
+ // for the cache to be shut down in the meanwhile.
+ if (log.isDebugEnabled()) log.debug("Cache not in a state to respond!", cnre);
+ return new RequestIgnoredResponse();
+ }
}
else
{
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2009-08-06 10:10:12 UTC (rev 8161)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2009-08-06 11:21:03 UTC (rev 8162)
@@ -527,18 +527,36 @@
/**
* Notifies all registered listeners of a cacheStopped event.
*/
- @Stop(priority = 98)
- public void notifyCacheStopped()
+ @Stop(priority = 999)
+ public void notifyCacheStoppedPost()
{
if (!cacheStoppedListeners.isEmpty())
{
EventImpl e = new EventImpl();
e.setCache(cache);
e.setType(CACHE_STOPPED);
+ e.setPre(false);
for (ListenerInvocation listener : cacheStoppedListeners) listener.invoke(e);
}
}
+/**
+ * Notifies all registered listeners of a cacheStopped event.
+ */
+ @Stop(priority = 0)
+ public void notifyCacheStoppedPre()
+ {
+ if (!cacheStoppedListeners.isEmpty())
+ {
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setType(CACHE_STOPPED);
+ e.setPre(true);
+ for (ListenerInvocation listener : cacheStoppedListeners) listener.invoke(e);
+ }
+ }
+
+
public void notifyViewChange(final View newView, InvocationContext ctx)
{
if (!viewChangedListeners.isEmpty())
Modified: core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java 2009-08-06 10:10:12 UTC (rev 8161)
+++ core/trunk/src/main/java/org/jboss/cache/util/CachePrinter.java 2009-08-06 11:21:03 UTC (rev 8162)
@@ -44,11 +44,19 @@
* @param c cache to print
* @return a String representation of the cache
*/
- public static String printCacheDetails(Cache c)
+ public static String printCacheDetails(Cache... c)
{
- // internal cast
- DataContainer ci = ((CacheInvocationDelegate) c).getDataContainer();
- return ci.printDetails();
+ StringBuilder sb = new StringBuilder();
+ int i=1;
+ for (Cache cache: c)
+ {
+ // internal cast
+ sb.append("\n--- Cache").append(i++).append(" ---\n");
+ DataContainer ci = ((CacheInvocationDelegate) cache).getDataContainer();
+ sb.append(ci.printDetails());
+ sb.append("\n------------\n\n");
+ }
+ return sb.toString();
}
/**
Added: core/trunk/src/test/java/org/jboss/cache/buddyreplication/GravitationFromDyingNodeTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/GravitationFromDyingNodeTest.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/GravitationFromDyingNodeTest.java 2009-08-06 11:21:03 UTC (rev 8162)
@@ -0,0 +1,116 @@
+package org.jboss.cache.buddyreplication;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.CacheStopped;
+import org.jboss.cache.notifications.annotation.NodeVisited;
+import org.jboss.cache.notifications.event.CacheStoppedEvent;
+import org.jboss.cache.notifications.event.NodeVisitedEvent;
+import org.jboss.cache.util.CachePrinter;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+@Test(testName = "buddyreplication.GravitationFromDyingNodeTest", groups = "functional")
+public class GravitationFromDyingNodeTest extends BuddyReplicationTestsBase {
+
+ static Log log = LogFactory.getLog(GravitationFromDyingNodeTest.class);
+
+ public void testGravitationFromDyingNode() throws Exception {
+ // this test starts 4 nodes, each with 2 backups.
+ // the data owner is stopped.
+ // the node with no backup requests state from the other 2.
+ // one of the other two should be shutting down, and the other should be slow to respond
+ // so that the illegal cache state exception is propagated to the caller
+ // this encapsulates what is seen in JBCACHE-1528
+
+ List<CacheSPI<Object, Object>> caches = createCaches(2, 4, false, true, false, true);
+ final CacheSPI c1 = caches.get(0), c2 = caches.get(1), c3 = caches.get(2), c4 = caches.get(3);
+
+ c1.put("/a/b/c", "k", "v");
+
+ System.out.println("BEFORE: " + CachePrinter.printCacheDetails(c1, c2, c3, c4));
+
+ // kill c1
+ c1.stop();
+
+ log.error("** Stopped C1!");
+
+ // now stop c2, but make sure it takes time to shut down.
+ final StopListener sl = new StopListener();
+ c2.addCacheListener(sl);
+
+ final GetNodeListener gnl = new GetNodeListener();
+ c3.addCacheListener(gnl);
+
+ Thread stopper = new Thread("Stopper") {
+ @Override
+ public void run() {
+ log.error("** Stopping C2!");
+ c2.stop();
+ log.error("** Stopped C2!");
+ }
+ };
+
+ stopper.start();
+
+ Thread gateOpener = new Thread("GateOpener") {
+ @Override
+ public void run() {
+ TestingUtil.sleepThread(1000); // Yuk
+ log.error("** Opening gates!!");
+ sl.openGate();
+ gnl.openGate();
+ }
+ };
+
+ gateOpener.start();
+ log.error("** Starting gravitation!");
+ c4.getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
+ assert null != c4.getNode("/a/b/c");
+ }
+
+ static abstract class GatedListener {
+ CountDownLatch gate = new CountDownLatch(1);
+
+ public void openGate() {
+ gate.countDown();
+ }
+
+ void waitForGate() {
+ try {
+ gate.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @CacheListener
+ public static class StopListener extends GatedListener {
+
+ @CacheStopped
+ public void stop(CacheStoppedEvent e) {
+ if (e.isPre()) {
+ log.error("** Waiting on gate to stop C2!");
+ waitForGate();
+ }
+ }
+ }
+
+ @CacheListener
+ public static class GetNodeListener extends GatedListener {
+
+ @NodeVisited
+ public void visit(NodeVisitedEvent e) {
+ if (e.isPre()) {
+ log.error("** Waiting on gate to read C3!");
+ waitForGate();
+ }
+ }
+ }
+}
Property changes on: core/trunk/src/test/java/org/jboss/cache/buddyreplication/GravitationFromDyingNodeTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/trunk/src/test/java/org/jboss/cache/notifications/NotifierTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/notifications/NotifierTest.java 2009-08-06 10:10:12 UTC (rev 8161)
+++ core/trunk/src/test/java/org/jboss/cache/notifications/NotifierTest.java 2009-08-06 11:21:03 UTC (rev 8162)
@@ -164,14 +164,24 @@
assert allEventsListener.cacheStartedEvent.getType() == Event.Type.CACHE_STARTED;
}
- public void testNotifyCacheStopped()
+ public void testNotifyCacheStoppedPre()
{
assert allEventsListener.cacheStoppedEvent == null;
- notifier.notifyCacheStopped();
+ notifier.notifyCacheStoppedPre();
assert allEventsListener.cacheStoppedEvent != null;
assert allEventsListener.cacheStoppedEvent.getType() == Event.Type.CACHE_STOPPED;
+ assert allEventsListener.cacheStoppedEvent.isPre();
}
+ public void testNotifyCacheStoppedPost()
+ {
+ assert allEventsListener.cacheStoppedEvent == null;
+ notifier.notifyCacheStoppedPost();
+ assert allEventsListener.cacheStoppedEvent != null;
+ assert allEventsListener.cacheStoppedEvent.getType() == Event.Type.CACHE_STOPPED;
+ assert !allEventsListener.cacheStoppedEvent.isPre();
+ }
+
public void testNotifyViewChange()
{
assert allEventsListener.viewChanged == null;