[infinispan-commits] Infinispan SVN: r308 - in trunk/core/src: main/java/org/infinispan/commands and 17 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu May 14 17:18:43 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-05-14 17:18:43 -0400 (Thu, 14 May 2009)
New Revision: 308

Added:
   trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
   trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
Removed:
   trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java
Modified:
   trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
   trunk/core/src/main/java/org/infinispan/commands/control/StateTransferControlCommand.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
   trunk/core/src/main/java/org/infinispan/factories/RpcManagerFactory.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
   trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java
   trunk/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
   trunk/core/src/main/java/org/infinispan/marshall/jboss/ExternalizerClassFactory.java
   trunk/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java
   trunk/core/src/main/java/org/infinispan/marshall/jboss/externalizers/StateTransferControlCommandExternalizer.java
   trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseFilter.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
   trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
   trunk/core/src/main/java/org/infinispan/util/ReflectionUtil.java
   trunk/core/src/test/java/org/infinispan/jmx/JmxStatsFunctionalTest.java
   trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java
   trunk/core/src/test/java/org/infinispan/manager/CacheManagerComponentRegistryTest.java
Log:
RpcManager is now named-cache scope.

Added: trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -0,0 +1,44 @@
+package org.infinispan;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Wraps up return values for the asunc API
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class AsyncReturnValue implements Future<Object> {
+   final Future<Object> networkCallFuture;
+   final Object actualReturnValue;
+
+   public AsyncReturnValue(Future<Object> networkCallFuture, Object actualReturnValue) {
+      this.networkCallFuture = networkCallFuture;
+      this.actualReturnValue = actualReturnValue;
+   }
+
+   public boolean cancel(boolean mayInterruptIfRunning) {
+      return networkCallFuture.cancel(mayInterruptIfRunning);
+   }
+
+   public boolean isCancelled() {
+      return networkCallFuture.isCancelled();
+   }
+
+   public boolean isDone() {
+      return networkCallFuture.isDone();
+   }
+
+   public Object get() throws InterruptedException, ExecutionException {
+      networkCallFuture.get();
+      return actualReturnValue;
+   }
+
+   public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      networkCallFuture.get(timeout, unit);
+      return actualReturnValue;
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -19,7 +19,7 @@
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.scopes.Scope;
 import org.infinispan.factories.scopes.Scopes;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
 
 /**
  * Specifically used to create un-initialized {@link org.infinispan.commands.ReplicableCommand}s from a byte stream.
@@ -29,11 +29,11 @@
  */
 @Scope(Scopes.GLOBAL)
 public class RemoteCommandFactory {
-   RpcManager rpcManager;
+   Transport transport;
 
    @Inject
-   public void init(RpcManager rpcManager) {
-      this.rpcManager = rpcManager;
+   public void inject(Transport transport) {
+      this.transport = transport;
    }
 
    /**
@@ -55,7 +55,7 @@
             break;
          case LockControlCommand.COMMAND_ID:
             command = new LockControlCommand();
-            break;   
+            break;
          case PutMapCommand.COMMAND_ID:
             command = new PutMapCommand();
             break;
@@ -94,7 +94,7 @@
             break;
          case StateTransferControlCommand.COMMAND_ID:
             command = new StateTransferControlCommand();
-            ((StateTransferControlCommand) command).init(rpcManager);
+            ((StateTransferControlCommand) command).init(transport);
             break;
          case ClusteredGetCommand.COMMAND_ID:
             command = new ClusteredGetCommand();

Modified: trunk/core/src/main/java/org/infinispan/commands/control/StateTransferControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/StateTransferControlCommand.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/commands/control/StateTransferControlCommand.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -2,7 +2,7 @@
 
 import org.infinispan.commands.ReplicableCommand;
 import org.infinispan.context.InvocationContext;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
 
 /**
  * A command that informs caches participating in a state transfer of the various stages in the state transfer process.
@@ -12,7 +12,7 @@
  */
 public class StateTransferControlCommand implements ReplicableCommand {
    public static final int COMMAND_ID = 15;
-   RpcManager rpcManager;
+   Transport transport;
    boolean enabled;
 
    public StateTransferControlCommand() {
@@ -22,15 +22,15 @@
       this.enabled = enabled;
    }
 
-   public void init(RpcManager rpcManager) {
-      this.rpcManager = rpcManager;
+   public void init(Transport transport) {
+      this.transport = transport;
    }
 
    public Object perform(InvocationContext ctx) throws Throwable {
       if (enabled)
-         rpcManager.getTransport().getDistributedSync().acquireSync();
+         transport.getDistributedSync().acquireSync();
       else
-         rpcManager.getTransport().getDistributedSync().releaseSync();
+         transport.getDistributedSync().releaseSync();
       return null;
    }
 

Added: trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -0,0 +1,49 @@
+package org.infinispan.distribution;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A version of the async return values for dist
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class DistAsyncReturnValue implements Future<Object> {
+   final Future<Object> invalFuture, replFuture;
+   final Object returnValue;
+
+   public DistAsyncReturnValue(Future<Object> invalFuture, Future<Object> replFuture, Object returnValue) {
+      this.invalFuture = invalFuture;
+      this.replFuture = replFuture;
+      this.returnValue = returnValue;
+   }
+
+   public boolean cancel(boolean mayInterruptIfRunning) {
+      boolean invalCancelled = true;
+      if (invalFuture != null) invalCancelled = invalFuture.cancel(mayInterruptIfRunning);
+      return replFuture.cancel(mayInterruptIfRunning) && invalCancelled;
+   }
+
+   public boolean isCancelled() {
+      return replFuture.isCancelled() && (invalFuture == null || invalFuture.isCancelled());
+   }
+
+   public boolean isDone() {
+      return replFuture.isDone() && (invalFuture == null || invalFuture.isDone());
+   }
+
+   public Object get() throws InterruptedException, ExecutionException {
+      if (invalFuture != null) invalFuture.get();
+      replFuture.get();
+      return returnValue;
+   }
+
+   public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      if (invalFuture != null) invalFuture.get(timeout, unit);
+      replFuture.get(timeout, unit);
+      return returnValue;
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -12,12 +12,12 @@
 import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.responses.SuccessfulResponse;
 import org.infinispan.remoting.rpc.ResponseFilter;
 import org.infinispan.remoting.rpc.ResponseMode;
 import org.infinispan.remoting.rpc.RpcManager;
-import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
-import org.infinispan.remoting.responses.Response;
-import org.infinispan.remoting.responses.SuccessfulResponse;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.Util;
 import org.infinispan.util.logging.Log;
@@ -73,7 +73,7 @@
    }
 
    public boolean isLocal(Object key) {
-      return consistentHash.locate(key, replCount).contains(rpcManager.getLocalAddress());
+      return consistentHash.locate(key, replCount).contains(rpcManager.getTransport().getAddress());
    }
 
    public List<Address> locate(Object key) {
@@ -94,7 +94,7 @@
 
       ResponseFilter filter = new ClusteredGetResponseValidityFilter(locate(key));
       List<Response> responses = rpcManager.invokeRemotely(locate(key), get, ResponseMode.SYNCHRONOUS,
-                                                           configuration.getSyncReplTimeout(), false, filter, false);
+                                                           configuration.getSyncReplTimeout(), false, filter);
 
       if (!responses.isEmpty()) {
          for (Response r : responses) {

Modified: trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -33,7 +33,6 @@
 import org.infinispan.marshall.Marshaller;
 import org.infinispan.marshall.VersionAwareMarshaller;
 import org.infinispan.notifications.cachelistener.CacheNotifier;
-import org.infinispan.remoting.rpc.CacheRpcManager;
 import org.infinispan.transaction.TransactionLog;
 
 /**
@@ -43,7 +42,7 @@
  * @since 4.0
  */
 @DefaultFactoryFor(classes = {CacheNotifier.class, EntryFactory.class, CommandsFactory.class,
-                              CacheLoaderManager.class, InvocationContextContainer.class, CacheRpcManager.class,
+                              CacheLoaderManager.class, InvocationContextContainer.class,
                               BatchContainer.class, TransactionLog.class, EvictionManager.class, InvocationContextContainer.class})
 public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
    @Override
@@ -54,7 +53,7 @@
             if (componentType.equals(Marshaller.class)) {
                componentImpl = VersionAwareMarshaller.class;
             } else if (componentType.equals(InvocationContextContainer.class)) {
-                  componentImpl = InvocationContextContainerImpl.class;
+               componentImpl = InvocationContextContainerImpl.class;
             } else {
                // add an "Impl" to the end of the class name and try again
                componentImpl = getClass().getClassLoader().loadClass(componentType.getName() + "Impl");

Modified: trunk/core/src/main/java/org/infinispan/factories/RpcManagerFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/RpcManagerFactory.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/factories/RpcManagerFactory.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -32,8 +32,8 @@
  * @since 4.0
  */
 @DefaultFactoryFor(classes = RpcManager.class)
-public class RpcManagerFactory extends EmptyConstructorFactory implements AutoInstantiableFactory {
-   
+public class RpcManagerFactory extends EmptyConstructorNamedCacheFactory implements AutoInstantiableFactory {
+
    @Override
    public <T> T construct(Class<T> componentType) {
       // only do this if we have a transport configured!

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -17,6 +17,7 @@
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
 import org.infinispan.context.impl.TxInvocationContext;
+import org.infinispan.distribution.DistAsyncReturnValue;
 import org.infinispan.distribution.DistributionManager;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
@@ -30,11 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * The interceptor that handles distribution of entries across a cluster, as well as transparent lookup
@@ -173,7 +170,7 @@
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
       if (ctx.isOriginLocal()) {
          List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
-         rpcManager.multicastRpcCommand(recipients, command, configuration.isSyncCommitPhase(), true);
+         rpcManager.anycastRpcCommand(recipients, command, configuration.isSyncCommitPhase(), true);
       }
       return invokeNextInterceptor(ctx, command);
    }
@@ -188,7 +185,7 @@
          List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
          if (trace) log.trace("Multicasting PrepareCommand to recipients : " + recipients);
          // this method will return immediately if we're the only member (because exclude_self=true)
-         rpcManager.multicastRpcCommand(recipients, command, sync, false);
+         rpcManager.anycastRpcCommand(recipients, command, sync);
       }
       return retVal;
    }
@@ -197,7 +194,7 @@
    public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
       if (ctx.isOriginLocal()) {
          List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
-         rpcManager.multicastRpcCommand(recipients, command, configuration.isSyncRollbackPhase(), true);
+         rpcManager.anycastRpcCommand(recipients, command, configuration.isSyncRollbackPhase(), true);
       }
       return invokeNextInterceptor(ctx, command);
    }
@@ -241,13 +238,13 @@
                boolean sync = isSynchronous(ctx);
 
                // if L1 caching is used make sure we broadcast an invalidate message
-               if (isL1CacheEnabled && rec != null && rpcManager.getMembers().size() > rec.size()) {
+               if (isL1CacheEnabled && rec != null && rpcManager.getTransport().getMembers().size() > rec.size()) {
                   InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
                   f1 = submitRpc(null, ic, sync, useFuture);
                }
                f2 = submitRpc(rec, command, sync, useFuture);
 
-               if (f2 != null) return new DistributionCommunicationFuture(f1, f2, returnValue);
+               if (f2 != null) return new DistAsyncReturnValue(f1, f2, returnValue);
             }
          } else {
             if (!localModeForced) {
@@ -260,64 +257,11 @@
       return returnValue;
    }
 
-   private class DistributionCommunicationFuture implements Future<Object> {
-      final Future<Object> invalFuture, replFuture;
-      final Object returnValue;
-
-      private DistributionCommunicationFuture(Future<Object> invalFuture, Future<Object> replFuture, Object returnValue) {
-         this.invalFuture = invalFuture;
-         this.replFuture = replFuture;
-         this.returnValue = returnValue;
-      }
-
-      public boolean cancel(boolean mayInterruptIfRunning) {
-         boolean invalCancelled = true;
-         if (invalFuture != null) invalCancelled = invalFuture.cancel(mayInterruptIfRunning);
-         return replFuture.cancel(mayInterruptIfRunning) && invalCancelled;
-      }
-
-      public boolean isCancelled() {
-         return replFuture.isCancelled() && (invalFuture == null || invalFuture.isCancelled());
-      }
-
-      public boolean isDone() {
-         return replFuture.isDone() && (invalFuture == null || invalFuture.isDone());
-      }
-
-      public Object get() throws InterruptedException, ExecutionException {
-         if (invalFuture != null) invalFuture.get();
-         replFuture.get();
-         return returnValue;
-      }
-
-      public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-         if (invalFuture != null) invalFuture.get(timeout, unit);
-         replFuture.get(timeout, unit);
-         return returnValue;
-      }
-   }
-
-
    private Future<Object> submitRpc(final List<Address> recipients, final WriteCommand cmd, final boolean sync, boolean useFuture) {
       if (useFuture) {
-         Callable<Object> c = new Callable<Object>() {
-            public Object call() {
-               if (recipients == null) {
-                  rpcManager.broadcastReplicableCommand(cmd, true);
-               } else {
-                  rpcManager.multicastReplicableCommand(recipients, cmd, true);
-               }
-               return null;
-            }
-         };
-
-         return asyncExecutorService.submit(c);
+         return rpcManager.anycastRpcCommandInFuture(recipients, cmd);
       } else {
-         if (recipients == null) {
-            rpcManager.broadcastReplicableCommand(cmd, sync);
-         } else {
-            rpcManager.multicastReplicableCommand(recipients, cmd, sync);
-         }
+         rpcManager.anycastRpcCommand(recipients, cmd, sync);
          return null;
       }
    }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -21,6 +21,7 @@
  */
 package org.infinispan.interceptors;
 
+import org.infinispan.AsyncReturnValue;
 import org.infinispan.commands.AbstractVisitor;
 import org.infinispan.commands.CommandsFactory;
 import org.infinispan.commands.VisitableCommand;
@@ -48,7 +49,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
 
@@ -98,7 +98,7 @@
    public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
       // just broadcast the clear command - this is simplest!
       Object retval = invokeNextInterceptor(ctx, command);
-      if (ctx.isOriginLocal()) rpcManager.broadcastReplicableCommand(command, defaultSynchronous);
+      if (ctx.isOriginLocal()) rpcManager.broadcastRpcCommand(command, defaultSynchronous);
       return retval;
    }
 
@@ -192,17 +192,12 @@
          incrementInvalidations();
          final InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
          if (log.isDebugEnabled())
-            log.debug("Cache [" + rpcManager.getLocalAddress() + "] replicating " + command);
+            log.debug("Cache [" + rpcManager.getTransport().getAddress() + "] replicating " + command);
          // voila, invalidated!
          if (useFuture) {
-            return submitRpcCall(new Callable<Object>() {
-               public Object call() throws Exception {
-                  rpcManager.broadcastReplicableCommand(command, true);
-                  return null;
-               }
-            }, retvalForFuture);
+            return new AsyncReturnValue(rpcManager.broadcastRpcCommandInFuture(command), retvalForFuture);
          } else {
-            rpcManager.broadcastReplicableCommand(command, synchronous);
+            rpcManager.broadcastRpcCommand(command, synchronous);
          }
       }
 

Modified: trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -21,6 +21,7 @@
  */
 package org.infinispan.interceptors;
 
+import org.infinispan.AsyncReturnValue;
 import org.infinispan.commands.LockControlCommand;
 import org.infinispan.commands.tx.CommitCommand;
 import org.infinispan.commands.tx.PrepareCommand;
@@ -36,8 +37,6 @@
 import org.infinispan.context.impl.TxInvocationContext;
 import org.infinispan.interceptors.base.BaseRpcInterceptor;
 
-import java.util.concurrent.Callable;
-
 /**
  * Takes care of replicating modifications to other caches in a cluster. Also listens for prepare(), commit() and
  * rollback() messages which are received 'side-ways' (see docs/design/Refactoring.txt).
@@ -117,14 +116,9 @@
       final Object returnValue = invokeNextInterceptor(ctx, command);
       if (!isLocalModeForced(ctx) && command.isSuccessful() && ctx.isOriginLocal() && !ctx.isInTxScope()) {
          if (ctx.isUseFutureReturnType()) {
-            return submitRpcCall(new Callable<Object>() {
-               public Object call() throws Exception {
-                  rpcManager.broadcastReplicableCommand(command, true);
-                  return null;
-               }
-            }, returnValue);
+            return new AsyncReturnValue(rpcManager.broadcastRpcCommandInFuture(command), returnValue);
          } else {
-            rpcManager.broadcastReplicableCommand(command, isSynchronous(ctx));
+            rpcManager.broadcastRpcCommand(command, isSynchronous(ctx));
          }
       }
       return returnValue;

Modified: trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -23,19 +23,10 @@
 
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
-import org.infinispan.factories.KnownComponentNames;
-import org.infinispan.factories.annotations.ComponentName;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
-import org.infinispan.remoting.rpc.CacheRpcManager;
+import org.infinispan.remoting.rpc.RpcManager;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 /**
  * Acts as a base for all RPC calls - subclassed by
  *
@@ -45,14 +36,11 @@
  */
 public abstract class BaseRpcInterceptor extends CommandInterceptor {
 
-   protected CacheRpcManager rpcManager;
-   protected ExecutorService asyncExecutorService;
+   protected RpcManager rpcManager;
 
    @Inject
-   public void init(CacheRpcManager rpcManager,
-                    @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService e) {
+   public void init(RpcManager rpcManager) {
       this.rpcManager = rpcManager;
-      this.asyncExecutorService = e;
    }
 
    protected boolean defaultSynchronous;
@@ -78,35 +66,4 @@
       }
       return false;
    }
-
-   protected final <X> Future<X> submitRpcCall(Callable<Object> c, final Object returnValue) {
-      final Future f = asyncExecutorService.submit(c);
-      return new Future<X>() {
-
-         public boolean cancel(boolean mayInterruptIfRunning) {
-            return f.cancel(mayInterruptIfRunning);
-         }
-
-         public boolean isCancelled() {
-            return f.isCancelled();
-         }
-
-         public boolean isDone() {
-            return f.isDone();
-         }
-
-         @SuppressWarnings("unchecked")
-         public X get() throws InterruptedException, ExecutionException {
-            f.get(); // wait for f to complete first
-            return (X) returnValue;
-         }
-
-         @SuppressWarnings("unchecked")
-         public X get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-            f.get(timeout, unit);
-            return (X) returnValue;
-         }
-      };
-   }
-
 }
\ No newline at end of file

Modified: trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -80,10 +80,10 @@
 
    private List<Response> doRemoteCall(ClusteredGetCommand clusteredGetCommand) throws CacheLoaderException {
       Set<Address> validMembers = new HashSet<Address>(rpcManager.getTransport().getMembers());
-      validMembers.remove(rpcManager.getLocalAddress());
+      validMembers.remove(rpcManager.getTransport().getAddress());
       ResponseFilter filter = new ClusteredGetResponseValidityFilter(validMembers);
       try {
-         return rpcManager.invokeRemotely(null, clusteredGetCommand, ResponseMode.WAIT_FOR_VALID_RESPONSE, config.getRemoteCallTimeout(), false, filter, false);
+         return rpcManager.invokeRemotely(null, clusteredGetCommand, ResponseMode.WAIT_FOR_VALID_RESPONSE, config.getRemoteCallTimeout(), false, filter);
       } catch (Exception e) {
          log.error("error while doing remote call", e);
          throw new CacheLoaderException(e);

Modified: trunk/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -38,8 +38,8 @@
 import org.infinispan.lifecycle.ComponentStatus;
 import org.infinispan.lifecycle.Lifecycle;
 import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
-import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.Transport;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -311,20 +311,20 @@
 
    public List<Address> getMembers() {
       if (globalComponentRegistry == null) return null;
-      RpcManager rpcManager = globalComponentRegistry.getComponent(RpcManager.class);
-      return rpcManager == null ? null : rpcManager.getTransport().getMembers();
+      Transport t = globalComponentRegistry.getComponent(Transport.class);
+      return t == null ? null : t.getMembers();
    }
 
    public Address getAddress() {
       if (globalComponentRegistry == null) return null;
-      RpcManager rpcManager = globalComponentRegistry.getComponent(RpcManager.class);
-      return rpcManager == null ? null : rpcManager.getLocalAddress();
+      Transport t = globalComponentRegistry.getComponent(Transport.class);
+      return t == null ? null : t.getAddress();
    }
 
    public boolean isCoordinator() {
       if (globalComponentRegistry == null) return false;
-      RpcManager rpcManager = globalComponentRegistry.getComponent(RpcManager.class);
-      return rpcManager != null && rpcManager.getTransport().isCoordinator();
+      Transport t = globalComponentRegistry.getComponent(Transport.class);
+      return t != null && t.isCoordinator();
    }
 
    private Cache createCache(String cacheName) {

Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/ExternalizerClassFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/ExternalizerClassFactory.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/ExternalizerClassFactory.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -45,11 +45,11 @@
 import org.infinispan.container.entries.TransientMortalCacheEntry;
 import org.infinispan.marshall.MarshalledValue;
 import org.infinispan.marshall.jboss.externalizers.*;
-import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.remoting.responses.ExceptionResponse;
 import org.infinispan.remoting.responses.ExtendedResponse;
 import org.infinispan.remoting.responses.SuccessfulResponse;
 import org.infinispan.remoting.responses.UnsuccessfulResponse;
+import org.infinispan.remoting.transport.Transport;
 import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
 import org.infinispan.transaction.xa.GlobalTransaction;
 import org.infinispan.util.FastCopyHashMap;
@@ -116,16 +116,16 @@
       EXTERNALIZERS.put(MortalCacheEntry.class.getName(), InternalCachedEntryExternalizer.class.getName());
       EXTERNALIZERS.put(TransientCacheEntry.class.getName(), InternalCachedEntryExternalizer.class.getName());
       EXTERNALIZERS.put(TransientMortalCacheEntry.class.getName(), InternalCachedEntryExternalizer.class.getName());
-      
+
       EXTERNALIZERS.put(InvalidateL1Command.class.getName(), ReplicableCommandExternalizer.class.getName());
    }
 
    private final Map<Class<?>, Externalizer> externalizers = new WeakHashMap<Class<?>, Externalizer>();
-   private final RpcManager rpcManager;
+   private final Transport transport;
    private final CustomObjectTable objectTable;
 
-   public ExternalizerClassFactory(RpcManager rpcManager, CustomObjectTable objectTable) {
-      this.rpcManager = rpcManager;
+   public ExternalizerClassFactory(Transport transport, CustomObjectTable objectTable) {
+      this.transport = transport;
       this.objectTable = objectTable;
    }
 
@@ -135,7 +135,7 @@
             Class typeClazz = Util.loadClass(entry.getKey());
             Externalizer ext = (Externalizer) Util.getInstance(entry.getValue());
             if (ext instanceof StateTransferControlCommandExternalizer) {
-               ((StateTransferControlCommandExternalizer) ext).init(rpcManager);
+               ((StateTransferControlCommandExternalizer) ext).init(transport);
             }
             externalizers.put(typeClazz, ext);
             objectTable.add(ext);

Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/JBossMarshaller.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -29,7 +29,7 @@
 import org.infinispan.io.ByteBuffer;
 import org.infinispan.io.ExposedByteArrayOutputStream;
 import org.infinispan.marshall.AbstractMarshaller;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
 import org.infinispan.util.Util;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -68,7 +68,7 @@
 ///   private boolean trace;
 
    @Inject
-   public void init(ClassLoader defaultCl, RpcManager rpcManager) {
+   public void init(ClassLoader defaultCl, Transport transport) {
       log.debug("Using JBoss Marshalling based marshaller.");
 
 //      trace = log.isTraceEnabled();
@@ -83,7 +83,7 @@
 
       classTable = createMagicNumberClassTable();
       objectTable = createCustomObjectTable();
-      externalizerFactoryAndObjectTable = createCustomExternalizerFactory(rpcManager, objectTable);
+      externalizerFactoryAndObjectTable = createCustomExternalizerFactory(transport, objectTable);
 
       configuration = new MarshallingConfiguration();
       configuration.setCreator(new SunReflectiveCreator());
@@ -185,8 +185,8 @@
       return classTable;
    }
 
-   protected ExternalizerClassFactory createCustomExternalizerFactory(RpcManager rpcManager, CustomObjectTable objectTable) {
-      ExternalizerClassFactory externalizerFactory = new ExternalizerClassFactory(rpcManager, objectTable);
+   protected ExternalizerClassFactory createCustomExternalizerFactory(Transport transport, CustomObjectTable objectTable) {
+      ExternalizerClassFactory externalizerFactory = new ExternalizerClassFactory(transport, objectTable);
       externalizerFactory.init();
       return externalizerFactory;
    }

Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/externalizers/StateTransferControlCommandExternalizer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/externalizers/StateTransferControlCommandExternalizer.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/externalizers/StateTransferControlCommandExternalizer.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -22,7 +22,7 @@
 package org.infinispan.marshall.jboss.externalizers;
 
 import org.infinispan.commands.control.StateTransferControlCommand;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
 import org.jboss.marshalling.Creator;
 
 import java.io.IOException;
@@ -41,10 +41,10 @@
     */
    private static final long serialVersionUID = -3743458410265076691L;
 
-   private RpcManager rpcManager;
+   private Transport transport;
 
-   public void init(RpcManager rpcManager) {
-      this.rpcManager = rpcManager;
+   public void init(Transport transport) {
+      this.transport = transport;
    }
 
    /**
@@ -56,7 +56,7 @@
    public Object createExternal(Class<?> subjectType, ObjectInput input, Creator defaultCreator)
          throws IOException, ClassNotFoundException {
       StateTransferControlCommand command = new StateTransferControlCommand();
-      command.init(rpcManager);
+      command.init(transport);
       return command;
    }
 }
\ No newline at end of file

Modified: trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/ReplicationQueue.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -30,10 +30,10 @@
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.factories.annotations.Stop;
+import org.infinispan.remoting.rpc.ResponseMode;
+import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
-import org.infinispan.remoting.rpc.RpcManager;
-import org.infinispan.remoting.rpc.ResponseMode;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -146,7 +146,7 @@
             log.trace("Flushing {0} elements", toReplicateSize);
             MultipleRpcCommand multipleRpcCommand = commandsFactory.buildReplicateCommand(toReplicate);
             // send to all live caches in the cluster
-            rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout(), stateTransferEnabled);
+            rpcManager.invokeRemotely(null, multipleRpcCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout());
          }
          catch (Throwable t) {
             log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);

Deleted: trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/CacheRpcManager.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -1,136 +0,0 @@
-package org.infinispan.remoting.rpc;
-
-import org.infinispan.CacheException;
-import org.infinispan.commands.CommandsFactory;
-import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.commands.remote.CacheRpcCommand;
-import org.infinispan.commands.remote.SingleRpcCommand;
-import org.infinispan.config.Configuration;
-import org.infinispan.factories.annotations.Inject;
-import org.infinispan.factories.annotations.Start;
-import org.infinispan.remoting.ReplicationException;
-import org.infinispan.remoting.ReplicationQueue;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.util.List;
-
-/**
- * Class that encapsulates the logic for replicating commands through cluster participants.
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.0
- */
-public class CacheRpcManager {
-
-   private static Log log = LogFactory.getLog(CacheRpcManager.class);
-   private static final boolean trace = log.isTraceEnabled();
-
-   private Configuration configuration;
-   private boolean stateTransferEnabled;
-
-   private ReplicationQueue replicationQueue;
-   private RpcManager rpcManager;
-   private CommandsFactory commandsFactory;
-
-
-   @Inject
-   public void init(ReplicationQueue replicationQueue, RpcManager rpcManager, CommandsFactory commandsFactory, Configuration configuration) {
-      this.replicationQueue = replicationQueue;
-      this.rpcManager = rpcManager;
-      this.commandsFactory = commandsFactory;
-      this.configuration = configuration;
-   }
-
-   @Start
-   public void init() {
-      stateTransferEnabled = configuration.isFetchInMemoryState();
-   }
-
-   public void broadcastRpcCommand(CacheRpcCommand rpc, boolean sync, boolean useOutOfBandMessage) throws ReplicationException {
-      if (useReplicationQueue(sync)) {
-         replicationQueue.add(rpc);
-      } else {
-         multicastRpcCommand(null, rpc, sync, useOutOfBandMessage);
-      }
-   }
-
-   public void broadcastReplicableCommand(ReplicableCommand call, boolean sync) throws ReplicationException {
-      multicastReplicableCommand(null, call, sync);
-   }
-
-   public void multicastReplicableCommand(List<Address> members, ReplicableCommand call, boolean sync) throws ReplicationException {
-      if (useReplicationQueue(sync)) {
-         replicationQueue.add(call);
-      } else {
-         SingleRpcCommand rpcCommand = commandsFactory.buildSingleRpcCommand(call);
-         multicastRpcCommand(members, rpcCommand, sync, false);
-      }
-   }
-
-   private ResponseMode getResponseMode(boolean sync) {
-      return sync ? ResponseMode.SYNCHRONOUS : configuration.isUseAsyncMarshalling() ? ResponseMode.ASYNCHRONOUS : ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
-   }
-
-   public void multicastRpcCommand(List<Address> recipients, CacheRpcCommand command, boolean sync, boolean useOutOfBandMessage) throws ReplicationException {
-      if (trace) {
-         log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getTransport().getMembers() + ", mode=" +
-               configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
-               configuration.getSyncReplTimeout());
-         log.trace("Broadcasting call " + command + " to recipient list " + recipients);
-      }
-
-      List rsps;
-      try {
-         rsps = rpcManager.invokeRemotely(recipients,
-                                          command,
-                                          getResponseMode(sync),
-                                          configuration.getSyncReplTimeout(), useOutOfBandMessage, stateTransferEnabled
-         );
-         if (trace) log.trace("responses=" + rsps);
-         if (sync) checkResponses(rsps);
-      } catch (CacheException e) {
-         log.error("Replication exception", e);
-         throw e;
-      } catch (Exception ex) {
-         log.error("Unexpected exception", ex);
-         throw new ReplicationException("Unexpected exception while replicating", ex);
-      }
-   }
-
-   public boolean isClusterStarted() {
-      return rpcManager != null && rpcManager.getTransport() != null && rpcManager.getTransport().getMembers() != null
-            && rpcManager.getTransport().getMembers().size() > 1;
-   }
-
-   private boolean useReplicationQueue(boolean sync) {
-      return !sync && replicationQueue != null;
-   }
-
-   /**
-    * Checks whether any of the responses are exceptions. If yes, re-throws them (as exceptions or runtime exceptions).
-    */
-   private void checkResponses(List rsps) {
-      if (rsps != null) {
-         for (Object rsp : rsps) {
-            if (rsp != null && rsp instanceof Throwable) {
-               // lets print a stack trace first.
-               Throwable throwable = (Throwable) rsp;
-               if (trace) {
-                  log.trace("Received Throwable from remote cache", throwable);
-               }
-               throw new ReplicationException(throwable);
-            }
-         }
-      }
-   }
-
-   public Address getLocalAddress() {
-      return rpcManager != null ? rpcManager.getLocalAddress() : null;
-   }
-
-   public List<Address> getMembers() {
-      return rpcManager.getTransport().getMembers();
-   }
-}

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseFilter.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseFilter.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/ResponseFilter.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -5,7 +5,7 @@
 
 /**
  * A mechanism of filtering RPC responses.  Used with {@link RpcManager#invokeRemotely(java.util.List,
- * org.infinispan.commands.ReplicableCommand, ResponseMode, long, boolean, ResponseFilter, boolean)}
+ * org.infinispan.commands.ReplicableCommand, ResponseMode, long, boolean, ResponseFilter)}
  *
  * @author Manik Surtani
  * @since 4.0

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -22,80 +22,66 @@
 package org.infinispan.remoting.rpc;
 
 import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.factories.annotations.NonVolatile;
-import org.infinispan.factories.scopes.Scope;
-import org.infinispan.factories.scopes.Scopes;
-import org.infinispan.lifecycle.Lifecycle;
+import org.infinispan.remoting.ReplicationException;
 import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.transport.Transport;
 import org.infinispan.statetransfer.StateTransferException;
 
 import java.util.List;
+import java.util.concurrent.Future;
 
 /**
- * Provides a mechanism for communicating with other caches in the cluster.
- * <p/>
- * Implementations have a simple lifecycle: <ul> <li>start() - starts the underlying communication channel based on
- * configuration options injected, and connects the channel</li> <li>stop() - stops the dispatcher and releases
- * resources</li> </ul>
+ * Provides a mechanism for communicating with other caches in the cluster, by formatting and passing requests down to
+ * the registered {@link Transport}.
  *
  * @author Manik Surtani
  * @since 4.0
  */
- at Scope(Scopes.GLOBAL)
- at NonVolatile
-public interface RpcManager extends Lifecycle {
-   // TODO this needs to be re-thought regarding adding a transport-independent mechanism of unicasts for distribution based on consistent hashes
+public interface RpcManager {
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients           a list of Addresses to invoke the call on.  If this is null, the call is broadcast to
-    *                             the entire cluster.
-    * @param rpcCommand           the cache command to invoke
-    * @param mode                 the response mode to use
-    * @param timeout              a timeout after which to throw a replication exception.
-    * @param usePriorityQueue     if true, a priority queue is used to deliver messages.  May not be supported by all
-    *                             implementations.
-    * @param responseFilter       a response filter with which to filter out failed/unwanted/invalid responses.
-    * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
-    *                             course of a state transfer
+    * @param recipients       a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the
+    *                         entire cluster.
+    * @param rpcCommand       the cache command to invoke
+    * @param mode             the response mode to use
+    * @param timeout          a timeout after which to throw a replication exception.
+    * @param usePriorityQueue if true, a priority queue is used to deliver messages.  May not be supported by all
+    *                         implementations.
+    * @param responseFilter   a response filter with which to filter out failed/unwanted/invalid responses.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean stateTransferEnabled) throws Exception;
+   List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients           a list of Addresses to invoke the call on.  If this is null, the call is broadcast to
-    *                             the entire cluster.
-    * @param rpcCommand           the cache command to invoke
-    * @param mode                 the response mode to use
-    * @param timeout              a timeout after which to throw a replication exception.
-    * @param usePriorityQueue     if true, a priority queue is used to deliver messages.  May not be supported by all
-    *                             implementations.
-    * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
-    *                             course of a state transfer
+    * @param recipients       a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the
+    *                         entire cluster.
+    * @param rpcCommand       the cache command to invoke
+    * @param mode             the response mode to use
+    * @param timeout          a timeout after which to throw a replication exception.
+    * @param usePriorityQueue if true, a priority queue is used to deliver messages.  May not be supported by all
+    *                         implementations.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean stateTransferEnabled) throws Exception;
+   List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients           a list of Addresses to invoke the call on.  If this is null, the call is broadcast to
-    *                             the entire cluster.
-    * @param rpcCommand           the cache command to invoke
-    * @param mode                 the response mode to use
-    * @param timeout              a timeout after which to throw a replication exception.
-    * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
-    *                             course of a state transfer
+    * @param recipients a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire
+    *                   cluster.
+    * @param rpcCommand the cache command to invoke
+    * @param mode       the response mode to use
+    * @param timeout    a timeout after which to throw a replication exception.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws Exception;
+   List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout) throws Exception;
 
    /**
     * Initiates a state retrieval process from neighbouring caches.  This method will block until it either times out,
@@ -109,6 +95,92 @@
    void retrieveState(String cacheName, long timeout) throws StateTransferException;
 
    /**
+    * Broadcasts an RPC command to the entire cluster.
+    *
+    * @param rpc  command to execute remotely
+    * @param sync if true, the transport will operate in sync mode.  Otherwise, it will operate in async mode.
+    * @throws ReplicationException in the event of problems
+    */
+   void broadcastRpcCommand(ReplicableCommand rpc, boolean sync) throws ReplicationException;
+
+   /**
+    * Broadcasts an RPC command to the entire cluster.
+    *
+    * @param rpc              command to execute remotely
+    * @param sync             if true, the transport will operate in sync mode.  Otherwise, it will operate in async
+    *                         mode.
+    * @param usePriorityQueue if true, a priority queue is used
+    * @throws ReplicationException in the event of problems
+    */
+   void broadcastRpcCommand(ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException;
+
+   /**
+    * The same as {@link #broadcastRpcCommand(org.infinispan.commands.ReplicableCommand, boolean)} except that the task
+    * is passed to the transport executor and a Future is returned.  The transport always deals with this
+    * synchronously.
+    *
+    * @param rpc command to execute remotely
+    * @return a future
+    */
+   Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc);
+
+   /**
+    * The same as {@link #broadcastRpcCommand(org.infinispan.commands.ReplicableCommand, boolean, boolean)} except that
+    * the task is passed to the transport executor and a Future is returned.  The transport always deals with this
+    * synchronously.
+    *
+    * @param rpc              command to execute remotely
+    * @param usePriorityQueue if true, a priority queue is used
+    * @return a future
+    */
+   Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue);
+
+   /**
+    * Broadcasts an RPC command to a specified set of recipients
+    *
+    * @param recipients recipients to invoke remote command on
+    * @param rpc        command to execute remotely
+    * @param sync       if true, the transport will operate in sync mode.  Otherwise, it will operate in async mode.
+    * @throws ReplicationException in the event of problems
+    */
+   void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync) throws ReplicationException;
+
+   /**
+    * Broadcasts an RPC command to a specified set of recipients
+    *
+    * @param recipients       recipients to invoke remote command on
+    * @param rpc              command to execute remotely
+    * @param sync             if true, the transport will operate in sync mode.  Otherwise, it will operate in async
+    *                         mode.
+    * @param usePriorityQueue if true, a priority queue is used
+    * @throws ReplicationException in the event of problems
+    */
+   void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException;
+
+   /**
+    * The same as {@link #anycastRpcCommand(java.util.List, org.infinispan.commands.ReplicableCommand, boolean)} except
+    * that the task is passed to the transport executor and a Future is returned.  The transport always deals with this
+    * synchronously.
+    *
+    * @param recipients recipients to invoke remote call on
+    * @param rpc        command to execute remotely
+    * @return a future
+    */
+   Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc);
+
+   /**
+    * The same as {@link #anycastRpcCommand(java.util.List, org.infinispan.commands.ReplicableCommand, boolean)} except
+    * that the task is passed to the transport executor and a Future is returned.  The transport always deals with this
+    * synchronously.
+    *
+    * @param recipients       recipients to invoke remote call on
+    * @param rpc              command to execute remotely
+    * @param usePriorityQueue if true, a priority queue is used
+    * @return a future
+    */
+   Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue);
+
+   /**
     * @return a reference to the underlying transport.
     */
    Transport getTransport();
@@ -122,9 +194,4 @@
     *         a null otherwise.
     */
    Address getCurrentStateTransferSource();
-
-   /**
-    * Returns the local address.
-    */
-   public Address getLocalAddress();
 }
\ No newline at end of file

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -1,19 +1,19 @@
 package org.infinispan.remoting.rpc;
 
 import org.infinispan.CacheException;
+import org.infinispan.commands.CommandsFactory;
 import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.commands.remote.CacheRpcCommand;
+import org.infinispan.config.Configuration;
 import org.infinispan.factories.KnownComponentNames;
 import org.infinispan.factories.annotations.ComponentName;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
-import org.infinispan.factories.annotations.Stop;
 import org.infinispan.jmx.annotations.MBean;
 import org.infinispan.jmx.annotations.ManagedAttribute;
 import org.infinispan.jmx.annotations.ManagedOperation;
-import org.infinispan.marshall.Marshaller;
-import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
-import org.infinispan.remoting.InboundInvocationHandler;
+import org.infinispan.remoting.ReplicationException;
+import org.infinispan.remoting.ReplicationQueue;
 import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.transport.Transport;
@@ -25,7 +25,9 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -40,37 +42,43 @@
 public class RpcManagerImpl implements RpcManager {
 
    private static final Log log = LogFactory.getLog(RpcManagerImpl.class);
+   private static final boolean trace = log.isTraceEnabled();
 
-   Transport t;
+   private Transport t;
    private final AtomicLong replicationCount = new AtomicLong(0);
    private final AtomicLong replicationFailures = new AtomicLong(0);
    boolean statisticsEnabled = false; // by default, don't gather statistics.
    private volatile Address currentStateTransferSource;
+   private boolean stateTransferEnabled;
+   private Configuration configuration;
+   private ReplicationQueue replicationQueue;
+   private ExecutorService asyncExecutor;
+   private CommandsFactory cf;
 
+
    @Inject
-   public void injectDependencies(GlobalConfiguration globalConfiguration, Transport t, InboundInvocationHandler handler,
-                                  Marshaller marshaller,
-                                  @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService e,
-                                  CacheManagerNotifier notifier) {
+   public void injectDependencies(Transport t, Configuration configuration, ReplicationQueue replicationQueue, CommandsFactory cf,
+                                  @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService e) {
       this.t = t;
-      this.t.initialize(globalConfiguration, globalConfiguration.getTransportProperties(), marshaller, e, handler,
-                        notifier, globalConfiguration.getDistributedSyncTimeout());
+      this.configuration = configuration;
+      this.replicationQueue = replicationQueue;
+      this.asyncExecutor = e;
+      this.cf = cf;
    }
 
-   @Start(priority = 10)
-   public void start() {
-      t.start();
+   @Start(priority = 9)
+   private void start() {
+      stateTransferEnabled = configuration.isStateTransferEnabled();
    }
 
-   @Stop
-   public void stop() {
-      t.stop();
+   private boolean useReplicationQueue(boolean sync) {
+      return !sync && replicationQueue != null && replicationQueue.isEnabled();
    }
 
-   public List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean stateTransferEnabled) throws Exception {
+   public final List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception {
       List<Address> members = t.getMembers();
       if (members.size() < 2) {
-         if (log.isDebugEnabled()) 
+         if (log.isDebugEnabled())
             log.debug("We're the only member in the cluster; Don't invoke remotely.");
          return Collections.emptyList();
       } else {
@@ -90,12 +98,12 @@
       }
    }
 
-   public List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean stateTransferEnabled) throws Exception {
-      return invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null, stateTransferEnabled);
+   public final List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception {
+      return invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null);
    }
 
-   public List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws Exception {
-      return invokeRemotely(recipients, rpcCommand, mode, timeout, false, null, stateTransferEnabled);
+   public final List<Response> invokeRemotely(List<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout) throws Exception {
+      return invokeRemotely(recipients, rpcCommand, mode, timeout, false, null);
    }
 
    public void retrieveState(String cacheName, long timeout) throws StateTransferException {
@@ -160,6 +168,71 @@
       }
    }
 
+   public final void broadcastRpcCommand(ReplicableCommand rpc, boolean sync) throws ReplicationException {
+      broadcastRpcCommand(rpc, sync, false);
+   }
+
+   public final void broadcastRpcCommand(ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+      if (useReplicationQueue(sync)) {
+         replicationQueue.add(rpc);
+      } else {
+         anycastRpcCommand(null, rpc, sync, usePriorityQueue);
+      }
+   }
+
+   public final Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc) {
+      return broadcastRpcCommandInFuture(rpc, false);
+   }
+
+   public final Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue) {
+      return anycastRpcCommandInFuture(null, rpc, usePriorityQueue);
+   }
+
+   public final void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync) throws ReplicationException {
+      anycastRpcCommand(recipients, rpc, sync, false);
+   }
+
+   public final void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+      if (trace) {
+         log.trace("Broadcasting call " + rpc + " to recipient list " + recipients);
+      }
+
+      if (useReplicationQueue(sync)) {
+         replicationQueue.add(rpc);
+      } else {
+         if (!(rpc instanceof CacheRpcCommand)) {
+            rpc = cf.buildSingleRpcCommand(rpc);
+         }
+         List rsps;
+         try {
+            rsps = invokeRemotely(recipients, rpc, getResponseMode(sync),
+                                  configuration.getSyncReplTimeout(), usePriorityQueue);
+            if (trace) log.trace("responses=" + rsps);
+            if (sync) checkResponses(rsps);
+         } catch (CacheException e) {
+            log.error("Replication exception", e);
+            throw e;
+         } catch (Exception ex) {
+            log.error("Unexpected exception", ex);
+            throw new ReplicationException("Unexpected exception while replicating", ex);
+         }
+      }
+   }
+
+   public final Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc) {
+      return anycastRpcCommandInFuture(recipients, rpc, false);
+   }
+
+   public final Future<Object> anycastRpcCommandInFuture(final List<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue) {
+      Callable<Object> c = new Callable<Object>() {
+         public Object call() {
+            anycastRpcCommand(recipients, rpc, true, usePriorityQueue);
+            return null;
+         }
+      };
+      return asyncExecutor.submit(c);
+   }
+
    public Transport getTransport() {
       return t;
    }
