[infinispan-commits] Infinispan SVN: r2371 - in trunk/core/src: main/java/org/infinispan/remoting/transport and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Sep 14 13:11:35 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-09-14 13:11:35 -0400 (Tue, 14 Sep 2010)
New Revision: 2371
Added:
trunk/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java
trunk/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java
Modified:
trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java
trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
[ISPN-648] (Allow suppressing of CacheNotFoundExceptions in the RpcManagerImpl)
Modified: trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java 2010-09-14 17:06:17 UTC (rev 2370)
+++ trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java 2010-09-14 17:11:35 UTC (rev 2371)
@@ -132,7 +132,22 @@
globalJmxStatistics.setAllowDuplicateDomains(allowDuplicateDomains);
}
+ public boolean isStrictPeerToPeer() {
+ return transport.strictPeerToPeer;
+ }
+
/**
+ * If set to true, RPC operations will fail if the named cache does not exist on remote nodes
+ * with a NamedCacheNotFoundException. Otherwise, operations will succeed but it will be
+ * logged on the caller that the RPC did not succeed on certain nodes due to the named cache
+ * not being available.
+ * @param strictPeerToPeer flag controlling this behavior
+ */
+ public void setStrictPeerToPeer(boolean strictPeerToPeer) {
+ transport.setStrictPeerToPeer(strictPeerToPeer);
+ }
+
+ /**
* Behavior of the JVM shutdown hook registered by the cache
*/
public static enum ShutdownHookBehavior {
@@ -588,6 +603,14 @@
protected String clusterName = "Infinispan-Cluster";
/**
+ * @configRef desc = "If set to true, RPC operations will fail if the named cache does not exist on remote nodes
+ * with a NamedCacheNotFoundException. Otherwise, operations will succeed but it will be
+ * logged on the caller that the RPC did not succeed on certain nodes due to the named cache
+ * not being available."
+ */
+ protected Boolean strictPeerToPeer = true;
+
+ /**
* @configRef desc="Cluster-wide synchronization timeout for locks. Used to coordinate changes in cluster
* membership."
*/
@@ -651,6 +674,12 @@
this.properties = properties;
}
+ @XmlElement
+ public void setStrictPeerToPeer(Boolean strictPeerToPeer) {
+ testImmutability("strictPeerToPeer");
+ this.strictPeerToPeer = strictPeerToPeer;
+ }
+
@Override
public TransportType clone() throws CloneNotSupportedException {
TransportType dolly = (TransportType) super.clone();
Copied: trunk/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java (from rev 2370, branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/AbstractTransport.java 2010-09-14 17:11:35 UTC (rev 2371)
@@ -0,0 +1,80 @@
+package org.infinispan.remoting.transport;
+
+import org.infinispan.CacheException;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.manager.NamedCacheNotFoundException;
+import org.infinispan.remoting.ReplicationException;
+import org.infinispan.remoting.responses.ExceptionResponse;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.transport.jgroups.SuspectException;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.logging.Log;
+
+import java.util.List;
+
+/**
+ * Common transport-related behaviour
+ *
+ * @author Manik Surtani
+ * @version 4.2
+ */
+public abstract class AbstractTransport implements Transport {
+
+ protected GlobalConfiguration configuration;
+
+ public void setConfiguration(GlobalConfiguration globalConfiguration) {
+ this.configuration = globalConfiguration;
+ }
+
+ protected final boolean shouldThrowException(Exception ce) {
+ if (!configuration.isStrictPeerToPeer()) {
+ if (ce instanceof NamedCacheNotFoundException) return false;
+ if (ce.getCause() != null && ce.getCause() instanceof NamedCacheNotFoundException) return false;
+ }
+ return true;
+ }
+
+ protected boolean parseResponseAndAddToResponseList(Object value, List<Response> retval, boolean wasSuspected,
+ boolean wasReceived, Address sender, boolean usedResponseFilter)
+ throws Exception
+ {
+ Log log = getLog();
+ boolean trace = log.isTraceEnabled();
+ boolean invalidResponse = true;
+ if (wasSuspected || !wasReceived) {
+ if (wasSuspected) {
+ throw new SuspectException("Suspected member: " + sender);
+ } else {
+ // if we have a response filter then we may not have waited for some nodes!
+ if (usedResponseFilter) throw new TimeoutException("Replication timeout for " + sender);
+ }
+ } else {
+ invalidResponse = false;
+ if (value instanceof Response) {
+ Response response = (Response) value;
+ if (response instanceof ExceptionResponse) {
+ Exception e = ((ExceptionResponse) value).getException();
+ if (!(e instanceof ReplicationException)) {
+ // if we have any application-level exceptions make sure we throw them!!
+ if (shouldThrowException(e)) {
+ throw e;
+ } else {
+ if (log.isDebugEnabled()) log.debug("Received exception from sender {0}", sender, e);
+ }
+ }
+ }
+ retval.add(response);
+ } else if (value instanceof Exception) {
+ Exception e = (Exception) value;
+ if (trace) log.trace("Unexpected exception from " + sender, e);
+ throw e;
+ } else if (value instanceof Throwable) {
+ Throwable t = (Throwable) value;
+ if (trace) log.trace("Unexpected throwable from " + sender, t);
+ throw new CacheException("Remote (" + sender + ") failed unexpectedly", t);
+ }
+ }
+
+ return invalidResponse;
+ }
+}
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java 2010-09-14 17:06:17 UTC (rev 2370)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java 2010-09-14 17:11:35 UTC (rev 2371)
@@ -17,6 +17,7 @@
import org.infinispan.remoting.rpc.ResponseFilter;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.statetransfer.StateTransferException;
+import org.infinispan.util.logging.Log;
import java.util.Collection;
import java.util.List;
@@ -34,6 +35,9 @@
public interface Transport extends Lifecycle {
// TODO discovery should be abstracted away into a separate set of interfaces such that it is not tightly coupled to the transport
+ @Inject
+ void setConfiguration(GlobalConfiguration gc);
+
/**
* Initializes the transport with global cache configuration and transport-specific properties.
*
@@ -44,7 +48,7 @@
* @param notifier notifier to use
*/
@Inject
- void initialize(GlobalConfiguration c, StreamingMarshaller marshaller,
+ void initialize(StreamingMarshaller marshaller,
@ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncExecutor,
InboundInvocationHandler handler, CacheManagerNotifier notifier);
@@ -134,4 +138,6 @@
void stop();
int getViewId();
+
+ Log getLog();
}
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-09-14 17:06:17 UTC (rev 2370)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-09-14 17:11:35 UTC (rev 2371)
@@ -35,6 +35,7 @@
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.ResponseFilter;
import org.infinispan.remoting.rpc.ResponseMode;
+import org.infinispan.remoting.transport.AbstractTransport;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.DistributedSync;
import org.infinispan.remoting.transport.Transport;
@@ -88,7 +89,7 @@
* @author Galder Zamarreño
* @since 4.0
*/
-public class JGroupsTransport implements Transport, ExtendedMembershipListener, ExtendedMessageListener {
+public class JGroupsTransport extends AbstractTransport implements ExtendedMembershipListener, ExtendedMessageListener {
public static final String CONFIGURATION_STRING = "configurationString";
public static final String CONFIGURATION_XML = "configurationXml";
public static final String CONFIGURATION_FILE = "configurationFile";
@@ -106,7 +107,6 @@
CommandAwareRpcDispatcher dispatcher;
static final Log log = LogFactory.getLog(JGroupsTransport.class);
static final boolean trace = log.isTraceEnabled();
- protected GlobalConfiguration c;
protected TypedProperties props;
protected InboundInvocationHandler inboundInvocationHandler;
protected StreamingMarshaller marshaller;
@@ -130,13 +130,16 @@
public JGroupsTransport() {
}
+ public Log getLog() {
+ return log;
+ }
+
// ------------------------------------------------------------------------------------------------------------------
// Lifecycle and setup stuff
// ------------------------------------------------------------------------------------------------------------------
- public void initialize(GlobalConfiguration c, StreamingMarshaller marshaller, ExecutorService asyncExecutor,
+ public void initialize(StreamingMarshaller marshaller, ExecutorService asyncExecutor,
InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier) {
- this.c = c;
this.marshaller = marshaller;
this.asyncExecutor = asyncExecutor;
this.inboundInvocationHandler = inboundInvocationHandler;
@@ -144,8 +147,8 @@
}
public void start() {
- props = TypedProperties.toTypedProperties(c.getTransportProperties());
- distributedSyncTimeout = c.getDistributedSyncTimeout();
+ props = TypedProperties.toTypedProperties(configuration.getTransportProperties());
+ distributedSyncTimeout = configuration.getDistributedSyncTimeout();
if (log.isInfoEnabled()) log.info("Starting JGroups Channel");
@@ -161,7 +164,7 @@
protected void startJGroupsChannelIfNeeded() {
if (startChannel) {
try {
- channel.connect(c.getClusterName());
+ channel.connect(configuration.getClusterName());
} catch (ChannelException e) {
throw new CacheException("Unable to start JGroups Channel", e);
}
@@ -204,7 +207,7 @@
buildChannel();
// Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
// remote instances will be received by self.
- String transportNodeName = c.getTransportNodeName();
+ String transportNodeName = configuration.getTransportNodeName();
if (transportNodeName != null && transportNodeName.length() > 0) {
long range = Short.MAX_VALUE * 2;
long randomInRange = (long) ((Math.random() * range) % range) + 1;
@@ -412,37 +415,7 @@
boolean noValidResponses = true;
for (Rsp rsp : rsps.values()) {
- if (rsp.wasSuspected() || !rsp.wasReceived()) {
- if (rsp.wasSuspected()) {
- throw new SuspectException("Suspected member: " + rsp.getSender());
- } else {
- // if we have a response filter then we may not have waited for some nodes!
- if (responseFilter == null) throw new TimeoutException("Replication timeout for " + rsp.getSender());
- }
- } else {
- noValidResponses = false;
- Object value = rsp.getValue();
- if (value instanceof Response) {
- Response response = (Response) value;
- if (response instanceof ExceptionResponse) {
- Exception e = ((ExceptionResponse) value).getException();
- if (!(e instanceof ReplicationException)) {
- // if we have any application-level exceptions make sure we throw them!!
- if (trace) log.trace("Received exception from " + rsp.getSender(), e);
- throw e;
- }
- }
- retval.add(response);
- } else if (value instanceof Exception) {
- Exception e = (Exception) value;
- if (trace) log.trace("Unexpected exception from " + rsp.getSender(), e);
- throw e;
- } else if (value instanceof Throwable) {
- Throwable t = (Throwable) value;
- if (trace) log.trace("Unexpected throwable from " + rsp.getSender(), t);
- throw new CacheException("Remote (" + rsp.getSender() + ") failed unexpectedly", t);
- }
- }
+ noValidResponses = parseResponseAndAddToResponseList(rsp.getValue(), retval, rsp.wasSuspected(), rsp.wasReceived(), new JGroupsAddress(rsp.getSender()), responseFilter != null) && noValidResponses;
}
if (noValidResponses) throw new TimeoutException("Timed out waiting for valid responses!");
Copied: trunk/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java (from rev 2370, branches/4.2.x/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/remoting/NonExistentCacheTest.java 2010-09-14 17:11:35 UTC (rev 2371)
@@ -0,0 +1,62 @@
+package org.infinispan.remoting;
+
+import org.infinispan.CacheException;
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.manager.NamedCacheNotFoundException;
+import org.infinispan.test.AbstractInfinispanTest;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+ at Test
+public class NonExistentCacheTest extends AbstractInfinispanTest {
+
+ public void testStrictPeerToPeer() {
+ doTest(true);
+ }
+
+ public void testNonStrictPeerToPeer() {
+ doTest(false);
+ }
+
+ private void doTest(boolean strict) {
+ EmbeddedCacheManager cm1 = null, cm2 = null;
+ try {
+ Configuration c = new Configuration();
+ c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ GlobalConfiguration gc = GlobalConfiguration.getClusteredDefault();
+ gc.setStrictPeerToPeer(strict);
+
+ cm1 = TestCacheManagerFactory.createCacheManager(gc, c);
+ cm2 = TestCacheManagerFactory.createCacheManager(gc, c);
+
+ cm1.getCache();
+ cm2.getCache();
+
+ cm1.getCache().put("k", "v");
+ assert "v".equals(cm1.getCache().get("k"));
+ assert "v".equals(cm2.getCache().get("k"));
+
+ cm1.defineConfiguration("newCache", c);
+
+ if (strict) {
+ try {
+ cm1.getCache("newCache").put("k", "v");
+ assert false : "Should have failed!";
+ } catch (CacheException e) {
+ assert e.getCause() instanceof NamedCacheNotFoundException;
+ }
+ } else {
+ cm1.getCache("newCache").put("k", "v");
+ assert "v".equals(cm1.getCache("newCache").get("k"));
+ }
+ } finally {
+ TestingUtil.killCacheManagers(cm1, cm2);
+ }
+ }
+
+}
More information about the infinispan-commits
mailing list