[infinispan-commits] Infinispan SVN: r2371 - in trunk/core/src: main/java/org/infinispan/remoting/transport and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Sep 14 13:11:35 EDT 2010


Author: manik.surtani at jboss.com
Date: 2010-09-14 13:11:35 -0400 (Tue, 14 Sep 2010)
New Revision: 2371

Added:
   trunk/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java
   trunk/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
[ISPN-648] (Allow suppressing of CacheNotFoundExceptions in the RpcManagerImpl)

Modified: trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java	2010-09-14 17:06:17 UTC (rev 2370)
+++ trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java	2010-09-14 17:11:35 UTC (rev 2371)
@@ -132,7 +132,22 @@
       globalJmxStatistics.setAllowDuplicateDomains(allowDuplicateDomains);
    }
 
+   public boolean isStrictPeerToPeer() {
+      return transport.strictPeerToPeer;
+   }
+
    /**
+    * If set to true, RPC operations will fail if the named cache does not exist on remote nodes
+    * with a NamedCacheNotFoundException.  Otherwise, operations will succeed but it will be
+    * logged on the caller that the RPC did not succeed on certain nodes due to the named cache
+    * not being available.
+    * @param strictPeerToPeer flag controlling this behavior
+    */
+   public void setStrictPeerToPeer(boolean strictPeerToPeer) {
+      transport.setStrictPeerToPeer(strictPeerToPeer);
+   }
+
+   /**
     * Behavior of the JVM shutdown hook registered by the cache
     */
    public static enum ShutdownHookBehavior {
@@ -588,6 +603,14 @@
       protected String clusterName = "Infinispan-Cluster";
 
       /**
+       * @configRef desc = "If set to true, RPC operations will fail if the named cache does not exist on remote nodes
+       *                    with a NamedCacheNotFoundException.  Otherwise, operations will succeed but it will be
+       *                    logged on the caller that the RPC did not succeed on certain nodes due to the named cache
+       *                    not being available."
+       */
+      protected Boolean strictPeerToPeer = true;      
+
+      /**
        * @configRef desc="Cluster-wide synchronization timeout for locks.  Used to coordinate changes in cluster
        * membership."
        */
@@ -651,6 +674,12 @@
          this.properties = properties;
       }
 
