[infinispan-commits] Infinispan SVN: r714 - in trunk/core/src: main/java/org/infinispan/factories and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Aug 20 21:52:39 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-08-20 21:52:39 -0400 (Thu, 20 Aug 2009)
New Revision: 714

Modified:
   trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
   trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
   trunk/core/src/main/java/org/infinispan/factories/EntryFactoryImpl.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
   trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
Log:
More dist tests

Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java	2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java	2009-08-21 01:52:39 UTC (rev 714)
@@ -1,5 +1,8 @@
 package org.infinispan.distribution;
 
+import org.infinispan.commands.tx.CommitCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.tx.RollbackCommand;
 import org.infinispan.commands.write.WriteCommand;
 
 import java.util.Collection;
@@ -47,6 +50,12 @@
     */
    boolean logIfNeeded(WriteCommand command);
 
+   void logIfNeeded(PrepareCommand command);
+
+   void logIfNeeded(CommitCommand command);
+
+   void logIfNeeded(RollbackCommand command);
+
    /**
     * If logging is enabled, will log the commands and return true.  Otherwise, will just return false.
     *

Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java	2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java	2009-08-21 01:52:39 UTC (rev 714)
@@ -1,11 +1,17 @@
 package org.infinispan.distribution;
 
+import org.infinispan.commands.tx.CommitCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.tx.RollbackCommand;
 import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.transaction.xa.GlobalTransaction;
 
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -20,6 +26,7 @@
    volatile boolean enabled;
    final ReadWriteLock loggingLock = new ReentrantReadWriteLock();
    final BlockingQueue<WriteCommand> commandQueue = new LinkedBlockingQueue<WriteCommand>();
+   final Map<GlobalTransaction, PrepareCommand> uncommittedPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
 
    public void enable() {
       enabled = true;
@@ -42,31 +49,87 @@
    }
 
    public boolean logIfNeeded(WriteCommand command) {
-      loggingLock.readLock().lock();
-      try {
-         if (enabled) {
-            commandQueue.add(command);
-            return true;
-         } else {
-            return false;
+      if (enabled) {
+         loggingLock.readLock().lock();
+         try {
+            if (enabled) {
+               try {
+                  commandQueue.put(command);
+               } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+               }
+               return true;
+            }
+         } finally {
+            loggingLock.readLock().unlock();
          }
-      } finally {
-         loggingLock.readLock().unlock();
       }
+      return false;
    }
 
+   public void logIfNeeded(PrepareCommand command) {
+      if (enabled) {
+         loggingLock.readLock().lock();
+         try {
+            if (enabled) {
+               uncommittedPrepares.put(command.getGlobalTransaction(), command);
+            }
+         } finally {
+            loggingLock.readLock().unlock();
+         }
+      }
+   }
+
+   public void logIfNeeded(CommitCommand command) {
+      if (enabled) {
+         loggingLock.readLock().lock();
+         try {
+            if (enabled) {
+               PrepareCommand pc = uncommittedPrepares.remove(command.getGlobalTransaction());
+               for (WriteCommand wc : pc.getModifications())
+                  try {
+                     commandQueue.put(wc);
+                  } catch (InterruptedException e) {
+                     Thread.currentThread().interrupt();
+                  }
+            }
+         } finally {
+            loggingLock.readLock().unlock();
+         }
+      }
+   }
+
+   public void logIfNeeded(RollbackCommand command) {
+      if (enabled) {
+         loggingLock.readLock().lock();
+         try {
+            if (enabled) {
+               uncommittedPrepares.remove(command.getGlobalTransaction());
+            }
+         } finally {
+            loggingLock.readLock().unlock();
+         }
+      }
+   }
+
    public boolean logIfNeeded(Collection<WriteCommand> commands) {
-      loggingLock.readLock().lock();
-      try {
-         if (enabled) {
-            for (WriteCommand command : commands) commandQueue.add(command);
-            return true;
-         } else {
-            return false;
+      if (enabled) {
+         loggingLock.readLock().lock();
+         try {
+            if (enabled) {
+               for (WriteCommand command : commands)
+                  try {
+                     commandQueue.put(command);
+                  } catch (InterruptedException e) {
+                     Thread.currentThread().interrupt();
+                  }
+               return true;
+            }
+         } finally {
+            loggingLock.readLock().unlock();
          }
-      } finally {
-         loggingLock.readLock().unlock();
       }
+      return false;
    }
 
    public int size() {

Modified: trunk/core/src/main/java/org/infinispan/factories/EntryFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/EntryFactoryImpl.java	2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/factories/EntryFactoryImpl.java	2009-08-21 01:52:39 UTC (rev 714)
@@ -34,6 +34,7 @@
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.util.Util;
 import org.infinispan.util.concurrent.IsolationLevel;
 import org.infinispan.util.concurrent.TimeoutException;
 import org.infinispan.util.concurrent.locks.LockManager;
@@ -129,7 +130,7 @@
             if (mvccEntry != cacheEntry) mvccEntry = (MVCCEntry) cacheEntry;
             mvccEntry.setRemoved(false);
             mvccEntry.setValid(true);
-         }         
+         }
 
          return mvccEntry;
 
@@ -178,7 +179,7 @@
     * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was
     *         already held)
     * @throws InterruptedException if interrupted
-    * @throws org.infinispan.lock.TimeoutException
+    * @throws org.infinispan.util.concurrent.TimeoutException
     *                              if we are unable to acquire the lock after a specified timeout.
     */
    public final boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException {
@@ -195,7 +196,7 @@
             return true;
          } else {
             Object owner = lockManager.getOwner(key);
-            throw new TimeoutException("Unable to acquire lock on key [" + key + "] for requestor [" +
+            throw new TimeoutException("Unable to acquire lock after [" + Util.prettyPrintTime(getLockAcquisitionTimeout(ctx)) + "] on key [" + key + "] for requestor [" +
                   ctx.getLockOwner() + "]! Lock held by [" + owner + "]");
          }
       }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java	2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java	2009-08-21 01:52:39 UTC (rev 714)
