[infinispan-commits] Infinispan SVN: r2290 - in branches/4.1.x/core/src: main/java/org/infinispan/loaders/modifications and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Aug 31 18:20:03 EDT 2010


Author: sannegrinovero
Date: 2010-08-31 18:20:02 -0400 (Tue, 31 Aug 2010)
New Revision: 2290

Added:
   branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/ModificationsList.java
   branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/BatchAsyncCacheStoreTest.java
Modified:
   branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
   branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStoreConfig.java
   branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Commit.java
   branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Modification.java
   branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java
   branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java
   branches/4.1.x/core/src/test/java/org/infinispan/loaders/UnnnecessaryLoadingTest.java
   branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
   branches/4.1.x/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java
   branches/4.1.x/core/src/test/java/org/infinispan/test/TestingUtil.java
Log:
[ISPN-618] (AsyncStore fails to save all values when using transactions or batch operations) and [ISPN-619] (Aggregate changes from multiple transactions in AsyncStore) - branch 4.1

Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -8,10 +8,8 @@
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.loaders.modifications.Clear;
-import org.infinispan.loaders.modifications.Commit;
 import org.infinispan.loaders.modifications.Modification;
-import org.infinispan.loaders.modifications.Prepare;
-import org.infinispan.loaders.modifications.PurgeExpired;
+import org.infinispan.loaders.modifications.ModificationsList;
 import org.infinispan.loaders.modifications.Remove;
 import org.infinispan.loaders.modifications.Store;
 import org.infinispan.marshall.StreamingMarshaller;
@@ -20,8 +18,6 @@
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -30,12 +26,12 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -61,6 +57,7 @@
  *
  * @author Manik Surtani
  * @author Galder Zamarreño