+      @XmlElement
+      public void setStrictPeerToPeer(Boolean strictPeerToPeer) {
+         testImmutability("strictPeerToPeer");
+         this.strictPeerToPeer = strictPeerToPeer;
+      }
+
       @Override
       public TransportType clone() throws CloneNotSupportedException {
          TransportType dolly = (TransportType) super.clone();

Copied: trunk/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java (from rev 2370, branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java	2010-09-14 17:11:35 UTC (rev 2371)
@@ -0,0 +1,105 @@
+package org.infinispan.remoting.transport;
+
+import org.infinispan.CacheException;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.manager.NamedCacheNotFoundException;
+import org.infinispan.remoting.ReplicationException;
+import org.infinispan.remoting.responses.ExceptionResponse;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.transport.jgroups.SuspectException;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.logging.Log;
+
+import java.util.List;
+
+/**
+ * Common transport-related behaviour: holds the injected global configuration and the shared logic for
+ * interpreting the responses of a remote call.
+ *
+ * @author Manik Surtani
+ * @version 4.2
+ */
+public abstract class AbstractTransport implements Transport {
+
+   /** Global configuration, set through {@link #setConfiguration(GlobalConfiguration)}. */
+   protected GlobalConfiguration configuration;
+
+   public void setConfiguration(GlobalConfiguration globalConfiguration) {
+      this.configuration = globalConfiguration;
+   }
+
+   /**
+    * Decides whether an exception reported by a remote node should be rethrown on the caller.
+    *
+    * @param ce exception received from the remote node
+    * @return false only when strict peer-to-peer is disabled and the exception, or its direct cause, is a
+    *         {@link NamedCacheNotFoundException}; true otherwise
+    */
+   protected final boolean shouldThrowException(Exception ce) {
+      if (!configuration.isStrictPeerToPeer()) {
+         if (ce instanceof NamedCacheNotFoundException) return false;
+         if (ce.getCause() != null && ce.getCause() instanceof NamedCacheNotFoundException) return false;
+      }
+      return true;
+   }
+
+   /**
+    * Inspects a single response to a remote call and, if it is a {@link Response}, appends it to the result list.
+    *
+    * @param value              payload returned by the sender
+    * @param retval             list that valid responses are appended to
+    * @param wasSuspected       whether the sender was suspected
+    * @param wasReceived        whether a response was received from the sender
+    * @param sender             address of the responding node
+    * @param usedResponseFilter whether the call was made with a response filter
+    * @return true if no response was received from the sender, false if one was received
+    * @throws SuspectException if the sender was suspected
+    * @throws TimeoutException if nothing was received and no response filter was in use
+    * @throws CacheException   if the payload is a Throwable that is not an Exception
+    * @throws Exception        if the remote node sent back an exception that must be propagated
+    */
+   protected boolean parseResponseAndAddToResponseList(Object value, List<Response> retval, boolean wasSuspected,
+                                                       boolean wasReceived, Address sender, boolean usedResponseFilter)
+           throws Exception
+   {
+      Log log = getLog();
+      boolean trace = log.isTraceEnabled();
+      boolean invalidResponse = true;
+      if (wasSuspected || !wasReceived) {
+         if (wasSuspected) {
+            throw new SuspectException("Suspected member: " + sender);
+         } else {
+            // With a response filter we may deliberately not have waited for every node, so a missing
+            // response is only a timeout when no filter was used.
+            if (!usedResponseFilter) throw new TimeoutException("Replication timeout for " + sender);
+         }
+      } else {
+         invalidResponse = false;
+         if (value instanceof Response) {
+            Response response = (Response) value;
+            if (response instanceof ExceptionResponse) {
+               Exception e = ((ExceptionResponse) value).getException();
+               if (!(e instanceof ReplicationException)) {
+                  // if we have any application-level exceptions make sure we throw them!!
+                  if (shouldThrowException(e)) {
+                     throw e;
+                  } else {
+                     if (log.isDebugEnabled()) log.debug("Received exception from sender {0}", sender, e);
+                  }
+               }
+            }
+            retval.add(response);
+         } else if (value instanceof Exception) {
+            Exception e = (Exception) value;
+            if (trace) log.trace("Unexpected exception from " + sender, e);
+            throw e;
+         } else if (value instanceof Throwable) {
+            Throwable t = (Throwable) value;
+            if (trace) log.trace("Unexpected throwable from " + sender, t);
+            throw new CacheException("Remote (" + sender + ") failed unexpectedly", t);
+         }
+      }
+
+      return invalidResponse;
+   }
+}

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java	2010-09-14 17:06:17 UTC (rev 2370)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java	2010-09-14 17:11:35 UTC (rev 2371)
@@ -17,6 +17,7 @@
 import org.infinispan.remoting.rpc.ResponseFilter;
 import org.infinispan.remoting.rpc.ResponseMode;
 import org.infinispan.statetransfer.StateTransferException;
+import org.infinispan.util.logging.Log;
 
 import java.util.Collection;
 import java.util.List;
@@ -34,6 +35,9 @@
 public interface Transport extends Lifecycle {
    // TODO discovery should be abstracted away into a separate set of interfaces such that it is not tightly coupled to the transport
 
+   @Inject
+   void setConfiguration(GlobalConfiguration gc);
+
    /**
     * Initializes the transport with global cache configuration and transport-specific properties.
     *
@@ -44,7 +48,7 @@
     * @param notifier      notifier to use
     */
    @Inject
-   void initialize(GlobalConfiguration c, StreamingMarshaller marshaller,
+   void initialize(StreamingMarshaller marshaller,
                    @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncExecutor,
                    InboundInvocationHandler handler, CacheManagerNotifier notifier);
 
@@ -134,4 +138,6 @@
    void stop();
 
    int getViewId();
+
+   Log getLog();
 }

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-09-14 17:06:17 UTC (rev 2370)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-09-14 17:11:35 UTC (rev 2371)
@@ -35,6 +35,7 @@
 import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.rpc.ResponseFilter;
 import org.infinispan.remoting.rpc.ResponseMode;
+import org.infinispan.remoting.transport.AbstractTransport;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.transport.DistributedSync;
 import org.infinispan.remoting.transport.Transport;
@@ -88,7 +89,7 @@
  * @author Galder Zamarreño
  * @since 4.0
  */
-public class JGroupsTransport implements Transport, ExtendedMembershipListener, ExtendedMessageListener {
+public class JGroupsTransport extends AbstractTransport implements ExtendedMembershipListener, ExtendedMessageListener {
    public static final String CONFIGURATION_STRING = "configurationString";
    public static final String CONFIGURATION_XML = "configurationXml";
    public static final String CONFIGURATION_FILE = "configurationFile";
@@ -106,7 +107,6 @@
    CommandAwareRpcDispatcher dispatcher;
    static final Log log = LogFactory.getLog(JGroupsTransport.class);
    static final boolean trace = log.isTraceEnabled();
-   protected GlobalConfiguration c;
    protected TypedProperties props;
    protected InboundInvocationHandler inboundInvocationHandler;
    protected StreamingMarshaller marshaller;
@@ -130,13 +130,16 @@
    public JGroupsTransport() {
    }
 
+   public Log getLog() {
+      return log;
+   }
+
    // ------------------------------------------------------------------------------------------------------------------
    // Lifecycle and setup stuff
    // ------------------------------------------------------------------------------------------------------------------
 
-   public void initialize(GlobalConfiguration c, StreamingMarshaller marshaller, ExecutorService asyncExecutor,
+   public void initialize(StreamingMarshaller marshaller, ExecutorService asyncExecutor,
                           InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier) {
-      this.c = c;
       this.marshaller = marshaller;
       this.asyncExecutor = asyncExecutor;
       this.inboundInvocationHandler = inboundInvocationHandler;
@@ -144,8 +147,8 @@
    }
 
    public void start() {
-      props = TypedProperties.toTypedProperties(c.getTransportProperties());
-      distributedSyncTimeout = c.getDistributedSyncTimeout();
+      props = TypedProperties.toTypedProperties(configuration.getTransportProperties());
+      distributedSyncTimeout = configuration.getDistributedSyncTimeout();
 
       if (log.isInfoEnabled()) log.info("Starting JGroups Channel");
 
@@ -161,7 +164,7 @@
    protected void startJGroupsChannelIfNeeded() {
       if (startChannel) {
          try {
-            channel.connect(c.getClusterName());
+            channel.connect(configuration.getClusterName());
          } catch (ChannelException e) {
             throw new CacheException("Unable to start JGroups Channel", e);
          }
@@ -204,7 +207,7 @@
          buildChannel();
          // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
          // remote instances will be received by self.
-         String transportNodeName = c.getTransportNodeName();
+         String transportNodeName = configuration.getTransportNodeName();
          if (transportNodeName != null && transportNodeName.length() > 0) {
             long range = Short.MAX_VALUE * 2;
             long randomInRange = (long) ((Math.random() * range) % range) + 1;
@@ -412,37 +415,7 @@
 
          boolean noValidResponses = true;
          for (Rsp rsp : rsps.values()) {
-            if (rsp.wasSuspected() || !rsp.wasReceived()) {
-               if (rsp.wasSuspected()) {
-                  throw new SuspectException("Suspected member: " + rsp.getSender());
-               } else {
-                  // if we have a response filter then we may not have waited for some nodes!
-                  if (responseFilter == null) throw new TimeoutException("Replication timeout for " + rsp.getSender());
-               }
-            } else {
-               noValidResponses = false;
-               Object value = rsp.getValue();
-               if (value instanceof Response) {
-                  Response response = (Response) value;
-                  if (response instanceof ExceptionResponse) {
-                     Exception e = ((ExceptionResponse) value).getException();
-                     if (!(e instanceof ReplicationException)) {
-                        // if we have any application-level exceptions make sure we throw them!!
-                        if (trace) log.trace("Received exception from " + rsp.getSender(), e);
-                        throw e;
-                     }
-                  }
-                  retval.add(response);
-               } else if (value instanceof Exception) {
-                  Exception e = (Exception) value;
-                  if (trace) log.trace("Unexpected exception from " + rsp.getSender(), e);
-                  throw e;
-               } else if (value instanceof Throwable) {
-                  Throwable t = (Throwable) value;
-                  if (trace) log.trace("Unexpected throwable from " + rsp.getSender(), t);
-                  throw new CacheException("Remote (" + rsp.getSender() + ") failed unexpectedly", t);
-               }
-            }
+            noValidResponses = parseResponseAndAddToResponseList(rsp.getValue(), retval, rsp.wasSuspected(), rsp.wasReceived(), new JGroupsAddress(rsp.getSender()), responseFilter != null) && noValidResponses;
          }
 
          if (noValidResponses) throw new TimeoutException("Timed out waiting for valid responses!");

Copied: trunk/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java (from rev 2370, branches/4.2.x/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java	2010-09-14 17:11:35 UTC (rev 2371)
@@ -0,0 +1,62 @@
+package org.infinispan.remoting;
+
+import org.infinispan.CacheException;
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.manager.NamedCacheNotFoundException;
+import org.infinispan.test.AbstractInfinispanTest;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+@Test
+public class NonExistentCacheTest extends AbstractInfinispanTest {
+
+   public void testStrictPeerToPeer() {
+      doTest(true);
+   }
+
+   public void testNonStrictPeerToPeer() {
+      doTest(false);
+   }
+
+   private void doTest(boolean strict) {
+      EmbeddedCacheManager cm1 = null, cm2 = null;
+      try {
+         Configuration c = new Configuration();
+         c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+         GlobalConfiguration gc = GlobalConfiguration.getClusteredDefault();
+         gc.setStrictPeerToPeer(strict);
+
+         cm1 = TestCacheManagerFactory.createCacheManager(gc, c);
+         cm2 = TestCacheManagerFactory.createCacheManager(gc, c);
+
+         cm1.getCache();
+         cm2.getCache();
+
+         cm1.getCache().put("k", "v");
+         assert "v".equals(cm1.getCache().get("k"));
+         assert "v".equals(cm2.getCache().get("k"));
+
+         cm1.defineConfiguration("newCache", c);
+
+         if (strict) {
+            try {
+               cm1.getCache("newCache").put("k", "v");
+               assert false : "Should have failed!";
+            } catch (CacheException e) {
+               assert e.getCause() instanceof NamedCacheNotFoundException;
+            }
+         } else {
+            cm1.getCache("newCache").put("k", "v");
+            assert "v".equals(cm1.getCache("newCache").get("k"));
+         }
+      } finally {
+         TestingUtil.killCacheManagers(cm1, cm2);
+      }
+   }
+
+}



More information about the infinispan-commits mailing list