@@ -1,14 +1,19 @@
 package org.infinispan.interceptors;
 
 import org.infinispan.commands.AbstractVisitor;
+import org.infinispan.commands.CommandsFactory;
 import org.infinispan.commands.VisitableCommand;
-import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.tx.CommitCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.tx.RollbackCommand;
+import org.infinispan.commands.write.ClearCommand;
 import org.infinispan.commands.write.DataWriteCommand;
 import org.infinispan.commands.write.PutKeyValueCommand;
 import org.infinispan.commands.write.PutMapCommand;
 import org.infinispan.commands.write.RemoveCommand;
 import org.infinispan.commands.write.ReplaceCommand;
 import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.TxInvocationContext;
 import org.infinispan.distribution.DistributionManager;
 import org.infinispan.factories.annotations.Inject;
 
@@ -48,6 +53,60 @@
       }
    }
 
+   @Override
+   public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand cmd) throws Throwable {
+      dm.getTransactionLogger().logIfNeeded(cmd);
+      return super.visitPrepareCommand(ctx, cmd);
+   }
+
+   @Override
+   public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand cmd) throws Throwable {
+      dm.getTransactionLogger().logIfNeeded(cmd);
+      return super.visitRollbackCommand(ctx, cmd);
+   }
+
+   @Override
+   public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand cmd) throws Throwable {
+      dm.getTransactionLogger().logIfNeeded(cmd);
+      return super.visitCommitCommand(ctx, cmd);
+   }
+
+   @Override
+   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
+      Object o = super.visitPutKeyValueCommand(ctx, command);
+      if (!ctx.isInTxScope() && command.isSuccessful()) dm.getTransactionLogger().logIfNeeded(command);
+      return o;
+   }
+
+   @Override
+   public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
+      Object o = super.visitRemoveCommand(ctx, command);
+      if (!ctx.isInTxScope() && command.isSuccessful()) dm.getTransactionLogger().logIfNeeded(command);
+      return o;
+   }
+
+   @Override
+   public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
+      Object o = super.visitReplaceCommand(ctx, command);
+      if (!ctx.isInTxScope() && command.isSuccessful()) dm.getTransactionLogger().logIfNeeded(command);
+      return o;
+   }
+
+   @Override
+   public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
+      Object o = super.visitClearCommand(ctx, command);
+      if (!ctx.isInTxScope() && command.isSuccessful()) dm.getTransactionLogger().logIfNeeded(command);
+      return o;
+   }
+
+   @Override
+   public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
+      Object o = super.visitPutMapCommand(ctx, command);
+      if (!ctx.isInTxScope() && command.isSuccessful()) dm.getTransactionLogger().logIfNeeded(command);
+      return o;
+   }
+
+
    class ReplayCommandVisitor extends AbstractVisitor {
       @Override
       public Object visitPutMapCommand(InvocationContext ignored, PutMapCommand command) {

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-08-21 01:52:39 UTC (rev 714)
@@ -1,6 +1,7 @@
 package org.infinispan.interceptors;
 
 import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.DataCommand;
 import org.infinispan.commands.control.LockControlCommand;
 import org.infinispan.commands.read.GetKeyValueCommand;
 import org.infinispan.commands.tx.CommitCommand;
@@ -31,6 +32,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -181,6 +183,7 @@
       if (ctx.isOriginLocal()) {
          List<Address> recipients = new ArrayList<Address>(ctx.getTransactionParticipants());
          rpcManager.invokeRemotely(recipients, command, configuration.isSyncCommitPhase(), true);
+         flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), false, null, configuration.isSyncCommitPhase());
       }
       return invokeNextInterceptor(ctx, command);
    }