+ * @author Sanne Grinovero
  * @since 4.0
  */
 public class AsyncStore extends AbstractDelegatingStore {
@@ -68,24 +65,31 @@
    private static final boolean trace = log.isTraceEnabled();
    private static final AtomicInteger threadId = new AtomicInteger(0);
    private final AtomicBoolean stopped = new AtomicBoolean(true);
+   
    private final AsyncStoreConfig asyncStoreConfig;
-
+   private Map<GlobalTransaction, List<? extends Modification>> transactions;
+   
    /**
-    * Approximate count of number of modified keys. At points, it could contain negative values.
+    * This is used as marker to shutdown the AsyncStoreCoordinator
     */
-   private final AtomicInteger count = new AtomicInteger(0);
-   private final ReentrantLock lock = new ReentrantLock();
-   private final Condition notEmpty = lock.newCondition();
-
+   private static final Modification QUIT_SIGNAL = new Clear();
+   
+   /**
+    * clear() is performed in sync by the one thread of storeCoordinator, while blocking all
+    * other threads interacting with the decorated store.
+    */
+   private final ReadWriteLock clearAllLock = new ReentrantReadWriteLock();
+   private final Lock clearAllReadLock = clearAllLock.readLock();
+   private final Lock clearAllWriteLock = clearAllLock.writeLock();
+   private final Lock stateMapLock = new ReentrantLock();
+   
    ExecutorService executor;
-   private List<Future> processorFutures;
-   private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
-   private final Lock read = mapLock.readLock();
-   private final Lock write = mapLock.writeLock();
    private int concurrencyLevel;
-   @GuardedBy("mapLock")
+   @GuardedBy("stateMapLock")
    protected ConcurrentMap<Object, Modification> state;
    private ReleaseAllLockContainer lockContainer;
+   private final LinkedBlockingQueue<Modification> changesDeque = new LinkedBlockingQueue<Modification>();
+   public volatile boolean lastAsyncProcessorShutsDownExecutor = false;
 
    public AsyncStore(CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
       super(delegate);
@@ -97,77 +101,97 @@
       super.init(config, cache, m);
       concurrencyLevel = cache == null || cache.getConfiguration() == null ? 16 : cache.getConfiguration().getConcurrencyLevel();
       lockContainer = new ReleaseAllLockContainer(concurrencyLevel);
+      transactions = new ConcurrentHashMap<GlobalTransaction, List<? extends Modification>>(64, 0.75f, concurrencyLevel);
    }
 
    @Override
    public void store(InternalCacheEntry ed) {
-      enqueue(ed.getKey(), new Store(ed));
+      enqueue(new Store(ed));
    }
 
    @Override
    public boolean remove(Object key) {
-      enqueue(key, new Remove(key));
+      enqueue(new Remove(key));
       return true;
    }
 
    @Override
    public void clear() {
       Clear clear = new Clear();
-      enqueue(clear, clear);
+      checkNotStopped(); //check we can change the changesDeque
+      changesDeque.clear();
+      enqueue(clear);
    }
 
    @Override
-   public void purgeExpired() {
-      PurgeExpired purge = new PurgeExpired();
-      enqueue(purge, purge);
+   public void prepare(List<? extends Modification> mods, GlobalTransaction tx, boolean isOnePhase) throws CacheLoaderException {
+      if (isOnePhase) {
+         enqueueModificationsList(mods);
+      } else {
+         transactions.put(tx, mods);
+      }
    }
-
+   
    @Override
-   public void prepare(List<? extends Modification> list, GlobalTransaction tx, boolean isOnePhase) {
-      Prepare prepare = new Prepare(list, tx, isOnePhase);
-      enqueue(prepare, prepare);
+   public void rollback(GlobalTransaction tx) {
+      transactions.remove(tx);
    }
 
    @Override
    public void commit(GlobalTransaction tx) throws CacheLoaderException {
-      Commit commit = new Commit(tx);
-      enqueue(commit, commit);
+      List<? extends Modification> list = transactions.remove(tx);
+      enqueueModificationsList(list);
    }
+   
+   protected void enqueueModificationsList(List<? extends Modification> mods) throws CacheLoaderException {
+      if (mods != null && !mods.isEmpty()) {
+         enqueue(new ModificationsList(mods));
+      }
+   }
 
    @Override
    public void start() throws CacheLoaderException {
       state = newStateMap();
       log.info("Async cache loader starting {0}", this);
       stopped.set(false);
+      lastAsyncProcessorShutsDownExecutor = false;
       super.start();
       int poolSize = asyncStoreConfig.getThreadPoolSize();
-      executor = Executors.newFixedThreadPool(poolSize, new ThreadFactory() {
-         public Thread newThread(Runnable r) {
-            Thread t = new Thread(r, "CoalescedAsyncStore-" + threadId.getAndIncrement());
-            t.setDaemon(true);
-            return t;
-         }
-      });
-      processorFutures = new ArrayList<Future>(poolSize);
-      for (int i = 0; i < poolSize; i++) processorFutures.add(executor.submit(createAsyncProcessor()));
+      executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
+               // note the use of poolSize+1 as maximum workingQueue together with DiscardPolicy:
+               // this way when a new AsyncProcessor is started unnecessarily we discard it
+               // before it takes locks to perform no work
+               // this way we save memory from the executor queue, CPU, and also avoid
+               // any possible RejectedExecutionException.
+               new LinkedBlockingQueue<Runnable>(poolSize + 1),
+               new ThreadFactory() {
+                  public Thread newThread(Runnable r) {
+                     Thread t = new Thread(r, "CoalescedAsyncStore-" + threadId.getAndIncrement());
+                     t.setDaemon(true);
+                     return t;
+                  }
+               },
+               new ThreadPoolExecutor.DiscardPolicy()
+         );
+      startStoreCoordinator();
    }
 
+   private void startStoreCoordinator() {
+      ExecutorService storeCoordinator = Executors.newFixedThreadPool(1);
+      storeCoordinator.execute( new AsyncStoreCoordinator() );
+      storeCoordinator.shutdown();
+   }
+
    @Override
    public void stop() throws CacheLoaderException {
       stopped.set(true);
-      if (executor != null) {
-         for (Future f : processorFutures) f.cancel(true);
-         executor.shutdown();
-         try {
-            boolean terminated = executor.isTerminated();
-            while (!terminated) {
-               terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
-            }
-         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-         }
+      try {
+         changesDeque.put(QUIT_SIGNAL);
+         executor.awaitTermination(asyncStoreConfig.getShutdownTimeout(), TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+         log.error("Interrupted or timeout while waiting for AsyncStore worker threads to push all state to the decorated store", e);
+         Thread.currentThread().interrupt();
       }
-      executor = null;
       super.stop();
    }
 
@@ -182,81 +206,46 @@
             case REMOVE:
                super.remove(entry.getKey());
                break;
-            case CLEAR:
-               super.clear();
-               break;
-            case PURGE_EXPIRED:
-               super.purgeExpired();
-               break;
-            case PREPARE:
-               List<? extends Modification> coalesced = coalesceModificationList(((Prepare) mod).getList());
-               super.prepare(coalesced, ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase());
-               break;
-            case COMMIT:
-               super.commit(((Commit) mod).getTx());
-               break;
+            default:
+               throw new IllegalArgumentException("Unexpected modification type " + mod.getType());
          }
       }
    }
-
-   protected Runnable createAsyncProcessor() {
-      return new AsyncProcessor();
+   
+   protected boolean applyClear() {
+      try {
+         super.clear();
+         return true;
+      } catch (CacheLoaderException e) {
+         log.error("Error performing clear in AsyncStore", e);
+         return false;
+      }
    }
-
-   private List<? extends Modification> coalesceModificationList(List<? extends Modification> mods) {
-      Map<Object, Modification> keyMods = new HashMap<Object, Modification>();
-      List<Modification> coalesced = new ArrayList<Modification>();
-      for (Modification mod : mods) {
-         switch (mod.getType()) {
-            case STORE:
-               keyMods.put(((Store) mod).getStoredEntry().getKey(), mod);
-               break;
-            case CLEAR:
-               keyMods.clear(); // remove all pending key modifications
-               coalesced.add(mod); // add a clear so that future put/removes do not need to do anything
-               break;
-            case REMOVE:
-               if (!coalesced.isEmpty() && keyMods.containsKey(((Remove) mod).getKey())) {
-                  keyMods.remove(((Remove) mod).getKey()); // clear, p(k), r(k) sequence should result in no-op for k
-               } else if (coalesced.isEmpty()) {
-                  keyMods.put(((Remove) mod).getKey(), mod);
-               }
-               break;
-            default:
-               throw new IllegalArgumentException("Unknown modification type " + mod.getType());
-         }
+   
+   protected void delegatePurgeExpired() {
+      try {
+         super.purgeExpired();
+      } catch (CacheLoaderException e) {
+         log.error("Error performing PurgeExpired in AsyncStore", e);
       }
-      coalesced.addAll(keyMods.values());
-      return coalesced;
    }
 
-   private void enqueue(Object key, Modification mod) {
+   private void enqueue(Modification mod) {
       try {
-         if (stopped.get()) {
-            throw new CacheException("AsyncStore stopped; no longer accepting more entries.");
-         }
+         checkNotStopped();
          if (trace) log.trace("Enqueuing modification {0}", mod);
-         Modification prev = null;
-         int c = -1;
-         boolean unlock = false;
-         try {
-            acquireLock(read);
-            unlock = true;
-            prev = state.put(key, mod); // put the key's latest state in updates
-         } finally {
-            if (unlock) read.unlock();
-         }
-         /* Increment can happen outside the lock cos worst case scenario a false not empty would 
-          * be sent if the swap and decrement happened between the put and the increment. In this 
-          * case, the corresponding processor would see the map empty and would wait again. This 
-          * means that we're allowing count to potentially go negative but that's not a problem. */
-         if (prev == null) c = count.getAndIncrement();
-         if (c == 0) signalNotEmpty();
+         changesDeque.add(mod);
       } catch (Exception e) {
          throw new CacheException("Unable to enqueue asynchronous task", e);
       }
    }
 
+   private void checkNotStopped() {
+      if (stopped.get()) {
+         throw new CacheException("AsyncStore stopped; no longer accepting more entries.");
+      }
+   }
+
    private void acquireLock(Lock lock) {
       try {
          if (!lock.tryLock(asyncStoreConfig.getFlushLockTimeout(), TimeUnit.MILLISECONDS))
@@ -267,105 +256,85 @@
       }
    }
 
-   private void signalNotEmpty() {
-      lock.lock();
-      try {
-         notEmpty.signal();
-      } finally {
-         lock.unlock();
-      }
-   }
-
-   private void awaitNotEmptyOrStopped() throws InterruptedException {
-      lock.lockInterruptibly();
-      try {
-         try {
-            while (count.get() == 0) {
-               if (stopped.get()) {
-                  notEmpty.signal();
-                  return;
-               }
-               notEmpty.await();
-            }
-         } catch (InterruptedException ie) {
-            notEmpty.signal(); // propagate to a non-interrupted thread
-            throw ie;
-         }
-      } finally {
-         lock.unlock();
-      }
-   }
-
-   private int decrementAndGet(int delta) {
-      for (; ;) {
-         int current = count.get();
-         int next = current - delta;
-         if (count.compareAndSet(current, next)) return next;
-      }
-   }
-
    /**
     * Processes modifications taking the latest updates from a state map.
     */
    class AsyncProcessor implements Runnable {
-      private ConcurrentMap<Object, Modification> swap = newStateMap();
       private final Set<Object> lockedKeys = new HashSet<Object>();
+      boolean runAgainAfterWaiting = false;
 
       public void run() {
-         while (!Thread.interrupted() && !stopped.get()) {
+         clearAllReadLock.lock();
+         try {
+            innerRun();
+         } catch (Throwable t) {
+            runAgainAfterWaiting = false;
+            log.error("Unexpected error", t);
+         } finally {
+            clearAllReadLock.unlock();
+         }
+         if (runAgainAfterWaiting) {
             try {
-               run0();
+               Thread.sleep(10);
+            } catch (InterruptedException e) {
+               // interrupted: cut the sleep short, but still make sure to store all data
             }
-            catch (InterruptedException e) {
-               break;
-            }
+            ensureMoreWorkIsHandled();
          }
-
-         try {
-            if (trace) log.trace("Process remaining batch {0}", swap.size());
-            put(swap);
-            if (trace) log.trace("Process remaining queued {0}", state.size());
-            while (!state.isEmpty()) run0();
-         } catch (InterruptedException e) {
-            if (trace) log.trace("Remaining interrupted");
-         }
       }
-
-      void run0() throws InterruptedException {
+      
+      private void innerRun() {
+         final ConcurrentMap<Object, Modification> swap;
          if (trace) log.trace("Checking for modifications");
-         boolean unlock = false;
-
          try {
-            acquireLock(write);
-            unlock = true;
-            swap = state;
-            state = newStateMap();
+            acquireLock(stateMapLock);
+            try {
+               swap = state;
+               state = newStateMap();
 
-            // This needs doing within the WL section, because if a key is in use, we need to put it back in the state
-            // map for later processing and we don't wanna do it in such way that we override a newer value that might 
-            // have been enqueued by a user thread.
-            for (Object key : swap.keySet()) {
-               boolean acquired = lockContainer.acquireLock(key, 0, TimeUnit.NANOSECONDS) != null;
-               if (trace) log.trace("Lock for key {0} was acquired={1}", key, acquired);
-               if (!acquired) {
-                  Modification prev = swap.remove(key);
-                  state.put(key, prev);
-               } else {
-                  lockedKeys.add(key);
+               // This needs to be done within the stateMapLock section, because if a key is in use,
+               // we need to put it back in the state
+               // map for later processing and we don't wanna do it in such way that we override a
+               // newer value that might
+               // have been taken already for processing by another instance of this same code.
+               // AsyncStoreCoordinator doesn't need to acquire the same lock as values put by it
+               // will never be overwritten (putIfAbsent below)
+               for (Object key : swap.keySet()) {
+                  if (trace) log.trace("Going to process mod key: {0}", key);
+                  boolean acquired = false;
+                  try {
+                     acquired = lockContainer.acquireLock(key, 0, TimeUnit.NANOSECONDS) != null;
+                  } catch (InterruptedException e) {
+                     log.error("interrupted on acquireLock {0}, 0 nanoseconds!", e);
+                     Thread.currentThread().interrupt();
+                     return;
+                  }
+                  if (trace)
+                     log.trace("Lock for key {0} was acquired={1}", key, acquired);
+                  if (!acquired) {
+                     Modification prev = swap.remove(key);
+                     Modification didPut = state.putIfAbsent(key, prev); // don't overwrite more recently put work
+                     if (didPut == null) {
+                        // otherwise a new job is being spawned by the arbiter, so no need to create
+                        // a new worker
+                        runAgainAfterWaiting = true;
+                     }
+                  } else {
+                     lockedKeys.add(key);
+                  }
                }
+            } finally {
+               stateMapLock.unlock();
             }
-         } finally {
-            if (unlock) write.unlock();
-         }
 
-         try {
-            int size = swap.size();
             if (swap.isEmpty()) {
-               awaitNotEmptyOrStopped();
+               if (lastAsyncProcessorShutsDownExecutor && runAgainAfterWaiting == false) {
+                  executor.shutdown();
+               }
+               return;
             } else {
-               decrementAndGet(size);
-
-               if (trace) log.trace("Apply {0} modifications", size);
+               if (trace)
+                  log.trace("Apply {0} modifications", swap.size());
                int maxRetries = 3;
                int attemptNumber = 0;
                boolean successful;
@@ -386,22 +355,12 @@
          }
       }
 
-      boolean put(ConcurrentMap<Object, Modification> mods) throws InterruptedException {
+      boolean put(ConcurrentMap<Object, Modification> mods) {
          try {
             AsyncStore.this.applyModificationsSync(mods);
             return true;
          } catch (Exception e) {
-            boolean isDebug = log.isDebugEnabled();
-            if (isDebug) log.debug("Failed to process async modifications", e);
-            Throwable cause = e;
-            while (cause != null) {
-                if (cause instanceof InterruptedException) {
-                    // 3rd party code may have cleared the thread interrupt status
-                    if (isDebug) log.debug("Rethrowing InterruptedException");
-                    throw (InterruptedException) cause;
-                }
-                cause = cause.getCause();
-            }
+            if (log.isDebugEnabled()) log.debug("Failed to process async modifications", e);
             return false;
          }
       }
@@ -410,7 +369,7 @@
    private ConcurrentMap<Object, Modification> newStateMap() {
       return new ConcurrentHashMap<Object, Modification>(64, 0.75f, concurrencyLevel);
    }
-
+   
    private static class ReleaseAllLockContainer extends ReentrantPerEntryLockContainer {
       private ReleaseAllLockContainer(int concurrencyLevel) {
          super(concurrencyLevel);
@@ -423,4 +382,112 @@
          }
       }
    }
+
+   private void ensureMoreWorkIsHandled() {
+           executor.execute(new AsyncProcessor());
+   }
+   
+   private class AsyncStoreCoordinator implements Runnable {
+
+      @Override
+      public void run() {
+         while (true) {
+            try {
+               Modification take = changesDeque.take();
+               if (take == QUIT_SIGNAL) {
+                  lastAsyncProcessorShutsDownExecutor = true;
+                  ensureMoreWorkIsHandled();
+                  return;
+               }
+               else {
+                  handleSafely(take);
+               }
+            } catch (InterruptedException e) {
+               log.error("AsyncStoreCoordinator interrupted", e);
+               return;
+            } catch (Throwable t) {
+               log.error("Unexpected error in AsyncStoreCoordinator thread. AsyncStore is dead!", t);
+            }
+         }
+      }
+
+      private void handleSafely(Modification mod) {
+         try {
+            if (trace) log.trace("taking from modification queue: {0}", mod);
+            handle(mod, false);
+         } catch (Exception e) {
+            log.error("Error while handling Modification in AsyncStore", e);
+         }
+      }
+
+      private void handle(Modification mod, boolean nested) {
+         boolean asyncProcessorNeeded = false;
+         switch (mod.getType()) {
+            case STORE:
+               Store store = (Store) mod;
+               stateMapLock.lock();
+               state.put(store.getStoredEntry().getKey(), store);
+               stateMapLock.unlock();
+               asyncProcessorNeeded = true;
+               break;
+            case REMOVE:
+               Remove remove = (Remove) mod;
+               stateMapLock.lock();
+               state.put(remove.getKey(), remove);
+               stateMapLock.unlock();
+               asyncProcessorNeeded = true;
+               break;
+            case CLEAR:
+               performClear();
+               break;
+            case PURGE_EXPIRED:
+               delegatePurgeExpired();
+               break;
+            case LIST:
+               applyModificationsList((ModificationsList) mod);
+               asyncProcessorNeeded = true;
+               break;
+            default:
+               throw new IllegalArgumentException("Unexpected modification type " + mod.getType());
+         }
+         if (asyncProcessorNeeded && !nested) {
+            // we know when it's possible for some work to be done, starting short-lived
+            // AsyncProcessor(s) simplifies shutdown process.
+             ensureMoreWorkIsHandled();
+         }
+      }
+
+      private void applyModificationsList(ModificationsList mod) {
+         for (Modification m : mod.getList()) {
+            handle(m, true);
+         }
+      }
+
+      private void performClear() {
+         state.clear(); // cancel any other scheduled changes
+         clearAllWriteLock.lock(); // ensure no other tasks concurrently working
+         try {
+            // to acquire clearAllWriteLock we might have had to wait for N AsyncProcessor to have finished
+            // (as they have to release all clearAllReadLock),
+            // so as they might have put back some work to the state map, clear the state map again inside the writeLock:
+            state.clear();
+            if (trace) log.trace("Performed clear operation");
+            int maxRetries = 3;
+            int attemptNumber = 0;
+            boolean successful = false;
+            do {
+               if (attemptNumber > 0 && log.isDebugEnabled())
+                  log.debug("Retrying clear() due to previous failure. {0} attempts left.", maxRetries - attemptNumber);
+               successful = applyClear();
+               attemptNumber++;
+            } while (!successful && attemptNumber <= maxRetries);
+            if (!successful) {
+               log.error("Clear() operation in async store could not be performed");
+            }
+         } finally {
+            clearAllWriteLock.unlock();
+         }
+      }
+
+   }
 }

Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStoreConfig.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStoreConfig.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStoreConfig.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -42,6 +42,11 @@
    @Dynamic
    protected Long flushLockTimeout = 5000L;
 
+   /** @configRef desc="Timeout to stop the cache store. When the store is stopped it's possible that some modifications still need to be applied;
+    *             you likely want to set a very large timeout to make sure to not lose data." */
+   @Dynamic
+   protected Long shutdownTimeout = 7200L;
+
    @XmlAttribute
    public Boolean isEnabled() {
       return enabled;
@@ -70,8 +75,18 @@
    public void setFlushLockTimeout(Long stateLockTimeout) {
       testImmutability("flushLockTimeout");
       this.flushLockTimeout = stateLockTimeout;
-   }   
+   }
 
+   @XmlAttribute
+   public Long getShutdownTimeout() {
+      return shutdownTimeout;
+   }
+
+   public void setShutdownTimeout(Long shutdownTimeout) {
+      testImmutability("shutdownTimeout");
+      this.shutdownTimeout = shutdownTimeout;
+   }
+
    @Override
    public AsyncStoreConfig clone() {
       try {

Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Commit.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Commit.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Commit.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -62,5 +62,10 @@
       Commit other = (Commit) obj;
       return tx.equals(other.tx);
    }
+   
+   @Override
+   public String toString() {
+      return "Commit: " + tx;
+   }
 
 }

Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Modification.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Modification.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Modification.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -8,7 +8,7 @@
  */
 public interface Modification {
    public static enum Type {
-      STORE, REMOVE, CLEAR, PURGE_EXPIRED, PREPARE, COMMIT
+      STORE, REMOVE, CLEAR, PURGE_EXPIRED, PREPARE, COMMIT, LIST
    }
 
    Type getType();

Added: branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/ModificationsList.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/ModificationsList.java	                        (rev 0)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/ModificationsList.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.loaders.modifications;
+
+import java.util.List;
+
+/**
+ * ModificationsList contains a {@code List<Modification>}
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class ModificationsList implements Modification {
+   
+   private final List<? extends Modification> list;
+
+   public ModificationsList(List<? extends Modification> list) {
+      this.list = list;
+   }
+
+   @Override
+   public Type getType() {
+      return Modification.Type.LIST;
+   }
+
+   public List<? extends Modification> getList() {
+      return list;
+   }
+
+   @Override
+   public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((list == null) ? 0 : list.hashCode());
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (this == obj)
+         return true;
+      if (obj == null)
+         return false;
+      if (getClass() != obj.getClass())
+         return false;
+      ModificationsList other = (ModificationsList) obj;
+      if (list == null) {
+         if (other.list != null)
+            return false;
+      } else if (!list.equals(other.list))
+         return false;
+      return true;
+   }
+   
+   @Override
+   public String toString() {
+      return "ModificationsList: [" + list + "]";
+   }
+
+}

Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -80,5 +80,16 @@
       result = 31 * result + (isOnePhase ? 1 : 0);
       return result;
    }
+   
+   /**
+    * Builds a textual form made of the transaction, the one-phase flag and
+    * the list of modifications, in that order.
+    */
+   @Override
+   public String toString() {
+      final String text = "Prepare:" + tx
+            + " isOnePhase:" + isOnePhase
+            + ";[" + list + "]";
+      return text;
+   }
 
 }

Modified: branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -3,7 +3,6 @@
 import java.io.IOException;
 import java.sql.SQLException;
 
-import org.apache.commons.math.stat.inference.TestUtils;
 import org.infinispan.config.CacheLoaderManagerConfig;
 import org.infinispan.config.Configuration;
 import org.infinispan.container.entries.InternalCacheEntry;
@@ -13,7 +12,6 @@
 import org.infinispan.test.SingleCacheManagerTest;
 import org.infinispan.test.TestingUtil;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
 /**
@@ -45,7 +43,7 @@
       return TestCacheManagerFactory.createCacheManager(config);
    }
 
-   @Test (timeOut = 10000)
+   @Test(timeOut = 10000)
    public void writeOnStorage() throws IOException, ClassNotFoundException, SQLException, InterruptedException {
       cache = cacheManager.getCache("AsyncStoreInMemory");
       cache.put("key1", "value");
@@ -60,11 +58,6 @@
       assert "value".equals(cache.get("key1"));
    }
    
-   @AfterClass
-   public void removeStore(){
-      TestUtils a; 
-   }
-
    public static class SlowCacheStoreConfig extends DummyInMemoryCacheStore.Cfg {
       public SlowCacheStoreConfig() {
          setCacheLoaderClassName(SlowCacheStore.class.getName());

Modified: branches/4.1.x/core/src/test/java/org/infinispan/loaders/UnnnecessaryLoadingTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/UnnnecessaryLoadingTest.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/UnnnecessaryLoadingTest.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -45,6 +45,7 @@
       return cm;
    }
 
+   @Test
    public void testRepeatedLoads() throws CacheLoaderException {
       CacheLoaderManager clm = TestingUtil.extractComponent(cache, CacheLoaderManager.class);
       ChainingCacheStore ccs = (ChainingCacheStore) clm.getCacheLoader();
@@ -66,6 +67,7 @@
       assert countingCS.numContains == 0 : "Expected 0, was " + countingCS.numContains;
    }
 
+   @Test
    public void testSkipCacheFlagUsage() throws CacheLoaderException {
       CacheLoaderManager clm = TestingUtil.extractComponent(cache, CacheLoaderManager.class);
       ChainingCacheStore ccs = (ChainingCacheStore) clm.getCacheLoader();

Modified: branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -8,7 +8,6 @@
 import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
 import org.infinispan.loaders.modifications.Clear;
 import org.infinispan.loaders.modifications.Modification;
-import org.infinispan.loaders.modifications.Prepare;
 import org.infinispan.loaders.modifications.Remove;
 import org.infinispan.loaders.modifications.Store;
 import org.infinispan.test.AbstractInfinispanTest;
@@ -37,7 +36,7 @@
 import static org.infinispan.test.TestingUtil.k;
 import static org.infinispan.test.TestingUtil.v;
 
-@Test(groups = "unit", testName = "loaders.decorators.AsyncTest")
+@Test(groups = "unit", testName = "loaders.decorators.AsyncTest", sequential=true)
 public class AsyncTest extends AbstractInfinispanTest {
    private static final Log log = LogFactory.getLog(AsyncTest.class);
    AsyncStore store;
@@ -52,7 +51,7 @@
       asyncConfig = new AsyncStoreConfig();
       asyncConfig.setThreadPoolSize(10);
       store = new AsyncStore(underlying, asyncConfig);
-      dummyCfg = new DummyInMemoryCacheStore.Cfg();
+      dummyCfg = new DummyInMemoryCacheStore.Cfg("AsyncStoreTests",false);
       dummyCfg.setStore(AsyncTest.class.getName());
       store.init(dummyCfg, null, null);
       store.start();
@@ -64,6 +63,7 @@
       if (store != null) store.stop();
    }
 
+   @Test(timeOut=10000)
    public void testPutRemove() throws Exception {
       final int number = 1000;
       String key = "testPutRemove-k-";
@@ -72,6 +72,7 @@
       doTestRemove(number, key);
    }
 
+   @Test(timeOut=10000)
    public void testPutClearPut() throws Exception {
       final int number = 1000;
       String key = "testPutClearPut-k-";
@@ -80,10 +81,10 @@
       doTestClear(number, key);
       value = "testPutClearPut-v[2]-";
       doTestPut(number, key, value);
-
       doTestRemove(number, key);
    }
 
+   @Test(timeOut=10000)
    public void testMultiplePutsOnSameKey() throws Exception {
       final int number = 1000;
       String key = "testMultiplePutsOnSameKey-k";
@@ -92,6 +93,7 @@
       doTestSameKeyRemove(key);
    }
 
+   @Test(timeOut=10000)
    public void testRestrictionOnAddingToAsyncQueue() throws Exception {
       store.remove("blah");
 
@@ -174,10 +176,8 @@
          mods.add(new Remove(k1));
          GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
          store.prepare(mods, tx, false);
-         barrier.await(5, TimeUnit.SECONDS);
 
-         assert 1 == localMods.size();
-         assert localMods.entrySet().iterator().next().getKey() instanceof Prepare;
+         assert 0 == localMods.size();
          assert !store.containsKey(k1);
          assert !store.containsKey(k2);
 
@@ -185,6 +185,8 @@
          barrier.await(5, TimeUnit.SECONDS);
          assert store.load(k2).getValue().equals(v2);
          assert !store.containsKey(k1);
+         assert 2 == localMods.size();
+         assert new Remove(k1).equals(localMods.get(k1));
       } finally {
          store.delegate.clear();
          store.stop();
@@ -246,12 +248,12 @@
          mods.add(new Remove(k1));
          GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
          store.prepare(mods, tx, false);
-         barrier.await(5, TimeUnit.SECONDS);
+         Thread.sleep(200); //verify that work is not performed until commit
          assert 0 == storeCount.get();
          assert 0 == removeCount.get();
          assert 0 == clearCount.get();
          store.commit(tx);
-         barrier.await(5, TimeUnit.SECONDS);
+         barrier.await(5, TimeUnit.SECONDS); //modifications applied all at once
          assert 1 == storeCount.get() : "Store count was " + storeCount.get();
          assert 1 == removeCount.get();
          assert 0 == clearCount.get();
@@ -267,14 +269,14 @@
          mods.add(new Remove(k2));
          tx = gtf.newGlobalTransaction(null, false);
          store.prepare(mods, tx, false);
-         barrier.await(5, TimeUnit.SECONDS);
+         Thread.sleep(200); //verify that work is not performed until commit
          assert 0 == storeCount.get();
          assert 0 == removeCount.get();
          assert 0 == clearCount.get();
          store.commit(tx);
          barrier.await(5, TimeUnit.SECONDS);
          assert 0 == storeCount.get() : "Store count was " + storeCount.get();
-         assert 0 == removeCount.get();
+         assert 1 == removeCount.get();
          assert 1 == clearCount.get();
 
          storeCount.set(0);
@@ -288,7 +290,7 @@
          mods.add(new Store(InternalEntryFactory.create(k3, v3)));         
          tx = gtf.newGlobalTransaction(null, false);
          store.prepare(mods, tx, false);
-         barrier.await(5, TimeUnit.SECONDS);
+         Thread.sleep(200);
          assert 0 == storeCount.get();
          assert 0 == removeCount.get();
          assert 0 == clearCount.get();
@@ -306,14 +308,14 @@
          mods.add(new Remove(k1));
          tx = gtf.newGlobalTransaction(null, false);
          store.prepare(mods, tx, false);
-         barrier.await(5, TimeUnit.SECONDS);
+         Thread.sleep(200);
          assert 0 == storeCount.get();
          assert 0 == removeCount.get();
          assert 0 == clearCount.get();
          store.commit(tx);
          barrier.await(5, TimeUnit.SECONDS);
          assert 0 == storeCount.get() : "Store count was " + storeCount.get();
-         assert 0 == removeCount.get();
+         assert 1 == removeCount.get();
          assert 1 == clearCount.get();
 
          storeCount.set(0);
@@ -324,7 +326,7 @@
          mods.add(new Store(InternalEntryFactory.create(k1, v1)));         
          tx = gtf.newGlobalTransaction(null, false);
          store.prepare(mods, tx, false);
-         barrier.await(5, TimeUnit.SECONDS);
+         Thread.sleep(200);
          assert 0 == storeCount.get();
          assert 0 == removeCount.get();
          assert 0 == clearCount.get();
@@ -341,9 +343,13 @@
    }
 
    private void doTestPut(int number, String key, String value) throws Exception {
-      for (int i = 0; i < number; i++) store.store(InternalEntryFactory.create(key + i, value + i));
+      for (int i = 0; i < number; i++) {
+         InternalCacheEntry cacheEntry = InternalEntryFactory.create(key + i, value + i);
+         store.store(cacheEntry);
+      }
 
-      TestingUtil.sleepRandom(1000);
+      store.stop();
+      store.start();
 
       InternalCacheEntry[] entries = new InternalCacheEntry[number];
       for (int i = 0; i < number; i++) {
@@ -360,7 +366,7 @@
                if (entry != null) {
                   assert entry.getValue().equals(value + i);
                } else {
-                  TestingUtil.sleepRandom(1000);
+                  TestingUtil.sleepThread(20, "still waiting for key to appear: " + key + i);
                }
             }
          }
@@ -368,14 +374,16 @@
    }
 
    private void doTestSameKeyPut(int number, String key, String value) throws Exception {
-      for (int i = 0; i < number; i++)
+      for (int i = 0; i < number; i++) {
          store.store(InternalEntryFactory.create(key, value + i));
+      }
 
-      TestingUtil.sleepThread(5000);
+      store.stop();
+      store.start();
       InternalCacheEntry entry;
       boolean success = false;
       for (int i = 0; i < 120; i++) {
-         TestingUtil.sleepRandom(1000);
+         TestingUtil.sleepThread(20);
          entry = store.load(key);
          success = entry.getValue().equals(value + (number - 1));
          if (success) break;
@@ -386,7 +394,8 @@
    private void doTestRemove(int number, String key) throws Exception {
       for (int i = 0; i < number; i++) store.remove(key + i);
 
-      TestingUtil.sleepRandom(1000);
+      store.stop();//makes sure the store is flushed
+      store.start();
 
       InternalCacheEntry[] entries = new InternalCacheEntry[number];
       for (int i = 0; i < number; i++) {
@@ -396,8 +405,7 @@
       for (int i = 0; i < number; i++) {
          InternalCacheEntry entry = entries[i];
          while (entry != null) {
-            log.info("Entry still not null {0}", entry);
-            TestingUtil.sleepRandom(1000);
+            TestingUtil.sleepThread(20, "still waiting for key to be removed: " + key + i);
             entry = store.load(key + i);
          }
       }
@@ -407,14 +415,15 @@
       store.remove(key);
       InternalCacheEntry entry;
       do {
-         TestingUtil.sleepRandom(1000);
+         TestingUtil.sleepThread(20, "still waiting for key to be removed: " + key);
          entry = store.load(key);
       } while (entry != null);
    }
 
    private void doTestClear(int number, String key) throws Exception {
       store.clear();
-      TestingUtil.sleepRandom(1000);
+      store.stop();
+      store.start();
 
       InternalCacheEntry[] entries = new InternalCacheEntry[number];
       for (int i = 0; i < number; i++) {
@@ -424,8 +433,7 @@
       for (int i = 0; i < number; i++) {
          InternalCacheEntry entry = entries[i];
          while (entry != null) {
-            log.info("Entry still not null {0}", entry);
-            TestingUtil.sleepRandom(1000);
+            TestingUtil.sleepThread(20, "still waiting for key to be removed: " + key + i);
             entry = store.load(key + i);
          }
       }

Added: branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/BatchAsyncCacheStoreTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/BatchAsyncCacheStoreTest.java	                        (rev 0)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/BatchAsyncCacheStoreTest.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -0,0 +1,142 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.loaders.decorators;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.loaders.CacheStoreConfig;
+import org.infinispan.loaders.decorators.AsyncStoreConfig;
+import org.infinispan.loaders.file.FileCacheStoreConfig;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Optional;
+import org.testng.annotations.Parameters;
+import org.testng.annotations.Test;
+
+/**
+ * BatchAsyncCacheStoreTest performs some additional tests on the AsyncStore
+ * but using batches: values are written inside invocation batches to a cache
+ * configured with an asynchronous file cache store and are then read back.
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "loaders.decorators.BatchAsyncCacheStoreTest")
+public class BatchAsyncCacheStoreTest extends SingleCacheManagerTest {
+
+   /** Copy of the cache content taken by the first test and checked by the second one. */
+   private final HashMap<Object, Object> cacheCopy = new HashMap<Object, Object>();
+
+   /** Location handed to the file cache store; assigned in {@link #setUpTempDir(String)}. */
+   private String tmpDirectory;
+
+   public BatchAsyncCacheStoreTest() {
+      // NOTE(review): presumably makes the base class clean up the cache manager
+      // after every test method rather than once per class - confirm in SingleCacheManagerTest.
+      cleanup = CleanupPhase.AFTER_METHOD;
+   }
+
+   /**
+    * Creates the cache manager from a configuration that is LOCAL, has
+    * invocation batching enabled and uses the asynchronous file store.
+    */
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      Configuration configuration = new Configuration();
+      configuration.setCacheMode(Configuration.CacheMode.LOCAL);
+      configuration.setInvocationBatchingEnabled(true);
+      enableAsyncFileStorage(configuration);
+      return TestCacheManagerFactory.createCacheManager(configuration);
+   }
+
+   /**
+    * Adds the store configuration returned by {@link #createCacheStoreConfig()}
+    * to the cache loader manager of the given configuration, after enabling an
+    * async store configuration with a thread pool of size 1 on it.
+    *
+    * @param configuration the cache configuration to modify
+    */
+   private void enableAsyncFileStorage(Configuration configuration) throws Exception {
+      CacheStoreConfig fileStoreConfiguration = createCacheStoreConfig();
+      AsyncStoreConfig asyncStoreConfig = new AsyncStoreConfig();
+      asyncStoreConfig.setEnabled(true);
+      asyncStoreConfig.setThreadPoolSize(1);
+      fileStoreConfiguration.setAsyncStoreConfig(asyncStoreConfig);
+      CacheLoaderManagerConfig loaderManagerConfig = configuration.getCacheLoaderManagerConfig();
+      loaderManagerConfig.setPassivation(false);
+      loaderManagerConfig.setPreload(false);
+      loaderManagerConfig.setShared(true);
+      loaderManagerConfig.addCacheLoaderConfig(fileStoreConfiguration);
+   }
+
+   /**
+    * Writes 2000 values, two per batch, cycling over 13 keys so that each key
+    * is overwritten many times; then copies the cache content into
+    * {@link #cacheCopy} and stops the cache and the cache manager.
+    */
+   @Test
+   public void sequantialOvewritingInBatches() throws IOException, ClassNotFoundException, SQLException, InterruptedException {
+      cache = cacheManager.getCache();
+      AdvancedCache<Object,Object> advancedCache = cache.getAdvancedCache();
+      for (int i = 0; i < 2000;) {
+         advancedCache.startBatch();
+         putAValue(advancedCache, i++);
+         putAValue(advancedCache, i++);
+         advancedCache.endBatch(true);
+      }
+      cacheCopy.putAll(cache);
+      cache.stop();
+      cacheManager.stop();
+   }
+
+   /** Stores the value {@code "V" + i} under the key {@code "k" + (i % 13)}. */
+   private void putAValue(AdvancedCache<Object, Object> advancedCache, int i) {
+      String key = "k" + (i % 13);
+      String value = "V" + i;
+      advancedCache.put(key, value);
+   }
+
+   /**
+    * Checks that the cache starts out empty and that every key recorded in
+    * {@link #cacheCopy} can nevertheless be read with the recorded value.
+    * All mismatches are printed before the test fails, and the number of keys
+    * is compared at the end.
+    */
+   @Test(dependsOnMethods = "sequantialOvewritingInBatches")
+   public void indexWasStored() throws IOException {
+      cache = cacheManager.getCache();
+      assert cache.isEmpty();
+      boolean failed = false;
+      for (Object key : cacheCopy.keySet()) {
+         Object expected = cacheCopy.get(key);
+         Object actual = cache.get(key);
+         if (!expected.equals(actual)) {
+            System.out.println("Failure on key '" + key.toString() + "' expected value: '" + expected + "' actual value: '" + actual + "'");
+            failed = true;
+         }
+      }
+      Assert.assertFalse(failed);
+      // TestNG argument order is (actual, expected, message).
+      Assert.assertEquals(cache.keySet().size(), cacheCopy.keySet().size(), "have a different number of keys");
+   }
+
+   /**
+    * Computes the temporary directory for this test and creates it.
+    *
+    * @param basedir base directory, "/tmp" when the parameter is not supplied
+    */
+   @BeforeClass
+   @Parameters( { "basedir" })
+   protected void setUpTempDir(@Optional(value = "/tmp") String basedir) {
+      tmpDirectory = TestingUtil.tmpDirectory(basedir, this);
+      new File(tmpDirectory).mkdirs();
+   }
+
+   /** Removes the temporary directory once all tests of the class have run. */
+   @AfterClass
+   protected void clearTempDir() {
+      TestingUtil.recursiveFileRemove(tmpDirectory);
+   }
+
+   /** @return a file cache store configuration located in the temporary directory */
+   protected CacheStoreConfig createCacheStoreConfig() throws Exception {
+      FileCacheStoreConfig cfg = new FileCacheStoreConfig();
+      cfg.setLocation(tmpDirectory);
+      return cfg;
+   }
+
+}

Modified: branches/4.1.x/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -73,10 +73,10 @@
    @BeforeTest
    @Parameters({"basedir"})
    protected void setUpTempDir(@Optional(value = "/tmp") String basedir) {
-      tmpDirectory1 = basedir + TestingUtil.TEST_PATH + File.separator + "1" + File.separator + getClass().getSimpleName();
-      tmpDirectory2 = basedir + TestingUtil.TEST_PATH + File.separator + "2" + File.separator + getClass().getSimpleName();
-      tmpDirectory3 = basedir + TestingUtil.TEST_PATH + File.separator + "3" + File.separator + getClass().getSimpleName();
-      tmpDirectory4 = basedir + TestingUtil.TEST_PATH + File.separator + "4" + File.separator + getClass().getSimpleName();
+      tmpDirectory1 = basedir + File.separator + TestingUtil.TEST_PATH + File.separator + "1" + File.separator + getClass().getSimpleName();
+      tmpDirectory2 = basedir + File.separator + TestingUtil.TEST_PATH + File.separator + "2" + File.separator + getClass().getSimpleName();
+      tmpDirectory3 = basedir + File.separator + TestingUtil.TEST_PATH + File.separator + "3" + File.separator + getClass().getSimpleName();
+      tmpDirectory4 = basedir + File.separator + TestingUtil.TEST_PATH + File.separator + "4" + File.separator + getClass().getSimpleName();
    }
 
    @AfterMethod(alwaysRun = true)

Modified: branches/4.1.x/core/src/test/java/org/infinispan/test/TestingUtil.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/test/TestingUtil.java	2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/test/TestingUtil.java	2010-08-31 22:20:02 UTC (rev 2290)
@@ -373,10 +373,16 @@
     * @param sleeptime number of ms to sleep
     */
    public static void sleepThread(long sleeptime) {
+      sleepThread(sleeptime, null); // null message: nothing is logged if the sleep is interrupted
+   }
+   
+   public static void sleepThread(long sleeptime, String messageOnInterrupt) { // sleeps sleeptime ms; messageOnInterrupt may be null
       try {
          Thread.sleep(sleeptime);
       }
       catch (InterruptedException ie) {
+         if (messageOnInterrupt != null) // with a null message the interruption is ignored silently
+            log.error(messageOnInterrupt); // NOTE(review): the thread's interrupt status is not restored here
       }
    }
 



More information about the infinispan-commits mailing list