[infinispan-commits] Infinispan SVN: r1311 - trunk/core/src/main/java/org/infinispan/interceptors.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Dec 18 06:37:14 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-12-18 06:37:14 -0500 (Fri, 18 Dec 2009)
New Revision: 1311

Modified:
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
Log:
Parellelize invalidation messages and write/prepare/commit messages

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-12-18 01:11:06 UTC (rev 1310)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-12-18 11:37:14 UTC (rev 1311)
@@ -197,9 +197,10 @@
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
       if (ctx.isOriginLocal() && ctx.hasModifications()) {
          List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
+         NotifyingNotifiableFuture<Object> f = flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), null,
+                                                            configuration.isSyncCommitPhase());
          rpcManager.invokeRemotely(recipients, command, configuration.isSyncCommitPhase(), true);
-         List<WriteCommand> mods = ctx.getModifications();
-         flushL1Cache(recipients.size(), getKeys(mods), false, null, configuration.isSyncCommitPhase());
+         if (f != null) f.get();
       }
       return invokeNextInterceptor(ctx, command);
    }
@@ -212,10 +213,11 @@
 
       if (ctx.isOriginLocal() && ctx.hasModifications()) {
          List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
+         NotifyingNotifiableFuture<Object> f = null;
+         if (command.isOnePhaseCommit()) f = flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), null, sync);
          // this method will return immediately if we're the only member (because exclude_self=true)
          rpcManager.invokeRemotely(recipients, command, sync);
-         if (command.isOnePhaseCommit())
-            flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), false, null, false);
+         if (f != null) f.get();
       }
       return retVal;
    }
@@ -252,17 +254,17 @@
       return l.toArray(new Object[l.size()]);
    }
 
-   private NotifyingNotifiableFuture<Object> flushL1Cache(int numCallRecipients, Object[] keys, boolean useFuture, Object retval, boolean sync) {
+   private NotifyingNotifiableFuture<Object> flushL1Cache(int numCallRecipients, Object[] keys, Object retval, boolean sync) {
       if (isL1CacheEnabled && numCallRecipients > 0 && rpcManager.getTransport().getMembers().size() > numCallRecipients) {
          if (trace) log.trace("Invalidating L1 caches");
          InvalidateCommand ic = cf.buildInvalidateFromL1Command(false, keys);
-         if (useFuture) {
+//         if (useFuture) {
             NotifyingNotifiableFuture<Object> future = new AggregatingNotifyingFutureImpl(retval, 2);
             rpcManager.broadcastRpcCommandInFuture(ic, future);
             return future;
-         } else {
-            rpcManager.broadcastRpcCommand(ic, sync);
-         }
+//         } else {
+//            rpcManager.broadcastRpcCommand(ic, sync);
+//         }
       }
       return null;
    }
@@ -293,14 +295,14 @@
                if (trace) log.trace("Invoking command {0} on hosts {1}", command, rec);
                boolean useFuture = ctx.isUseFutureReturnType();
                boolean sync = isSynchronous(ctx);
-               NotifyingNotifiableFuture<Object> future = flushL1Cache(rec == null ? 0 : rec.size(), recipientGenerator.getKeys(), useFuture, returnValue, sync);
-
+               NotifyingNotifiableFuture<Object> future = flushL1Cache(rec == null ? 0 : rec.size(), recipientGenerator.getKeys(), returnValue, sync);
                if (useFuture) {
                   if (future == null) future = new NotifyingFutureImpl(returnValue);
                   rpcManager.invokeRemotelyInFuture(rec, command, future);
                   return future;
-               } else {
+               } else {                  
                   rpcManager.invokeRemotely(rec, command, sync);
+                  if (future != null) future.get(); // wait for the inval command to complete
                }
             }
          } else {



More information about the infinispan-commits mailing list