@@ -196,6 +199,8 @@
          if (trace) log.trace("Multicasting PrepareCommand to recipients : " + recipients);
          // this method will return immediately if we're the only member (because exclude_self=true)
          rpcManager.invokeRemotely(recipients, command, sync);
+         if (command.isOnePhaseCommit())
+            flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), false, null, false);
       }
       return retVal;
    }
@@ -223,6 +228,33 @@
       return !ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && needReliableReturnValues;
    }
 
+   private Object[] getKeys(List<WriteCommand> mods) {
+      List<Object> l = new LinkedList<Object>();
+      for (WriteCommand m : mods) {
+         if (m instanceof DataCommand) {
+            l.add(((DataCommand) m).getKey());
+         } else if (m instanceof PutMapCommand) {
+            l.addAll(((PutMapCommand) m).getMap().keySet());
+         }
+      }
+      return l.toArray(new Object[l.size()]);
+   }
+
+   private NotifyingNotifiableFuture<Object> flushL1Cache(int numCallRecipients, Object[] keys, boolean useFuture, Object retval, boolean sync) {
+      if (isL1CacheEnabled && numCallRecipients > 0 && rpcManager.getTransport().getMembers().size() > numCallRecipients) {
+         if (trace) log.trace("Invalidating L1 caches");
+         InvalidateCommand ic = cf.buildInvalidateFromL1Command(keys);
+         if (useFuture) {
+            NotifyingNotifiableFuture<Object> future = new AggregatingNotifyingFutureImpl(retval, 2);
+            rpcManager.broadcastRpcCommandInFuture(ic, future);
+            return future;
+         } else {
+            rpcManager.broadcastRpcCommand(ic, sync);
+         }
+      }
+      return null;
+   }
+
    /**
     * If we are within one transaction we won't do any replication as replication would only be performed at commit
     * time. If the operation didn't originate locally we won't do any replication either.
@@ -245,17 +277,7 @@
                if (trace) log.trace("Invoking command {0} on hosts {1}", command, rec);
                boolean useFuture = ctx.isUseFutureReturnType();
                boolean sync = isSynchronous(ctx);
-               NotifyingNotifiableFuture<Object> future = null;
-               // if L1 caching is used make sure we broadcast an invalidate message
-               if (isL1CacheEnabled && rec != null && rpcManager.getTransport().getMembers().size() > rec.size()) {
-                  InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
-                  if (useFuture) {
-                     future = new AggregatingNotifyingFutureImpl(returnValue, 2);
-                     rpcManager.broadcastRpcCommandInFuture(ic, future);
-                  } else {
-                     rpcManager.broadcastRpcCommand(ic, sync);
-                  }
-               }
+               NotifyingNotifiableFuture<Object> future = flushL1Cache(rec == null ? 0 : rec.size(), recipientGenerator.getKeys(), useFuture, returnValue, sync);
 
                if (useFuture) {
                   if (future == null) future = new NotifyingFutureImpl(returnValue);

Modified: trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java	2009-08-21 01:52:39 UTC (rev 714)
@@ -7,7 +7,6 @@
 import org.infinispan.commands.tx.PrepareCommand;
 import org.infinispan.commands.tx.RollbackCommand;
 import org.infinispan.commands.write.ClearCommand;
-import org.infinispan.commands.write.EvictCommand;
 import org.infinispan.commands.write.InvalidateCommand;
 import org.infinispan.commands.write.PutKeyValueCommand;
 import org.infinispan.commands.write.PutMapCommand;
@@ -148,11 +147,6 @@
    }
 
    @Override
-   public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable {
-      return invokeNextInterceptor(ctx, command);
-   }
-
-   @Override
    public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand invalidateCommand) throws Throwable {
       return enlistWriteAndInvokeNext(ctx, invalidateCommand);
    }

Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-08-21 01:52:39 UTC (rev 714)
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 @Test(groups = "functional", testName = "distribution.BaseDistFunctionalTest")
@@ -45,6 +46,8 @@
          configuration.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
       }
       if (tx) configuration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      configuration.setSyncReplTimeout(60, TimeUnit.SECONDS);
+      configuration.setLockAcquisitionTimeout(45, TimeUnit.SECONDS);
       caches = createClusteredCaches(4, cacheName, configuration);
 
       reorderBasedOnCHPositions();

Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java	2009-08-20 23:21:30 UTC (rev 713)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java	2009-08-21 01:52:39 UTC (rev 714)
@@ -96,7 +96,6 @@
     * More complex - init some state.  Start a new transaction, and midway trigger a rehash.  Then complete transaction
     * and test results.
     */
-   @Test(enabled = false, description = "Enable after releasing Beta1")
    public void testTransactional() throws Exception {
       final List<MagicKey> keys = init();
       final CountDownLatch l = new CountDownLatch(1);



More information about the infinispan-commits mailing list