@@ -168,11 +241,26 @@
       return currentStateTransferSource;
    }
 
-   public Address getLocalAddress() {
-      if (t == null) {
-         return null;
+   private ResponseMode getResponseMode(boolean sync) {
+      return sync ? ResponseMode.SYNCHRONOUS : configuration.isUseAsyncMarshalling() ? ResponseMode.ASYNCHRONOUS : ResponseMode.ASYNCHRONOUS_WITH_SYNC_MARSHALLING;
+   }
+
+   /**
+    * Checks whether any of the responses are exceptions. If yes, re-throws them (as exceptions or runtime exceptions).
+    */
+   private void checkResponses(List rsps) {
+      if (rsps != null) {
+         for (Object rsp : rsps) {
+            if (rsp != null && rsp instanceof Throwable) {
+               // lets print a stack trace first.
+               Throwable throwable = (Throwable) rsp;
+               if (trace) {
+                  log.trace("Received Throwable from remote cache", throwable);
+               }
+               throw new ReplicationException(throwable);
+            }
+         }
       }
-      return t.getAddress();
    }
 
    // -------------------------------------------- JMX information -----------------------------------------------

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/Transport.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -2,7 +2,12 @@
 
 import org.infinispan.commands.ReplicableCommand;
 import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.factories.KnownComponentNames;
+import org.infinispan.factories.annotations.ComponentName;
+import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.NonVolatile;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.factories.annotations.Stop;
 import org.infinispan.factories.scopes.Scope;
 import org.infinispan.factories.scopes.Scopes;
 import org.infinispan.lifecycle.Lifecycle;
@@ -15,7 +20,6 @@
 import org.infinispan.statetransfer.StateTransferException;
 
 import java.util.List;
-import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -33,16 +37,16 @@
    /**
     * Initializes the transport with global cache configuration and transport-specific properties.
     *
-    * @param c                      global cache-wide configuration
-    * @param p                      properties to set
-    * @param marshaller             marshaller to use for marshalling and unmarshalling
-    * @param asyncExecutor          executor to use for asynchronous calls
-    * @param handler                handler for invoking remotely originating calls on the local cache
-    * @param notifier               notifier to use
-    * @param distributedSyncTimeout timeout to wait for distributed syncs
+    * @param c             global cache-wide configuration
+    * @param marshaller    marshaller to use for marshalling and unmarshalling
+    * @param asyncExecutor executor to use for asynchronous calls
+    * @param handler       handler for invoking remotely originating calls on the local cache
+    * @param notifier      notifier to use
     */
-   void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller, ExecutorService asyncExecutor,
-                   InboundInvocationHandler handler, CacheManagerNotifier notifier, long distributedSyncTimeout);
+   @Inject
+   void initialize(GlobalConfiguration c, Marshaller marshaller,
+                   @ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncExecutor,
+                   InboundInvocationHandler handler, CacheManagerNotifier notifier);
 
    /**
     * Invokes an RPC call on other caches in the cluster.
@@ -112,4 +116,11 @@
     * @return true if the implementation supports state transfer, false otherwise.
     */
    boolean isSupportStateTransfer();
+
+   @Start(priority = 10)
+   void start();
+
+   @Stop
+   void stop();
+
 }

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -81,18 +81,19 @@
    // Lifecycle and setup stuff
    // ------------------------------------------------------------------------------------------------------------------
 
