[infinispan-commits] Infinispan SVN: r308 - in trunk/core/src: main/java/org/infinispan/commands and 17 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu May 14 17:18:43 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-05-14 17:18:43 -0400 (Thu, 14 May 2009)
New Revision: 308
Added:
trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
Removed:
trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java
Modified:
trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
trunk/core/src/main/java/org/infinispan/commands/control/StateTransferControlCommand.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
trunk/core/src/main/java/org/infinispan/factories/RpcManagerFactory.java
trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java
trunk/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
trunk/core/src/main/java/org/infinispan/marshall/jboss/ExternalizerClassFactory.java
trunk/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java
trunk/core/src/main/java/org/infinispan/marshall/jboss/externalizers/StateTransferControlCommandExternalizer.java
trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseFilter.java
trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
trunk/core/src/main/java/org/infinispan/util/ReflectionUtil.java
trunk/core/src/test/java/org/infinispan/jmx/JmxStatsFunctionalTest.java
trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java
trunk/core/src/test/java/org/infinispan/manager/CacheManagerComponentRegistryTest.java
Log:
RpcManager is now named-cache scope.
Added: trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -0,0 +1,44 @@
+package org.infinispan;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Wraps up return values for the asunc API
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class AsyncReturnValue implements Future<Object> {
+ final Future<Object> networkCallFuture;
+ final Object actualReturnValue;
+
+ public AsyncReturnValue(Future<Object> networkCallFuture, Object actualReturnValue) {
+ this.networkCallFuture = networkCallFuture;
+ this.actualReturnValue = actualReturnValue;
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return networkCallFuture.cancel(mayInterruptIfRunning);
+ }
+
+ public boolean isCancelled() {
+ return networkCallFuture.isCancelled();
+ }
+
+ public boolean isDone() {
+ return networkCallFuture.isDone();
+ }
+
+ public Object get() throws InterruptedException, ExecutionException {
+ networkCallFuture.get();
+ return actualReturnValue;
+ }
+
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ networkCallFuture.get(timeout, unit);
+ return actualReturnValue;
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -19,7 +19,7 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
/**
* Specifically used to create un-initialized {@link org.infinispan.commands.ReplicableCommand}s from a byte stream.
@@ -29,11 +29,11 @@
*/
@Scope(Scopes.GLOBAL)
public class RemoteCommandFactory {
- RpcManager rpcManager;
+ Transport transport;
@Inject
- public void init(RpcManager rpcManager) {
- this.rpcManager = rpcManager;
+ public void inject(Transport transport) {
+ this.transport = transport;
}
/**
@@ -55,7 +55,7 @@
break;
case LockControlCommand.COMMAND_ID:
command = new LockControlCommand();
- break;
+ break;
case PutMapCommand.COMMAND_ID:
command = new PutMapCommand();
break;
@@ -94,7 +94,7 @@
break;
case StateTransferControlCommand.COMMAND_ID:
command = new StateTransferControlCommand();
- ((StateTransferControlCommand) command).init(rpcManager);
+ ((StateTransferControlCommand) command).init(transport);
break;
case ClusteredGetCommand.COMMAND_ID:
command = new ClusteredGetCommand();
Modified: trunk/core/src/main/java/org/infinispan/commands/control/StateTransferControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/StateTransferControlCommand.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/commands/control/StateTransferControlCommand.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -2,7 +2,7 @@
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.context.InvocationContext;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
/**
* A command that informs caches participating in a state transfer of the various stages in the state transfer process.
@@ -12,7 +12,7 @@
*/
public class StateTransferControlCommand implements ReplicableCommand {
public static final int COMMAND_ID = 15;
- RpcManager rpcManager;
+ Transport transport;
boolean enabled;
public StateTransferControlCommand() {
@@ -22,15 +22,15 @@
this.enabled = enabled;
}
- public void init(RpcManager rpcManager) {
- this.rpcManager = rpcManager;
+ public void init(Transport transport) {
+ this.transport = transport;
}
public Object perform(InvocationContext ctx) throws Throwable {
if (enabled)
- rpcManager.getTransport().getDistributedSync().acquireSync();
+ transport.getDistributedSync().acquireSync();
else
- rpcManager.getTransport().getDistributedSync().releaseSync();
+ transport.getDistributedSync().releaseSync();
return null;
}
Added: trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -0,0 +1,49 @@
+package org.infinispan.distribution;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A version of the async return values for dist
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class DistAsyncReturnValue implements Future<Object> {
+ final Future<Object> invalFuture, replFuture;
+ final Object returnValue;
+
+ public DistAsyncReturnValue(Future<Object> invalFuture, Future<Object> replFuture, Object returnValue) {
+ this.invalFuture = invalFuture;
+ this.replFuture = replFuture;
+ this.returnValue = returnValue;
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean invalCancelled = true;
+ if (invalFuture != null) invalCancelled = invalFuture.cancel(mayInterruptIfRunning);
+ return replFuture.cancel(mayInterruptIfRunning) && invalCancelled;
+ }
+
+ public boolean isCancelled() {
+ return replFuture.isCancelled() && (invalFuture == null || invalFuture.isCancelled());
+ }
+
+ public boolean isDone() {
+ return replFuture.isDone() && (invalFuture == null || invalFuture.isDone());
+ }
+
+ public Object get() throws InterruptedException, ExecutionException {
+ if (invalFuture != null) invalFuture.get();
+ replFuture.get();
+ return returnValue;
+ }
+
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ if (invalFuture != null) invalFuture.get(timeout, unit);
+ replFuture.get(timeout, unit);
+ return returnValue;
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -12,12 +12,12 @@
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseFilter;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
-import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
-import org.infinispan.remoting.responses.Response;
-import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.Util;
import org.infinispan.util.logging.Log;
@@ -73,7 +73,7 @@
}
public boolean isLocal(Object key) {
- return consistentHash.locate(key, replCount).contains(rpcManager.getLocalAddress());
+ return consistentHash.locate(key, replCount).contains(rpcManager.getTransport().getAddress());
}
public List<Address> locate(Object key) {
@@ -94,7 +94,7 @@
ResponseFilter filter = new ClusteredGetResponseValidityFilter(locate(key));
List<Response> responses = rpcManager.invokeRemotely(locate(key), get, ResponseMode.SYNCHRONOUS,
- configuration.getSyncReplTimeout(), false, filter, false);
+ configuration.getSyncReplTimeout(), false, filter);
if (!responses.isEmpty()) {
for (Response r : responses) {
Modified: trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -33,7 +33,6 @@
import org.infinispan.marshall.Marshaller;
import org.infinispan.marshall.VersionAwareMarshaller;
import org.infinispan.notifications.cachelistener.CacheNotifier;
-import org.infinispan.remoting.rpc.CacheRpcManager;
import org.infinispan.transaction.TransactionLog;
/**
@@ -43,7 +42,7 @@
* @since 4.0
*/
@DefaultFactoryFor(classes = {CacheNotifier.class, EntryFactory.class, CommandsFactory.class,
- CacheLoaderManager.class, InvocationContextContainer.class, CacheRpcManager.class,
+ CacheLoaderManager.class, InvocationContextContainer.class,
BatchContainer.class, TransactionLog.class, EvictionManager.class, InvocationContextContainer.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
@Override
@@ -54,7 +53,7 @@
if (componentType.equals(Marshaller.class)) {
componentImpl = VersionAwareMarshaller.class;
} else if (componentType.equals(InvocationContextContainer.class)) {
- componentImpl = InvocationContextContainerImpl.class;
+ componentImpl = InvocationContextContainerImpl.class;
} else {
// add an "Impl" to the end of the class name and try again
componentImpl = getClass().getClassLoader().loadClass(componentType.getName() + "Impl");
Modified: trunk/core/src/main/java/org/infinispan/factories/RpcManagerFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/RpcManagerFactory.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/factories/RpcManagerFactory.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -32,8 +32,8 @@
* @since 4.0
*/
@DefaultFactoryFor(classes = RpcManager.class)
-public class RpcManagerFactory extends EmptyConstructorFactory implements AutoInstantiableFactory {
-
+public class RpcManagerFactory extends EmptyConstructorNamedCacheFactory implements AutoInstantiableFactory {
+
@Override
public <T> T construct(Class<T> componentType) {
// only do this if we have a transport configured!
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -17,6 +17,7 @@
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.distribution.DistAsyncReturnValue;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
@@ -30,11 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/**
* The interceptor that handles distribution of entries across a cluster, as well as transparent lookup
@@ -173,7 +170,7 @@
public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
if (ctx.isOriginLocal()) {
List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
- rpcManager.multicastRpcCommand(recipients, command, configuration.isSyncCommitPhase(), true);
+ rpcManager.anycastRpcCommand(recipients, command, configuration.isSyncCommitPhase(), true);
}
return invokeNextInterceptor(ctx, command);
}
@@ -188,7 +185,7 @@
List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
if (trace) log.trace("Multicasting PrepareCommand to recipients : " + recipients);
// this method will return immediately if we're the only member (because exclude_self=true)
- rpcManager.multicastRpcCommand(recipients, command, sync, false);
+ rpcManager.anycastRpcCommand(recipients, command, sync);
}
return retVal;
}
@@ -197,7 +194,7 @@
public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
if (ctx.isOriginLocal()) {
List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
- rpcManager.multicastRpcCommand(recipients, command, configuration.isSyncRollbackPhase(), true);
+ rpcManager.anycastRpcCommand(recipients, command, configuration.isSyncRollbackPhase(), true);
}
return invokeNextInterceptor(ctx, command);
}
@@ -241,13 +238,13 @@
boolean sync = isSynchronous(ctx);
// if L1 caching is used make sure we broadcast an invalidate message
- if (isL1CacheEnabled && rec != null && rpcManager.getMembers().size() > rec.size()) {
+ if (isL1CacheEnabled && rec != null && rpcManager.getTransport().getMembers().size() > rec.size()) {
InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
f1 = submitRpc(null, ic, sync, useFuture);
}
f2 = submitRpc(rec, command, sync, useFuture);
- if (f2 != null) return new DistributionCommunicationFuture(f1, f2, returnValue);
+ if (f2 != null) return new DistAsyncReturnValue(f1, f2, returnValue);
}
} else {
if (!localModeForced) {
@@ -260,64 +257,11 @@
return returnValue;
}
- private class DistributionCommunicationFuture implements Future<Object> {
- final Future<Object> invalFuture, replFuture;
- final Object returnValue;
-
- private DistributionCommunicationFuture(Future<Object> invalFuture, Future<Object> replFuture, Object returnValue) {
- this.invalFuture = invalFuture;
- this.replFuture = replFuture;
- this.returnValue = returnValue;
- }
-
- public boolean cancel(boolean mayInterruptIfRunning) {
- boolean invalCancelled = true;
- if (invalFuture != null) invalCancelled = invalFuture.cancel(mayInterruptIfRunning);
- return replFuture.cancel(mayInterruptIfRunning) && invalCancelled;
- }
-
- public boolean isCancelled() {
- return replFuture.isCancelled() && (invalFuture == null || invalFuture.isCancelled());
- }
-
- public boolean isDone() {
- return replFuture.isDone() && (invalFuture == null || invalFuture.isDone());
- }
-
- public Object get() throws InterruptedException, ExecutionException {
- if (invalFuture != null) invalFuture.get();
- replFuture.get();
- return returnValue;
- }
-
- public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- if (invalFuture != null) invalFuture.get(timeout, unit);
- replFuture.get(timeout, unit);
- return returnValue;
- }
- }
-
-
private Future<Object> submitRpc(final List<Address> recipients, final WriteCommand cmd, final boolean sync, boolean useFuture) {
if (useFuture) {
- Callable<Object> c = new Callable<Object>() {
- public Object call() {
- if (recipients == null) {
- rpcManager.broadcastReplicableCommand(cmd, true);
- } else {
- rpcManager.multicastReplicableCommand(recipients, cmd, true);
- }
- return null;
- }
- };
-
- return asyncExecutorService.submit(c);
+ return rpcManager.anycastRpcCommandInFuture(recipients, cmd);
} else {
- if (recipients == null) {
- rpcManager.broadcastReplicableCommand(cmd, sync);
- } else {
- rpcManager.multicastReplicableCommand(recipients, cmd, sync);
- }
+ rpcManager.anycastRpcCommand(recipients, cmd, sync);
return null;
}
}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -21,6 +21,7 @@
*/
package org.infinispan.interceptors;
+import org.infinispan.AsyncReturnValue;
import org.infinispan.commands.AbstractVisitor;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.VisitableCommand;
@@ -48,7 +49,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
@@ -98,7 +98,7 @@
public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
// just broadcast the clear command - this is simplest!
Object retval = invokeNextInterceptor(ctx, command);
- if (ctx.isOriginLocal()) rpcManager.broadcastReplicableCommand(command, defaultSynchronous);
+ if (ctx.isOriginLocal()) rpcManager.broadcastRpcCommand(command, defaultSynchronous);
return retval;
}
@@ -192,17 +192,12 @@
incrementInvalidations();
final InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
if (log.isDebugEnabled())
- log.debug("Cache [" + rpcManager.getLocalAddress() + "] replicating " + command);
+ log.debug("Cache [" + rpcManager.getTransport().getAddress() + "] replicating " + command);
// voila, invalidated!
if (useFuture) {
- return submitRpcCall(new Callable<Object>() {
- public Object call() throws Exception {
- rpcManager.broadcastReplicableCommand(command, true);
- return null;
- }
- }, retvalForFuture);
+ return new AsyncReturnValue(rpcManager.broadcastRpcCommandInFuture(command), retvalForFuture);
} else {
- rpcManager.broadcastReplicableCommand(command, synchronous);
+ rpcManager.broadcastRpcCommand(command, synchronous);
}
}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -21,6 +21,7 @@
*/
package org.infinispan.interceptors;
+import org.infinispan.AsyncReturnValue;
import org.infinispan.commands.LockControlCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
@@ -36,8 +37,6 @@
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
-import java.util.concurrent.Callable;
-
/**
* Takes care of replicating modifications to other caches in a cluster. Also listens for prepare(), commit() and
* rollback() messages which are received 'side-ways' (see docs/design/Refactoring.txt).
@@ -117,14 +116,9 @@
final Object returnValue = invokeNextInterceptor(ctx, command);
if (!isLocalModeForced(ctx) && command.isSuccessful() && ctx.isOriginLocal() && !ctx.isInTxScope()) {
if (ctx.isUseFutureReturnType()) {
- return submitRpcCall(new Callable<Object>() {
- public Object call() throws Exception {
- rpcManager.broadcastReplicableCommand(command, true);
- return null;
- }
- }, returnValue);
+ return new AsyncReturnValue(rpcManager.broadcastRpcCommandInFuture(command), returnValue);
} else {
- rpcManager.broadcastReplicableCommand(command, isSynchronous(ctx));
+ rpcManager.broadcastRpcCommand(command, isSynchronous(ctx));
}
}
return returnValue;
Modified: trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -23,19 +23,10 @@
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
-import org.infinispan.factories.KnownComponentNames;
-import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
-import org.infinispan.remoting.rpc.CacheRpcManager;
+import org.infinispan.remoting.rpc.RpcManager;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
/**
* Acts as a base for all RPC calls - subclassed by
*
@@ -45,14 +36,11 @@
*/
public abstract class BaseRpcInterceptor extends CommandInterceptor {
- protected CacheRpcManager rpcManager;
- protected ExecutorService asyncExecutorService;
+ protected RpcManager rpcManager;
@Inject
- public void init(CacheRpcManager rpcManager,
- @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService e) {
+ public void init(RpcManager rpcManager) {
this.rpcManager = rpcManager;
- this.asyncExecutorService = e;
}
protected boolean defaultSynchronous;
@@ -78,35 +66,4 @@
}
return false;
}
-
- protected final <X> Future<X> submitRpcCall(Callable<Object> c, final Object returnValue) {
- final Future f = asyncExecutorService.submit(c);
- return new Future<X>() {
-
- public boolean cancel(boolean mayInterruptIfRunning) {
- return f.cancel(mayInterruptIfRunning);
- }
-
- public boolean isCancelled() {
- return f.isCancelled();
- }
-
- public boolean isDone() {
- return f.isDone();
- }
-
- @SuppressWarnings("unchecked")
- public X get() throws InterruptedException, ExecutionException {
- f.get(); // wait for f to complete first
- return (X) returnValue;
- }
-
- @SuppressWarnings("unchecked")
- public X get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- f.get(timeout, unit);
- return (X) returnValue;
- }
- };
- }
-
}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -80,10 +80,10 @@
private List<Response> doRemoteCall(ClusteredGetCommand clusteredGetCommand) throws CacheLoaderException {
Set<Address> validMembers = new HashSet<Address>(rpcManager.getTransport().getMembers());
- validMembers.remove(rpcManager.getLocalAddress());
+ validMembers.remove(rpcManager.getTransport().getAddress());
ResponseFilter filter = new ClusteredGetResponseValidityFilter(validMembers);
try {
- return rpcManager.invokeRemotely(null, clusteredGetCommand, ResponseMode.WAIT_FOR_VALID_RESPONSE, config.getRemoteCallTimeout(), false, filter, false);
+ return rpcManager.invokeRemotely(null, clusteredGetCommand, ResponseMode.WAIT_FOR_VALID_RESPONSE, config.getRemoteCallTimeout(), false, filter);
} catch (Exception e) {
log.error("error while doing remote call", e);
throw new CacheLoaderException(e);
Modified: trunk/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -38,8 +38,8 @@
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.lifecycle.Lifecycle;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
-import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.Transport;
import java.io.IOException;
import java.io.InputStream;
@@ -311,20 +311,20 @@
public List<Address> getMembers() {
if (globalComponentRegistry == null) return null;
- RpcManager rpcManager = globalComponentRegistry.getComponent(RpcManager.class);
- return rpcManager == null ? null : rpcManager.getTransport().getMembers();
+ Transport t = globalComponentRegistry.getComponent(Transport.class);
+ return t == null ? null : t.getMembers();
}
public Address getAddress() {
if (globalComponentRegistry == null) return null;
- RpcManager rpcManager = globalComponentRegistry.getComponent(RpcManager.class);
- return rpcManager == null ? null : rpcManager.getLocalAddress();
+ Transport t = globalComponentRegistry.getComponent(Transport.class);
+ return t == null ? null : t.getAddress();
}
public boolean isCoordinator() {
if (globalComponentRegistry == null) return false;
- RpcManager rpcManager = globalComponentRegistry.getComponent(RpcManager.class);
- return rpcManager != null && rpcManager.getTransport().isCoordinator();
+ Transport t = globalComponentRegistry.getComponent(Transport.class);
+ return t != null && t.isCoordinator();
}
private Cache createCache(String cacheName) {
Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/ExternalizerClassFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/ExternalizerClassFactory.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/ExternalizerClassFactory.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -45,11 +45,11 @@
import org.infinispan.container.entries.TransientMortalCacheEntry;
import org.infinispan.marshall.MarshalledValue;
import org.infinispan.marshall.jboss.externalizers.*;
-import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.ExtendedResponse;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.responses.UnsuccessfulResponse;
+import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.FastCopyHashMap;
@@ -116,16 +116,16 @@
EXTERNALIZERS.put(MortalCacheEntry.class.getName(), InternalCachedEntryExternalizer.class.getName());
EXTERNALIZERS.put(TransientCacheEntry.class.getName(), InternalCachedEntryExternalizer.class.getName());
EXTERNALIZERS.put(TransientMortalCacheEntry.class.getName(), InternalCachedEntryExternalizer.class.getName());
-
+
EXTERNALIZERS.put(InvalidateL1Command.class.getName(), ReplicableCommandExternalizer.class.getName());
}
private final Map<Class<?>, Externalizer> externalizers = new WeakHashMap<Class<?>, Externalizer>();
- private final RpcManager rpcManager;
+ private final Transport transport;
private final CustomObjectTable objectTable;
- public ExternalizerClassFactory(RpcManager rpcManager, CustomObjectTable objectTable) {
- this.rpcManager = rpcManager;
+ public ExternalizerClassFactory(Transport transport, CustomObjectTable objectTable) {
+ this.transport = transport;
this.objectTable = objectTable;
}
@@ -135,7 +135,7 @@
Class typeClazz = Util.loadClass(entry.getKey());
Externalizer ext = (Externalizer) Util.getInstance(entry.getValue());
if (ext instanceof StateTransferControlCommandExternalizer) {
- ((StateTransferControlCommandExternalizer) ext).init(rpcManager);
+ ((StateTransferControlCommandExternalizer) ext).init(transport);
}
externalizers.put(typeClazz, ext);
objectTable.add(ext);
Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -29,7 +29,7 @@
import org.infinispan.io.ByteBuffer;
import org.infinispan.io.ExposedByteArrayOutputStream;
import org.infinispan.marshall.AbstractMarshaller;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
import org.infinispan.util.Util;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -68,7 +68,7 @@
/// private boolean trace;
@Inject
- public void init(ClassLoader defaultCl, RpcManager rpcManager) {
+ public void init(ClassLoader defaultCl, Transport transport) {
log.debug("Using JBoss Marshalling based marshaller.");
// trace = log.isTraceEnabled();
@@ -83,7 +83,7 @@
classTable = createMagicNumberClassTable();
objectTable = createCustomObjectTable();
- externalizerFactoryAndObjectTable = createCustomExternalizerFactory(rpcManager, objectTable);
+ externalizerFactoryAndObjectTable = createCustomExternalizerFactory(transport, objectTable);
configuration = new MarshallingConfiguration();
configuration.setCreator(new SunReflectiveCreator());
@@ -185,8 +185,8 @@
return classTable;
}
- protected ExternalizerClassFactory createCustomExternalizerFactory(RpcManager rpcManager, CustomObjectTable objectTable) {
- ExternalizerClassFactory externalizerFactory = new ExternalizerClassFactory(rpcManager, objectTable);
+ protected ExternalizerClassFactory createCustomExternalizerFactory(Transport transport, CustomObjectTable objectTable) {
+ ExternalizerClassFactory externalizerFactory = new ExternalizerClassFactory(transport, objectTable);
externalizerFactory.init();
return externalizerFactory;
}
Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/externalizers/StateTransferControlCommandExternalizer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/externalizers/StateTransferControlCommandExternalizer.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/externalizers/StateTransferControlCommandExternalizer.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -22,7 +22,7 @@
package org.infinispan.marshall.jboss.externalizers;
import org.infinispan.commands.control.StateTransferControlCommand;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
import org.jboss.marshalling.Creator;
import java.io.IOException;
@@ -41,10 +41,10 @@
*/
private static final long serialVersionUID = -3743458410265076691L;
- private RpcManager rpcManager;
+ private Transport transport;
- public void init(RpcManager rpcManager) {
- this.rpcManager = rpcManager;
+ public void init(Transport transport) {
+ this.transport = transport;
}
/**
@@ -56,7 +56,7 @@
public Object createExternal(Class<?> subjectType, ObjectInput input, Creator defaultCreator)
throws IOException, ClassNotFoundException {
StateTransferControlCommand command = new StateTransferControlCommand();
- command.init(rpcManager);
+ command.init(transport);
return command;
}
}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -30,10 +30,10 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
+import org.infinispan.remoting.rpc.ResponseMode;
+import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import org.infinispan.remoting.rpc.RpcManager;
-import org.infinispan.remoting.rpc.ResponseMode;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -146,7 +146,7 @@
log.trace("Flushing {0} elements", toReplicateSize);
MultipleRpcCommand multipleRpcCommand = commandsFactory.buildReplicateCommand(toReplicate);
// send to all live caches in the cluster
- rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout(), stateTransferEnabled);
+ rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout());
}
catch (Throwable t) {
log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);
Deleted: trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -1,136 +0,0 @@
-package org.infinispan.remoting.rpc;
-
-import org.infinispan.CacheException;
-import org.infinispan.commands.CommandsFactory;
-import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.commands.remote.CacheRpcCommand;
-import org.infinispan.commands.remote.SingleRpcCommand;
-import org.infinispan.config.Configuration;
-import org.infinispan.factories.annotations.Inject;
-import org.infinispan.factories.annotations.Start;
-import org.infinispan.remoting.ReplicationException;
-import org.infinispan.remoting.ReplicationQueue;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.util.List;
-
-/**
- * Class that encapsulates the logic for replicating commands through cluster participants.
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.0
- */
-public class CacheRpcManager {
-
- private static Log log = LogFactory.getLog(CacheRpcManager.class);
- private static final boolean trace = log.isTraceEnabled();
-
- private Configuration configuration;
- private boolean stateTransferEnabled;
-
- private ReplicationQueue replicationQueue;
- private RpcManager rpcManager;
- private CommandsFactory commandsFactory;
-
-
- @Inject
- public void init(ReplicationQueue replicationQueue, RpcManager rpcManager, CommandsFactory commandsFactory, Configuration configuration) {
- this.replicationQueue = replicationQueue;
- this.rpcManager = rpcManager;
- this.commandsFactory = commandsFactory;
- this.configuration = configuration;
- }
-
- @Start
- public void init() {
- stateTransferEnabled = configuration.isFetchInMemoryState();
- }
-
- public void broadcastRpcCommand(CacheRpcCommand rpc, boolean sync, boolean useOutOfBandMessage) throws ReplicationException {
- if (useReplicationQueue(sync)) {
- replicationQueue.add(rpc);
- } else {
- multicastRpcCommand(null, rpc, sync, useOutOfBandMessage);
- }
- }
-
- public void broadcastReplicableCommand(ReplicableCommand call, boolean sync) throws ReplicationException {
- multicastReplicableCommand(null, call, sync);
- }
-
- public void multicastReplicableCommand(List<Address> members, ReplicableCommand call, boolean sync) throws ReplicationException {
- if (useReplicationQueue(sync)) {
- replicationQueue.add(call);
- } else {
- SingleRpcCommand rpcCommand = commandsFactory.buildSingleRpcCommand(call);
- multicastRpcCommand(members, rpcCommand, sync, false);
- }
- }
-
- private ResponseMode getResponseMode(boolean sync) {
- return sync ? ResponseMode.SYNCHRONOUS : configuration.isUseAsyncMarshalling() ? ResponseMode.ASYNCHRONOUS : ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
- }
-
- public void multicastRpcCommand(List<Address> recipients, CacheRpcCommand command, boolean sync, boolean useOutOfBandMessage) throws ReplicationException {
- if (trace) {
- log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getTransport().getMembers() + ", mode=" +
- configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
- configuration.getSyncReplTimeout());
- log.trace("Broadcasting call " + command + " to recipient list " + recipients);
- }
-
- List rsps;
- try {
- rsps = rpcManager.invokeRemotely(recipients,
- command,
- getResponseMode(sync),
- configuration.getSyncReplTimeout(), useOutOfBandMessage, stateTransferEnabled
- );
- if (trace) log.trace("responses=" + rsps);
- if (sync) checkResponses(rsps);
- } catch (CacheException e) {
- log.error("Replication exception", e);
- throw e;
- } catch (Exception ex) {
- log.error("Unexpected exception", ex);
- throw new ReplicationException("Unexpected exception while replicating", ex);
- }
- }
-
- public boolean isClusterStarted() {
- return rpcManager != null && rpcManager.getTransport() != null && rpcManager.getTransport().getMembers() != null
- && rpcManager.getTransport().getMembers().size() > 1;
- }
-
- private boolean useReplicationQueue(boolean sync) {
- return !sync && replicationQueue != null;
- }
-
- /**
- * Checks whether any of the responses are exceptions. If yes, re-throws them (as exceptions or runtime exceptions).
- */
- private void checkResponses(List rsps) {
- if (rsps != null) {
- for (Object rsp : rsps) {
- if (rsp != null && rsp instanceof Throwable) {
- // lets print a stack trace first.
- Throwable throwable = (Throwable) rsp;
- if (trace) {
- log.trace("Received Throwable from remote cache", throwable);
- }
- throw new ReplicationException(throwable);
- }
- }
- }
- }
-
- public Address getLocalAddress() {
- return rpcManager != null ? rpcManager.getLocalAddress() : null;
- }
-
- public List<Address> getMembers() {
- return rpcManager.getTransport().getMembers();
- }
-}
Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseFilter.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseFilter.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseFilter.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -5,7 +5,7 @@
/**
* A mechanism of filtering RPC responses. Used with {@link RpcManager#invokeRemotely(java.util.List,
- * org.infinispan.commands.ReplicableCommand, ResponseMode, long, boolean, ResponseFilter, boolean)}
+ * org.infinispan.commands.ReplicableCommand, ResponseMode, long, boolean, ResponseFilter)}
*
* @author Manik Surtani
* @since 4.0
Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -22,80 +22,66 @@
package org.infinispan.remoting.rpc;
import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.factories.annotations.NonVolatile;
-import org.infinispan.factories.scopes.Scope;
-import org.infinispan.factories.scopes.Scopes;
-import org.infinispan.lifecycle.Lifecycle;
+import org.infinispan.remoting.ReplicationException;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.StateTransferException;
import java.util.List;
+import java.util.concurrent.Future;
/**
- * Provides a mechanism for communicating with other caches in the cluster.
- * <p/>
- * Implementations have a simple lifecycle: <ul> <li>start() - starts the underlying communication channel based on
- * configuration options injected, and connects the channel</li> <li>stop() - stops the dispatcher and releases
- * resources</li> </ul>
+ * Provides a mechanism for communicating with other caches in the cluster, by formatting and passing requests down to
+ * the registered {@link Transport}.
*
* @author Manik Surtani
* @since 4.0
*/
- at Scope(Scopes.GLOBAL)
- at NonVolatile
-public interface RpcManager extends Lifecycle {
- // TODO this needs to be re-thought regarding adding a transport-independent mechanism of unicasts for distribution based on consistent hashes
+public interface RpcManager {
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to
- * the entire cluster.
- * @param rpcCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
- * @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
- * implementations.
- * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
- * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
- * course of a state transfer
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the
+ * entire cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
+ * implementations.
+ * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean stateTransferEnabled) throws Exception;
+ List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to
- * the entire cluster.
- * @param rpcCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
- * @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
- * implementations.
- * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
- * course of a state transfer
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the
+ * entire cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
+ * implementations.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean stateTransferEnabled) throws Exception;
+ List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to
- * the entire cluster.
- * @param rpcCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
- * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
- * course of a state transfer
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire
+ * cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication exception.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws Exception;
+ List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout) throws Exception;
/**
* Initiates a state retrieval process from neighbouring caches. This method will block until it either times out,
@@ -109,6 +95,92 @@
void retrieveState(String cacheName, long timeout) throws StateTransferException;
/**
+ * Broadcasts an RPC command to the entire cluster.
+ *
+ * @param rpc command to execute remotely
+ * @param sync if true, the transport will operate in sync mode. Otherwise, it will operate in async mode.
+ * @throws ReplicationException in the event of problems
+ */
+ void broadcastRpcCommand(ReplicableCommand rpc, boolean sync) throws ReplicationException;
+
+ /**
+ * Broadcasts an RPC command to the entire cluster.
+ *
+ * @param rpc command to execute remotely
+ * @param sync if true, the transport will operate in sync mode. Otherwise, it will operate in async
+ * mode.
+ * @param usePriorityQueue if true, a priority queue is used
+ * @throws ReplicationException in the event of problems
+ */
+ void broadcastRpcCommand(ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException;
+
+ /**
+ * The same as {@link #broadcastRpcCommand(org.infinispan.commands.ReplicableCommand, boolean)} except that the task
+ * is passed to the transport executor and a Future is returned. The transport always deals with this
+ * synchronously.
+ *
+ * @param rpc command to execute remotely
+ * @return a future
+ */
+ Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc);
+
+ /**
+ * The same as {@link #broadcastRpcCommand(org.infinispan.commands.ReplicableCommand, boolean, boolean)} except that
+ * the task is passed to the transport executor and a Future is returned. The transport always deals with this
+ * synchronously.
+ *
+ * @param rpc command to execute remotely
+ * @param usePriorityQueue if true, a priority queue is used
+ * @return a future
+ */
+ Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue);
+
+ /**
+ * Broadcasts an RPC command to a specified set of recipients
+ *
+ * @param recipients recipients to invoke remote command on
+ * @param rpc command to execute remotely
+ * @param sync if true, the transport will operate in sync mode. Otherwise, it will operate in async mode.
+ * @throws ReplicationException in the event of problems
+ */
+ void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync) throws ReplicationException;
+
+ /**
+ * Broadcasts an RPC command to a specified set of recipients
+ *
+ * @param recipients recipients to invoke remote command on
+ * @param rpc command to execute remotely
+ * @param sync if true, the transport will operate in sync mode. Otherwise, it will operate in async
+ * mode.
+ * @param usePriorityQueue if true, a priority queue is used
+ * @throws ReplicationException in the event of problems
+ */
+ void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException;
+
+ /**
+ * The same as {@link #anycastRpcCommand(java.util.List, org.infinispan.commands.ReplicableCommand, boolean)} except
+ * that the task is passed to the transport executor and a Future is returned. The transport always deals with this
+ * synchronously.
+ *
+ * @param recipients recipients to invoke remote call on
+ * @param rpc command to execute remotely
+ * @return a future
+ */
+ Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc);
+
+ /**
+ * The same as {@link #anycastRpcCommand(java.util.List, org.infinispan.commands.ReplicableCommand, boolean)} except
+ * that the task is passed to the transport executor and a Future is returned. The transport always deals with this
+ * synchronously.
+ *
+ * @param recipients recipients to invoke remote call on
+ * @param rpc command to execute remotely
+ * @param usePriorityQueue if true, a priority queue is used
+ * @return a future
+ */
+ Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue);
+
+ /**
* @return a reference to the underlying transport.
*/
Transport getTransport();
@@ -122,9 +194,4 @@
* a null otherwise.
*/
Address getCurrentStateTransferSource();
-
- /**
- * Returns the local address.
- */
- public Address getLocalAddress();
}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -1,19 +1,19 @@
package org.infinispan.remoting.rpc;
import org.infinispan.CacheException;
+import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.commands.remote.CacheRpcCommand;
+import org.infinispan.config.Configuration;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
-import org.infinispan.factories.annotations.Stop;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
-import org.infinispan.marshall.Marshaller;
-import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
-import org.infinispan.remoting.InboundInvocationHandler;
+import org.infinispan.remoting.ReplicationException;
+import org.infinispan.remoting.ReplicationQueue;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
@@ -25,7 +25,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -40,37 +42,43 @@
public class RpcManagerImpl implements RpcManager {
private static final Log log = LogFactory.getLog(RpcManagerImpl.class);
+ private static final boolean trace = log.isTraceEnabled();
- Transport t;
+ private Transport t;
private final AtomicLong replicationCount = new AtomicLong(0);
private final AtomicLong replicationFailures = new AtomicLong(0);
boolean statisticsEnabled = false; // by default, don't gather statistics.
private volatile Address currentStateTransferSource;
+ private boolean stateTransferEnabled;
+ private Configuration configuration;
+ private ReplicationQueue replicationQueue;
+ private ExecutorService asyncExecutor;
+ private CommandsFactory cf;
+
@Inject
- public void injectDependencies(GlobalConfiguration globalConfiguration, Transport t, InboundInvocationHandler handler,
- Marshaller marshaller,
- @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService e,
- CacheManagerNotifier notifier) {
+ public void injectDependencies(Transport t, Configuration configuration, ReplicationQueue replicationQueue, CommandsFactory cf,
+ @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService e) {
this.t = t;
- this.t.initialize(globalConfiguration, globalConfiguration.getTransportProperties(), marshaller, e, handler,
- notifier, globalConfiguration.getDistributedSyncTimeout());
+ this.configuration = configuration;
+ this.replicationQueue = replicationQueue;
+ this.asyncExecutor = e;
+ this.cf = cf;
}
- @Start(priority = 10)
- public void start() {
- t.start();
+ @Start(priority = 9)
+ private void start() {
+ stateTransferEnabled = configuration.isStateTransferEnabled();
}
- @Stop
- public void stop() {
- t.stop();
+ private boolean useReplicationQueue(boolean sync) {
+ return !sync && replicationQueue != null && replicationQueue.isEnabled();
}
- public List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean stateTransferEnabled) throws Exception {
+ public final List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception {
List<Address> members = t.getMembers();
if (members.size() < 2) {
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled())
log.debug("We're the only member in the cluster; Don't invoke remotely.");
return Collections.emptyList();
} else {
@@ -90,12 +98,12 @@
}
}
- public List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean stateTransferEnabled) throws Exception {
- return invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null, stateTransferEnabled);
+ public final List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception {
+ return invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null);
}
- public List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws Exception {
- return invokeRemotely(recipients, rpcCommand, mode, timeout, false, null, stateTransferEnabled);
+ public final List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout) throws Exception {
+ return invokeRemotely(recipients, rpcCommand, mode, timeout, false, null);
}
public void retrieveState(String cacheName, long timeout) throws StateTransferException {
@@ -160,6 +168,71 @@
}
}
+ public final void broadcastRpcCommand(ReplicableCommand rpc, boolean sync) throws ReplicationException {
+ broadcastRpcCommand(rpc, sync, false);
+ }
+
+ public final void broadcastRpcCommand(ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+ if (useReplicationQueue(sync)) {
+ replicationQueue.add(rpc);
+ } else {
+ anycastRpcCommand(null, rpc, sync, usePriorityQueue);
+ }
+ }
+
+ public final Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc) {
+ return broadcastRpcCommandInFuture(rpc, false);
+ }
+
+ public final Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue) {
+ return anycastRpcCommandInFuture(null, rpc, usePriorityQueue);
+ }
+
+ public final void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync) throws ReplicationException {
+ anycastRpcCommand(recipients, rpc, sync, false);
+ }
+
+ public final void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+ if (trace) {
+ log.trace("Broadcasting call " + rpc + " to recipient list " + recipients);
+ }
+
+ if (useReplicationQueue(sync)) {
+ replicationQueue.add(rpc);
+ } else {
+ if (!(rpc instanceof CacheRpcCommand)) {
+ rpc = cf.buildSingleRpcCommand(rpc);
+ }
+ List rsps;
+ try {
+ rsps = invokeRemotely(recipients, rpc, getResponseMode(sync),
+ configuration.getSyncReplTimeout(), usePriorityQueue);
+ if (trace) log.trace("responses=" + rsps);
+ if (sync) checkResponses(rsps);
+ } catch (CacheException e) {
+ log.error("Replication exception", e);
+ throw e;
+ } catch (Exception ex) {
+ log.error("Unexpected exception", ex);
+ throw new ReplicationException("Unexpected exception while replicating", ex);
+ }
+ }
+ }
+
+ public final Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc) {
+ return anycastRpcCommandInFuture(recipients, rpc, false);
+ }
+
+ public final Future<Object> anycastRpcCommandInFuture(final List<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue) {
+ Callable<Object> c = new Callable<Object>() {
+ public Object call() {
+ anycastRpcCommand(recipients, rpc, true, usePriorityQueue);
+ return null;
+ }
+ };
+ return asyncExecutor.submit(c);
+ }
+
public Transport getTransport() {
return t;
}
@@ -168,11 +241,26 @@
return currentStateTransferSource;
}
- public Address getLocalAddress() {
- if (t == null) {
- return null;
+ private ResponseMode getResponseMode(boolean sync) {
+ return sync ? ResponseMode.SYNCHRONOUS : configuration.isUseAsyncMarshalling() ? ResponseMode.ASYNCHRONOUS : ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
+ }
+
+ /**
+ * Checks whether any of the responses are exceptions. If yes, re-throws them (as exceptions or runtime exceptions).
+ */
+ private void checkResponses(List rsps) {
+ if (rsps != null) {
+ for (Object rsp : rsps) {
+ if (rsp != null && rsp instanceof Throwable) {
+ // lets print a stack trace first.
+ Throwable throwable = (Throwable) rsp;
+ if (trace) {
+ log.trace("Received Throwable from remote cache", throwable);
+ }
+ throw new ReplicationException(throwable);
+ }
+ }
}
- return t.getAddress();
}
// -------------------------------------------- JMX information -----------------------------------------------
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -2,7 +2,12 @@
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.factories.KnownComponentNames;
+import org.infinispan.factories.annotations.ComponentName;
+import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.NonVolatile;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.lifecycle.Lifecycle;
@@ -15,7 +20,6 @@
import org.infinispan.statetransfer.StateTransferException;
import java.util.List;
-import java.util.Properties;
import java.util.concurrent.ExecutorService;
/**
@@ -33,16 +37,16 @@
/**
* Initializes the transport with global cache configuration and transport-specific properties.
*
- * @param c global cache-wide configuration
- * @param p properties to set
- * @param marshaller marshaller to use for marshalling and unmarshalling
- * @param asyncExecutor executor to use for asynchronous calls
- * @param handler handler for invoking remotely originating calls on the local cache
- * @param notifier notifier to use
- * @param distributedSyncTimeout timeout to wait for distributed syncs
+ * @param c global cache-wide configuration
+ * @param marshaller marshaller to use for marshalling and unmarshalling
+ * @param asyncExecutor executor to use for asynchronous calls
+ * @param handler handler for invoking remotely originating calls on the local cache
+ * @param notifier notifier to use
*/
- void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller, ExecutorService asyncExecutor,
- InboundInvocationHandler handler, CacheManagerNotifier notifier, long distributedSyncTimeout);
+ @Inject
+ void initialize(GlobalConfiguration c, Marshaller marshaller,
+ @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncExecutor,
+ InboundInvocationHandler handler, CacheManagerNotifier notifier);
/**
* Invokes an RPC call on other caches in the cluster.
@@ -112,4 +116,11 @@
* @return true if the implementation supports state transfer, false otherwise.
*/
boolean isSupportStateTransfer();
+
+ @Start(priority = 10)
+ void start();
+
+ @Stop
+ void stop();
+
}
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -81,18 +81,19 @@
// Lifecycle and setup stuff
// ------------------------------------------------------------------------------------------------------------------
- public void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller, ExecutorService asyncExecutor,
- InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier, long distributedSyncTimeout) {
+ public void initialize(GlobalConfiguration c, Marshaller marshaller, ExecutorService asyncExecutor,
+ InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier) {
this.c = c;
- this.p = p;
this.marshaller = marshaller;
this.asyncExecutor = asyncExecutor;
this.inboundInvocationHandler = inboundInvocationHandler;
this.notifier = notifier;
- this.distributedSyncTimeout = distributedSyncTimeout;
}
public void start() {
+ p = c.getTransportProperties();
+ distributedSyncTimeout = c.getDistributedSyncTimeout();
+
log.info("Starting JGroups Channel");
initChannelAndRPCDispatcher();
Modified: trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -9,7 +9,7 @@
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.notifications.cachelistener.CacheNotifier;
-import org.infinispan.remoting.rpc.CacheRpcManager;
+import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -39,11 +39,11 @@
private InvocationContextContainer icc;
private InterceptorChain invoker;
private CacheNotifier notifier;
- private CacheRpcManager rpcManager;
+ private RpcManager rpcManager;
@Inject
- public void initialize(CommandsFactory commandsFactory, CacheRpcManager rpcManager, Configuration configuration,
+ public void initialize(CommandsFactory commandsFactory, RpcManager rpcManager, Configuration configuration,
InvocationContextContainer icc, InterceptorChain invoker, CacheNotifier notifier) {
this.commandsFactory = commandsFactory;
this.rpcManager = rpcManager;
@@ -65,8 +65,9 @@
/**
* Creates and register a {@link org.infinispan.transaction.xa.RemoteTransaction} based on the supplied params.
* Returns the created transaction.
+ *
* @throws IllegalStateException if an attempt to create a {@link org.infinispan.transaction.xa.RemoteTransaction}
- * for an already registered id is made.
+ * for an already registered id is made.
*/
public RemoteTransaction createRemoteTransaction(GlobalTransaction globalTx, WriteCommand[] modifications) {
RemoteTransaction remoteTransaction = new RemoteTransaction(modifications, globalTx);
@@ -89,7 +90,7 @@
public TransactionXaAdapter getOrCreateXaAdapter(Transaction transaction, InvocationContext ctx) {
TransactionXaAdapter current = localTransactions.get(transaction);
if (current == null) {
- Address localAddress = rpcManager != null ? rpcManager.getLocalAddress() : null;
+ Address localAddress = rpcManager != null ? rpcManager.getTransport().getAddress() : null;
GlobalTransaction tx = localAddress == null ? new GlobalTransaction(false) : new GlobalTransaction(localAddress, false);
current = new TransactionXaAdapter(tx, icc, invoker, commandsFactory, configuration, this, transaction);
localTransactions.put(transaction, current);
@@ -113,8 +114,8 @@
}
/**
- * Removes the {@link org.infinispan.transaction.xa.RemoteTransaction} coresponding to the given tx. Returns true
- * if such an tx exists.
+ * Removes the {@link org.infinispan.transaction.xa.RemoteTransaction} coresponding to the given tx. Returns true if
+ * such an tx exists.
*/
public boolean removeRemoteTransaction(GlobalTransaction txId) {
boolean existed = remoteTransactions.remove(txId) != null;
Modified: trunk/core/src/main/java/org/infinispan/util/ReflectionUtil.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/ReflectionUtil.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/util/ReflectionUtil.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -66,7 +66,12 @@
*/
private static void inspectRecursively(Class c, List<Method> s, Class<? extends Annotation> annotationType) {
// Superclass first
- if (!c.equals(Object.class)) inspectRecursively(c.getSuperclass(), s, annotationType);
+ if (!c.equals(Object.class)) {
+ if (!c.isInterface()) {
+ inspectRecursively(c.getSuperclass(), s, annotationType);
+ for (Class ifc : c.getInterfaces()) inspectRecursively(ifc, s, annotationType);
+ }
+ }
for (Method m : c.getDeclaredMethods()) {
// don't bother if this method has already been overridden by a subclass
Modified: trunk/core/src/test/java/org/infinispan/jmx/JmxStatsFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/jmx/JmxStatsFunctionalTest.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/test/java/org/infinispan/jmx/JmxStatsFunctionalTest.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -37,7 +37,7 @@
/**
* Create a local cache, two replicated caches and see that everithing is correctly registered.
*/
- public void testDefaultDomanin() {
+ public void testDefaultDomain() {
assert !existsDomains("infinispan");
GlobalConfiguration globalConfiguration = GlobalConfiguration.getClusteredDefault();
globalConfiguration.setExposeGlobalJmxStatistics(true);
@@ -58,15 +58,17 @@
cm.getCache("remote2");
assert existsObject("infinispan:cache-name=local_cache(local),jmx-resource=CacheMgmtInterceptor");
- assert existsObject("infinispan:cache-name=[global],jmx-resource=RpcManager");
+ assert existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=RpcManager");
assert existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=CacheMgmtInterceptor");
+ assert existsObject("infinispan:cache-name=remote2(invalidation_async),jmx-resource=RpcManager");
assert existsObject("infinispan:cache-name=remote2(invalidation_async),jmx-resource=CacheMgmtInterceptor");
TestingUtil.killCacheManagers(cm);
assert !existsObject("infinispan:cache-name=local_cache(local),jmx-resource=CacheMgmtInterceptor");
- assert !existsObject("infinispan:cache-name=[global],jmx-resource=RpcManager");
+ assert !existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=RpcManager");
assert !existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=CacheMgmtInterceptor");
+ assert !existsObject("infinispan:cache-name=remote2(invalidation_async),jmx-resource=RpcManager");
assert !existsObject("infinispan:cache-name=remote2(invalidation_async),jmx-resource=CacheMgmtInterceptor");
}
@@ -105,7 +107,7 @@
cm.getCache("remote1");
assert !existsObject("infinispan:cache-name=local_cache(local),jmx-resource=CacheMgmtInterceptor");
- assert existsObject("infinispan:cache-name=[global],jmx-resource=RpcManager");
+ assert existsObject("infinispan:cache-name=[global],jmx-resource=CacheManager");
assert !existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=CacheMgmtInterceptor");
}
Modified: trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -1,9 +1,7 @@
package org.infinispan.jmx;
import org.easymock.EasyMock;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.*;
import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.config.GlobalConfiguration;
@@ -20,7 +18,6 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
-
import java.util.ArrayList;
import java.util.List;
@@ -59,16 +56,16 @@
Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
config.setExposeJmxStatistics(true);
- defineCacheOnAllManagers("repl_sync_cache", config);
- cache1 = manager(0).getCache("repl_sync_cache");
- cache2 = manager(1).getCache("repl_sync_cache");
+ String cachename = "repl_sync_cache";
+ defineCacheOnAllManagers(cachename, config);
+ cache1 = manager(0).getCache(cachename);
+ cache2 = manager(1).getCache(cachename);
mBeanServer = PerThreadMBeanServerLookup.getThreadMBeanServer();
- rpcManager1 = new ObjectName("RpcManagerMBeanTest:cache-name=[global],jmx-resource=RpcManager");
- rpcManager2 = new ObjectName("RpcManagerMBeanTest2:cache-name=[global],jmx-resource=RpcManager");
+ rpcManager1 = new ObjectName("RpcManagerMBeanTest:cache-name=" + cachename + "(repl_sync),jmx-resource=RpcManager");
+ rpcManager2 = new ObjectName("RpcManagerMBeanTest2:cache-name=" + cachename + "(repl_sync),jmx-resource=RpcManager");
}
public void testEnableJmxStats() throws Exception {
-
assert mBeanServer.isRegistered(rpcManager1);
assert mBeanServer.isRegistered(rpcManager2);
@@ -87,7 +84,7 @@
cache1.put("key", "value2");
assert cache2.get("key").equals("value2");
- assert mBeanServer.getAttribute(rpcManager1, "ReplicationCount").equals("1");
+ assert mBeanServer.getAttribute(rpcManager1, "ReplicationCount").equals("1") : "Expected 1, was " + mBeanServer.getAttribute(rpcManager1, "ReplicationCount");
assert mBeanServer.getAttribute(rpcManager1, "ReplicationFailures").equals("0");
mBeanServer.getAttribute(rpcManager1, "ReplicationCount").equals("N/A");
@@ -110,7 +107,7 @@
cache1.put("a3", "b3");
cache1.put("a4", "b4");
assert mBeanServer.getAttribute(rpcManager1, "SuccessRatio").equals("100%");
- RpcManagerImpl rpcManager = (RpcManagerImpl) TestingUtil.extractGlobalComponent(manager(0), RpcManager.class);
+ RpcManagerImpl rpcManager = (RpcManagerImpl) TestingUtil.extractComponent(cache1, RpcManager.class);
Transport originalTransport = rpcManager.getTransport();
try {
Modified: trunk/core/src/test/java/org/infinispan/manager/CacheManagerComponentRegistryTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/manager/CacheManagerComponentRegistryTest.java 2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/test/java/org/infinispan/manager/CacheManagerComponentRegistryTest.java 2009-05-14 21:18:43 UTC (rev 308)
@@ -1,17 +1,17 @@
package org.infinispan.manager;
import org.infinispan.Cache;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.config.Configuration;
import org.infinispan.config.GlobalConfiguration;
import org.infinispan.eviction.EvictionManager;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.interceptors.BatchingInterceptor;
import org.infinispan.interceptors.InterceptorChain;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
import org.infinispan.transaction.tm.DummyTransactionManager;
-import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -52,9 +52,9 @@
assert TestingUtil.extractComponent(transactional, TransactionManager.class) instanceof DummyTransactionManager;
// assert force-shared components
- assert TestingUtil.extractComponent(c, RpcManager.class) != null;
- assert TestingUtil.extractComponent(transactional, RpcManager.class) != null;
- assert TestingUtil.extractComponent(c, RpcManager.class) == TestingUtil.extractComponent(transactional, RpcManager.class);
+ assert TestingUtil.extractComponent(c, Transport.class) != null;
+ assert TestingUtil.extractComponent(transactional, Transport.class) != null;
+ assert TestingUtil.extractComponent(c, Transport.class) == TestingUtil.extractComponent(transactional, Transport.class);
}
public void testForceUnsharedComponents() throws NamedCacheNotFoundException {
More information about the infinispan-commits
mailing list