-   public void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller, ExecutorService asyncExecutor,
-                          InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier, long distributedSyncTimeout) {
+   public void initialize(GlobalConfiguration c, Marshaller marshaller, ExecutorService asyncExecutor,
+                          InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier) {
       this.c = c;
-      this.p = p;
       this.marshaller = marshaller;
       this.asyncExecutor = asyncExecutor;
       this.inboundInvocationHandler = inboundInvocationHandler;
       this.notifier = notifier;
-      this.distributedSyncTimeout = distributedSyncTimeout;
    }
 
    public void start() {
+      p = c.getTransportProperties();
+      distributedSyncTimeout = c.getDistributedSyncTimeout();
+
       log.info("Starting JGroups Channel");
 
       initChannelAndRPCDispatcher();

Modified: trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -9,7 +9,7 @@
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.interceptors.InterceptorChain;
 import org.infinispan.notifications.cachelistener.CacheNotifier;
-import org.infinispan.remoting.rpc.CacheRpcManager;
+import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -39,11 +39,11 @@
    private InvocationContextContainer icc;
    private InterceptorChain invoker;
    private CacheNotifier notifier;
-   private CacheRpcManager rpcManager;
+   private RpcManager rpcManager;
 
 
    @Inject
-   public void initialize(CommandsFactory commandsFactory, CacheRpcManager rpcManager, Configuration configuration,
+   public void initialize(CommandsFactory commandsFactory, RpcManager rpcManager, Configuration configuration,
                           InvocationContextContainer icc, InterceptorChain invoker, CacheNotifier notifier) {
       this.commandsFactory = commandsFactory;
       this.rpcManager = rpcManager;
@@ -65,8 +65,9 @@
    /**
     * Creates and register a {@link org.infinispan.transaction.xa.RemoteTransaction} based on the supplied params.
     * Returns the created transaction.
+    *
     * @throws IllegalStateException if an attempt to create a {@link org.infinispan.transaction.xa.RemoteTransaction}
-    * for an already registered id is made.
+    *                               for an already registered id is made.
     */
    public RemoteTransaction createRemoteTransaction(GlobalTransaction globalTx, WriteCommand[] modifications) {
       RemoteTransaction remoteTransaction = new RemoteTransaction(modifications, globalTx);
@@ -89,7 +90,7 @@
    public TransactionXaAdapter getOrCreateXaAdapter(Transaction transaction, InvocationContext ctx) {
       TransactionXaAdapter current = localTransactions.get(transaction);
       if (current == null) {
-         Address localAddress = rpcManager != null ? rpcManager.getLocalAddress() : null;
+         Address localAddress = rpcManager != null ? rpcManager.getTransport().getAddress() : null;
          GlobalTransaction tx = localAddress == null ? new GlobalTransaction(false) : new GlobalTransaction(localAddress, false);
          current = new TransactionXaAdapter(tx, icc, invoker, commandsFactory, configuration, this, transaction);
          localTransactions.put(transaction, current);
@@ -113,8 +114,8 @@
    }
 
    /**
-    * Removes the {@link org.infinispan.transaction.xa.RemoteTransaction} coresponding to the given tx. Returns true 
-    * if such an tx exists.
+    * Removes the {@link org.infinispan.transaction.xa.RemoteTransaction} coresponding to the given tx. Returns true if
+    * such an tx exists.
     */
    public boolean removeRemoteTransaction(GlobalTransaction txId) {
       boolean existed = remoteTransactions.remove(txId) != null;

Modified: trunk/core/src/main/java/org/infinispan/util/ReflectionUtil.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/ReflectionUtil.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/main/java/org/infinispan/util/ReflectionUtil.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -66,7 +66,12 @@
     */
    private static void inspectRecursively(Class c, List<Method> s, Class<? extends Annotation> annotationType) {
       // Superclass first
-      if (!c.equals(Object.class)) inspectRecursively(c.getSuperclass(), s, annotationType);
+      if (!c.equals(Object.class)) {
+         if (!c.isInterface()) {
+            inspectRecursively(c.getSuperclass(), s, annotationType);
+            for (Class ifc : c.getInterfaces()) inspectRecursively(ifc, s, annotationType);
+         }
+      }
 
       for (Method m : c.getDeclaredMethods()) {
          // don't bother if this method has already been overridden by a subclass

Modified: trunk/core/src/test/java/org/infinispan/jmx/JmxStatsFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/jmx/JmxStatsFunctionalTest.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/test/java/org/infinispan/jmx/JmxStatsFunctionalTest.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -37,7 +37,7 @@
    /**
     * Create a local cache, two replicated caches and see that everithing is correctly registered.
     */
-   public void testDefaultDomanin() {
+   public void testDefaultDomain() {
       assert !existsDomains("infinispan");
       GlobalConfiguration globalConfiguration = GlobalConfiguration.getClusteredDefault();
       globalConfiguration.setExposeGlobalJmxStatistics(true);
@@ -58,15 +58,17 @@
       cm.getCache("remote2");
 
       assert existsObject("infinispan:cache-name=local_cache(local),jmx-resource=CacheMgmtInterceptor");
-      assert existsObject("infinispan:cache-name=[global],jmx-resource=RpcManager");
+      assert existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=RpcManager");
       assert existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=CacheMgmtInterceptor");
+      assert existsObject("infinispan:cache-name=remote2(invalidation_async),jmx-resource=RpcManager");
       assert existsObject("infinispan:cache-name=remote2(invalidation_async),jmx-resource=CacheMgmtInterceptor");
 
       TestingUtil.killCacheManagers(cm);
 
       assert !existsObject("infinispan:cache-name=local_cache(local),jmx-resource=CacheMgmtInterceptor");
-      assert !existsObject("infinispan:cache-name=[global],jmx-resource=RpcManager");
+      assert !existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=RpcManager");
       assert !existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=CacheMgmtInterceptor");
+      assert !existsObject("infinispan:cache-name=remote2(invalidation_async),jmx-resource=RpcManager");
       assert !existsObject("infinispan:cache-name=remote2(invalidation_async),jmx-resource=CacheMgmtInterceptor");
    }
 
@@ -105,7 +107,7 @@
       cm.getCache("remote1");
 
       assert !existsObject("infinispan:cache-name=local_cache(local),jmx-resource=CacheMgmtInterceptor");
-      assert existsObject("infinispan:cache-name=[global],jmx-resource=RpcManager");
+      assert existsObject("infinispan:cache-name=[global],jmx-resource=CacheManager");
       assert !existsObject("infinispan:cache-name=remote1(repl_sync),jmx-resource=CacheMgmtInterceptor");
    }
 

Modified: trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -1,9 +1,7 @@
 package org.infinispan.jmx;
 
 import org.easymock.EasyMock;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.*;
 import org.infinispan.Cache;
 import org.infinispan.config.Configuration;
 import org.infinispan.config.GlobalConfiguration;
@@ -20,7 +18,6 @@
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -59,16 +56,16 @@
 
       Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
       config.setExposeJmxStatistics(true);
-      defineCacheOnAllManagers("repl_sync_cache", config);
-      cache1 = manager(0).getCache("repl_sync_cache");
-      cache2 = manager(1).getCache("repl_sync_cache");
+      String cachename = "repl_sync_cache";
+      defineCacheOnAllManagers(cachename, config);
+      cache1 = manager(0).getCache(cachename);
+      cache2 = manager(1).getCache(cachename);
       mBeanServer = PerThreadMBeanServerLookup.getThreadMBeanServer();
-      rpcManager1 = new ObjectName("RpcManagerMBeanTest:cache-name=[global],jmx-resource=RpcManager");
-      rpcManager2 = new ObjectName("RpcManagerMBeanTest2:cache-name=[global],jmx-resource=RpcManager");
+      rpcManager1 = new ObjectName("RpcManagerMBeanTest:cache-name=" + cachename + "(repl_sync),jmx-resource=RpcManager");
+      rpcManager2 = new ObjectName("RpcManagerMBeanTest2:cache-name=" + cachename + "(repl_sync),jmx-resource=RpcManager");
    }
 
    public void testEnableJmxStats() throws Exception {
-
       assert mBeanServer.isRegistered(rpcManager1);
       assert mBeanServer.isRegistered(rpcManager2);
 
@@ -87,7 +84,7 @@
 
       cache1.put("key", "value2");
       assert cache2.get("key").equals("value2");
-      assert mBeanServer.getAttribute(rpcManager1, "ReplicationCount").equals("1");
+      assert mBeanServer.getAttribute(rpcManager1, "ReplicationCount").equals("1") : "Expected 1, was " + mBeanServer.getAttribute(rpcManager1, "ReplicationCount");
       assert mBeanServer.getAttribute(rpcManager1, "ReplicationFailures").equals("0");
       mBeanServer.getAttribute(rpcManager1, "ReplicationCount").equals("N/A");
 
@@ -110,7 +107,7 @@
       cache1.put("a3", "b3");
       cache1.put("a4", "b4");
       assert mBeanServer.getAttribute(rpcManager1, "SuccessRatio").equals("100%");
-      RpcManagerImpl rpcManager = (RpcManagerImpl) TestingUtil.extractGlobalComponent(manager(0), RpcManager.class);
+      RpcManagerImpl rpcManager = (RpcManagerImpl) TestingUtil.extractComponent(cache1, RpcManager.class);
       Transport originalTransport = rpcManager.getTransport();
 
       try {

Modified: trunk/core/src/test/java/org/infinispan/manager/CacheManagerComponentRegistryTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/manager/CacheManagerComponentRegistryTest.java	2009-05-14 21:09:09 UTC (rev 307)
+++ trunk/core/src/test/java/org/infinispan/manager/CacheManagerComponentRegistryTest.java	2009-05-14 21:18:43 UTC (rev 308)
@@ -1,17 +1,17 @@
 package org.infinispan.manager;
 
 import org.infinispan.Cache;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
 import org.infinispan.config.Configuration;
 import org.infinispan.config.GlobalConfiguration;
 import org.infinispan.eviction.EvictionManager;
 import org.infinispan.eviction.EvictionStrategy;
 import org.infinispan.interceptors.BatchingInterceptor;
 import org.infinispan.interceptors.InterceptorChain;
-import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Transport;
 import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
 import org.infinispan.transaction.tm.DummyTransactionManager;
-import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -52,9 +52,9 @@
       assert TestingUtil.extractComponent(transactional, TransactionManager.class) instanceof DummyTransactionManager;
 
       // assert force-shared components
-      assert TestingUtil.extractComponent(c, RpcManager.class) != null;
-      assert TestingUtil.extractComponent(transactional, RpcManager.class) != null;
-      assert TestingUtil.extractComponent(c, RpcManager.class) == TestingUtil.extractComponent(transactional, RpcManager.class);
+      assert TestingUtil.extractComponent(c, Transport.class) != null;
+      assert TestingUtil.extractComponent(transactional, Transport.class) != null;
+      assert TestingUtil.extractComponent(c, Transport.class) == TestingUtil.extractComponent(transactional, Transport.class);
    }
 
    public void testForceUnsharedComponents() throws NamedCacheNotFoundException {




More information about the infinispan-commits mailing list