[infinispan-commits] Infinispan SVN: r2290 - in branches/4.1.x/core/src: main/java/org/infinispan/loaders/modifications and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Aug 31 18:20:03 EDT 2010
Author: sannegrinovero
Date: 2010-08-31 18:20:02 -0400 (Tue, 31 Aug 2010)
New Revision: 2290
Added:
branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/ModificationsList.java
branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/BatchAsyncCacheStoreTest.java
Modified:
branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStoreConfig.java
branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Commit.java
branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Modification.java
branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java
branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java
branches/4.1.x/core/src/test/java/org/infinispan/loaders/UnnnecessaryLoadingTest.java
branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
branches/4.1.x/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java
branches/4.1.x/core/src/test/java/org/infinispan/test/TestingUtil.java
Log:
[ISPN-618] (AsyncStore fails to save all values when using transactions or batch operations) and [ISPN-619] (Aggregate changes from multiple transactions in AsyncStore) - branch 4.1
Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -8,10 +8,8 @@
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.modifications.Clear;
-import org.infinispan.loaders.modifications.Commit;
import org.infinispan.loaders.modifications.Modification;
-import org.infinispan.loaders.modifications.Prepare;
-import org.infinispan.loaders.modifications.PurgeExpired;
+import org.infinispan.loaders.modifications.ModificationsList;
import org.infinispan.loaders.modifications.Remove;
import org.infinispan.loaders.modifications.Store;
import org.infinispan.marshall.StreamingMarshaller;
@@ -20,8 +18,6 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -30,12 +26,12 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
@@ -61,6 +57,7 @@
*
* @author Manik Surtani
* @author Galder Zamarreño
+ * @author Sanne Grinovero
* @since 4.0
*/
public class AsyncStore extends AbstractDelegatingStore {
@@ -68,24 +65,31 @@
private static final boolean trace = log.isTraceEnabled();
private static final AtomicInteger threadId = new AtomicInteger(0);
private final AtomicBoolean stopped = new AtomicBoolean(true);
+
private final AsyncStoreConfig asyncStoreConfig;
-
+ private Map<GlobalTransaction, List<? extends Modification>> transactions;
+
/**
- * Approximate count of number of modified keys. At points, it could contain negative values.
+ * This is used as a marker to shut down the AsyncStoreCoordinator
*/
- private final AtomicInteger count = new AtomicInteger(0);
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition notEmpty = lock.newCondition();
-
+ private static final Modification QUIT_SIGNAL = new Clear();
+
+ /**
+ * clear() is performed in sync by the one thread of storeCoordinator, while blocking all
+ * other threads interacting with the decorated store.
+ */
+ private final ReadWriteLock clearAllLock = new ReentrantReadWriteLock();
+ private final Lock clearAllReadLock = clearAllLock.readLock();
+ private final Lock clearAllWriteLock = clearAllLock.writeLock();
+ private final Lock stateMapLock = new ReentrantLock();
+
ExecutorService executor;
- private List<Future> processorFutures;
- private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
- private final Lock read = mapLock.readLock();
- private final Lock write = mapLock.writeLock();
private int concurrencyLevel;
- @GuardedBy("mapLock")
+ @GuardedBy("stateMapLock")
protected ConcurrentMap<Object, Modification> state;
private ReleaseAllLockContainer lockContainer;
+ private final LinkedBlockingQueue<Modification> changesDeque = new LinkedBlockingQueue<Modification>();
+ public volatile boolean lastAsyncProcessorShutsDownExecutor = false;
public AsyncStore(CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
super(delegate);
@@ -97,77 +101,97 @@
super.init(config, cache, m);
concurrencyLevel = cache == null || cache.getConfiguration() == null ? 16 : cache.getConfiguration().getConcurrencyLevel();
lockContainer = new ReleaseAllLockContainer(concurrencyLevel);
+ transactions = new ConcurrentHashMap<GlobalTransaction, List<? extends Modification>>(64, 0.75f, concurrencyLevel);
}
@Override
public void store(InternalCacheEntry ed) {
- enqueue(ed.getKey(), new Store(ed));
+ enqueue(new Store(ed));
}
@Override
public boolean remove(Object key) {
- enqueue(key, new Remove(key));
+ enqueue(new Remove(key));
return true;
}
@Override
public void clear() {
Clear clear = new Clear();
- enqueue(clear, clear);
+ checkNotStopped(); //check we can change the changesDeque
+ changesDeque.clear();
+ enqueue(clear);
}
@Override
- public void purgeExpired() {
- PurgeExpired purge = new PurgeExpired();
- enqueue(purge, purge);
+ public void prepare(List<? extends Modification> mods, GlobalTransaction tx, boolean isOnePhase) throws CacheLoaderException {
+ if (isOnePhase) {
+ enqueueModificationsList(mods);
+ } else {
+ transactions.put(tx, mods);
+ }
}
-
+
@Override
- public void prepare(List<? extends Modification> list, GlobalTransaction tx, boolean isOnePhase) {
- Prepare prepare = new Prepare(list, tx, isOnePhase);
- enqueue(prepare, prepare);
+ public void rollback(GlobalTransaction tx) {
+ transactions.remove(tx);
}
@Override
public void commit(GlobalTransaction tx) throws CacheLoaderException {
- Commit commit = new Commit(tx);
- enqueue(commit, commit);
+ List<? extends Modification> list = transactions.remove(tx);
+ enqueueModificationsList(list);
}
+
+ protected void enqueueModificationsList(List<? extends Modification> mods) throws CacheLoaderException {
+ if (mods != null && !mods.isEmpty()) {
+ enqueue(new ModificationsList(mods));
+ }
+ }
@Override
public void start() throws CacheLoaderException {
state = newStateMap();
log.info("Async cache loader starting {0}", this);
stopped.set(false);
+ lastAsyncProcessorShutsDownExecutor = false;
super.start();
int poolSize = asyncStoreConfig.getThreadPoolSize();
- executor = Executors.newFixedThreadPool(poolSize, new ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "CoalescedAsyncStore-" + threadId.getAndIncrement());
- t.setDaemon(true);
- return t;
- }
- });
- processorFutures = new ArrayList<Future>(poolSize);
- for (int i = 0; i < poolSize; i++) processorFutures.add(executor.submit(createAsyncProcessor()));
+ executor = new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
+ // note the use of poolSize+1 as maximum workingQueue together with DiscardPolicy:
+ // this way when a new AsyncProcessor is started unnecessarily we discard it
+ // before it takes locks to perform no work
+ // this way we save memory from the executor queue, CPU, and also avoid
+ // any possible RejectedExecutionException.
+ new LinkedBlockingQueue<Runnable>(poolSize + 1),
+ new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "CoalescedAsyncStore-" + threadId.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+ },
+ new ThreadPoolExecutor.DiscardPolicy()
+ );
+ startStoreCoordinator();
}
+ private void startStoreCoordinator() {
+ ExecutorService storeCoordinator = Executors.newFixedThreadPool(1);
+ storeCoordinator.execute( new AsyncStoreCoordinator() );
+ storeCoordinator.shutdown();
+ }
+
@Override
public void stop() throws CacheLoaderException {
stopped.set(true);
- if (executor != null) {
- for (Future f : processorFutures) f.cancel(true);
- executor.shutdown();
- try {
- boolean terminated = executor.isTerminated();
- while (!terminated) {
- terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ try {
+ changesDeque.put(QUIT_SIGNAL);
+ executor.awaitTermination(asyncStoreConfig.getShutdownTimeout(), TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Interrupted or timeout while waiting for AsyncStore worker threads to push all state to the decorated store", e);
+ Thread.currentThread().interrupt();
}
- executor = null;
super.stop();
}
@@ -182,81 +206,46 @@
case REMOVE:
super.remove(entry.getKey());
break;
- case CLEAR:
- super.clear();
- break;
- case PURGE_EXPIRED:
- super.purgeExpired();
- break;
- case PREPARE:
- List<? extends Modification> coalesced = coalesceModificationList(((Prepare) mod).getList());
- super.prepare(coalesced, ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase());
- break;
- case COMMIT:
- super.commit(((Commit) mod).getTx());
- break;
+ default:
+ throw new IllegalArgumentException("Unexpected modification type " + mod.getType());
}
}
}
-
- protected Runnable createAsyncProcessor() {
- return new AsyncProcessor();
+
+ protected boolean applyClear() {
+ try {
+ super.clear();
+ return true;
+ } catch (CacheLoaderException e) {
+ log.error("Error performing clear in AsyncStore", e);
+ return false;
+ }
}
-
- private List<? extends Modification> coalesceModificationList(List<? extends Modification> mods) {
- Map<Object, Modification> keyMods = new HashMap<Object, Modification>();
- List<Modification> coalesced = new ArrayList<Modification>();
- for (Modification mod : mods) {
- switch (mod.getType()) {
- case STORE:
- keyMods.put(((Store) mod).getStoredEntry().getKey(), mod);
- break;
- case CLEAR:
- keyMods.clear(); // remove all pending key modifications
- coalesced.add(mod); // add a clear so that future put/removes do not need to do anything
- break;
- case REMOVE:
- if (!coalesced.isEmpty() && keyMods.containsKey(((Remove) mod).getKey())) {
- keyMods.remove(((Remove) mod).getKey()); // clear, p(k), r(k) sequence should result in no-op for k
- } else if (coalesced.isEmpty()) {
- keyMods.put(((Remove) mod).getKey(), mod);
- }
- break;
- default:
- throw new IllegalArgumentException("Unknown modification type " + mod.getType());
- }
+
+ protected void delegatePurgeExpired() {
+ try {
+ super.purgeExpired();
+ } catch (CacheLoaderException e) {
+ log.error("Error performing PurgeExpired in AsyncStore", e);
}
- coalesced.addAll(keyMods.values());
- return coalesced;
}
- private void enqueue(Object key, Modification mod) {
+ private void enqueue(Modification mod) {
try {
- if (stopped.get()) {
- throw new CacheException("AsyncStore stopped; no longer accepting more entries.");
- }
+ checkNotStopped();
if (trace) log.trace("Enqueuing modification {0}", mod);
- Modification prev = null;
- int c = -1;
- boolean unlock = false;
- try {
- acquireLock(read);
- unlock = true;
- prev = state.put(key, mod); // put the key's latest state in updates
- } finally {
- if (unlock) read.unlock();
- }
- /* Increment can happen outside the lock cos worst case scenario a false not empty would
- * be sent if the swap and decrement happened between the put and the increment. In this
- * case, the corresponding processor would see the map empty and would wait again. This
- * means that we're allowing count to potentially go negative but that's not a problem. */
- if (prev == null) c = count.getAndIncrement();
- if (c == 0) signalNotEmpty();
+ changesDeque.add(mod);
} catch (Exception e) {
throw new CacheException("Unable to enqueue asynchronous task", e);
}
}
+ private void checkNotStopped() {
+ if (stopped.get()) {
+ throw new CacheException("AsyncStore stopped; no longer accepting more entries.");
+ }
+ }
+
private void acquireLock(Lock lock) {
try {
if (!lock.tryLock(asyncStoreConfig.getFlushLockTimeout(), TimeUnit.MILLISECONDS))
@@ -267,105 +256,85 @@
}
}
- private void signalNotEmpty() {
- lock.lock();
- try {
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- }
-
- private void awaitNotEmptyOrStopped() throws InterruptedException {
- lock.lockInterruptibly();
- try {
- try {
- while (count.get() == 0) {
- if (stopped.get()) {
- notEmpty.signal();
- return;
- }
- notEmpty.await();
- }
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to a non-interrupted thread
- throw ie;
- }
- } finally {
- lock.unlock();
- }
- }
-
- private int decrementAndGet(int delta) {
- for (; ;) {
- int current = count.get();
- int next = current - delta;
- if (count.compareAndSet(current, next)) return next;
- }
- }
-
/**
* Processes modifications taking the latest updates from a state map.
*/
class AsyncProcessor implements Runnable {
- private ConcurrentMap<Object, Modification> swap = newStateMap();
private final Set<Object> lockedKeys = new HashSet<Object>();
+ boolean runAgainAfterWaiting = false;
public void run() {
- while (!Thread.interrupted() && !stopped.get()) {
+ clearAllReadLock.lock();
+ try {
+ innerRun();
+ } catch (Throwable t) {
+ runAgainAfterWaiting = false;
+ log.error("Unexpected error", t);
+ } finally {
+ clearAllReadLock.unlock();
+ }
+ if (runAgainAfterWaiting) {
try {
- run0();
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // interrupted: skip the rest of the sleep, but still make sure all data gets stored
}
- catch (InterruptedException e) {
- break;
- }
+ ensureMoreWorkIsHandled();
}
-
- try {
- if (trace) log.trace("Process remaining batch {0}", swap.size());
- put(swap);
- if (trace) log.trace("Process remaining queued {0}", state.size());
- while (!state.isEmpty()) run0();
- } catch (InterruptedException e) {
- if (trace) log.trace("Remaining interrupted");
- }
}
-
- void run0() throws InterruptedException {
+
+ private void innerRun() {
+ final ConcurrentMap<Object, Modification> swap;
if (trace) log.trace("Checking for modifications");
- boolean unlock = false;
-
try {
- acquireLock(write);
- unlock = true;
- swap = state;
- state = newStateMap();
+ acquireLock(stateMapLock);
+ try {
+ swap = state;
+ state = newStateMap();
- // This needs doing within the WL section, because if a key is in use, we need to put it back in the state
- // map for later processing and we don't wanna do it in such way that we override a newer value that might
- // have been enqueued by a user thread.
- for (Object key : swap.keySet()) {
- boolean acquired = lockContainer.acquireLock(key, 0, TimeUnit.NANOSECONDS) != null;
- if (trace) log.trace("Lock for key {0} was acquired={1}", key, acquired);
- if (!acquired) {
- Modification prev = swap.remove(key);
- state.put(key, prev);
- } else {
- lockedKeys.add(key);
+ // This needs to be done within the stateMapLock section, because if a key is in use,
+ // we need to put it back in the state
+ // map for later processing and we don't wanna do it in such way that we override a
+ // newer value that might
+ // have been taken already for processing by another instance of this same code.
+ // AsyncStoreCoordinator doesn't need to acquire the same lock as values put by it
+ // will never be overwritten (putIfAbsent below)
+ for (Object key : swap.keySet()) {
+ if (trace) log.trace("Going to process mod key: {0}", key);
+ boolean acquired = false;
+ try {
+ acquired = lockContainer.acquireLock(key, 0, TimeUnit.NANOSECONDS) != null;
+ } catch (InterruptedException e) {
+ log.error("interrupted on acquireLock {0}, 0 nanoseconds!", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ if (trace)
+ log.trace("Lock for key {0} was acquired={1}", key, acquired);
+ if (!acquired) {
+ Modification prev = swap.remove(key);
+ Modification didPut = state.putIfAbsent(key, prev); // don't overwrite more recently put work
+ if (didPut == null) {
+ // otherwise a new job is being spawned by the arbiter, so no need to create
+ // a new worker
+ runAgainAfterWaiting = true;
+ }
+ } else {
+ lockedKeys.add(key);
+ }
}
+ } finally {
+ stateMapLock.unlock();
}
- } finally {
- if (unlock) write.unlock();
- }
- try {
- int size = swap.size();
if (swap.isEmpty()) {
- awaitNotEmptyOrStopped();
+ if (lastAsyncProcessorShutsDownExecutor && runAgainAfterWaiting == false) {
+ executor.shutdown();
+ }
+ return;
} else {
- decrementAndGet(size);
-
- if (trace) log.trace("Apply {0} modifications", size);
+ if (trace)
+ log.trace("Apply {0} modifications", swap.size());
int maxRetries = 3;
int attemptNumber = 0;
boolean successful;
@@ -386,22 +355,12 @@
}
}
- boolean put(ConcurrentMap<Object, Modification> mods) throws InterruptedException {
+ boolean put(ConcurrentMap<Object, Modification> mods) {
try {
AsyncStore.this.applyModificationsSync(mods);
return true;
} catch (Exception e) {
- boolean isDebug = log.isDebugEnabled();
- if (isDebug) log.debug("Failed to process async modifications", e);
- Throwable cause = e;
- while (cause != null) {
- if (cause instanceof InterruptedException) {
- // 3rd party code may have cleared the thread interrupt status
- if (isDebug) log.debug("Rethrowing InterruptedException");
- throw (InterruptedException) cause;
- }
- cause = cause.getCause();
- }
+ if (log.isDebugEnabled()) log.debug("Failed to process async modifications", e);
return false;
}
}
@@ -410,7 +369,7 @@
private ConcurrentMap<Object, Modification> newStateMap() {
return new ConcurrentHashMap<Object, Modification>(64, 0.75f, concurrencyLevel);
}
-
+
private static class ReleaseAllLockContainer extends ReentrantPerEntryLockContainer {
private ReleaseAllLockContainer(int concurrencyLevel) {
super(concurrencyLevel);
@@ -423,4 +382,112 @@
}
}
}
+
+ private void ensureMoreWorkIsHandled() {
+ executor.execute(new AsyncProcessor());
+ }
+
+ private class AsyncStoreCoordinator implements Runnable {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Modification take = changesDeque.take();
+ if (take == QUIT_SIGNAL) {
+ lastAsyncProcessorShutsDownExecutor = true;
+ ensureMoreWorkIsHandled();
+ return;
+ }
+ else {
+ handleSafely(take);
+ }
+ } catch (InterruptedException e) {
+ log.error("AsyncStoreCoordinator interrupted", e);
+ return;
+ } catch (Throwable t) {
+ log.error("Unexpected error in AsyncStoreCoordinator thread. AsyncStore is dead!", t);
+ }
+ }
+ }
+
+ private void handleSafely(Modification mod) {
+ try {
+ if (trace) log.trace("taking from modification queue: {0}", mod);
+ handle(mod, false);
+ } catch (Exception e) {
+ log.error("Error while handling Modification in AsyncStore", e);
+ }
+ }
+
+ private void handle(Modification mod, boolean nested) {
+ boolean asyncProcessorNeeded = false;
+ switch (mod.getType()) {
+ case STORE:
+ Store store = (Store) mod;
+ stateMapLock.lock();
+ state.put(store.getStoredEntry().getKey(), store);
+ stateMapLock.unlock();
+ asyncProcessorNeeded = true;
+ break;
+ case REMOVE:
+ Remove remove = (Remove) mod;
+ stateMapLock.lock();
+ state.put(remove.getKey(), remove);
+ stateMapLock.unlock();
+ asyncProcessorNeeded = true;
+ break;
+ case CLEAR:
+ performClear();
+ break;
+ case PURGE_EXPIRED:
+ delegatePurgeExpired();
+ break;
+ case LIST:
+ applyModificationsList((ModificationsList) mod);
+ asyncProcessorNeeded = true;
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected modification type " + mod.getType());
+ }
+ if (asyncProcessorNeeded && !nested) {
+ // we know when it's possible for some work to be done, starting short-lived
+ // AsyncProcessor(s) simplifies shutdown process.
+ ensureMoreWorkIsHandled();
+ }
+ }
+
+ private void applyModificationsList(ModificationsList mod) {
+ for (Modification m : mod.getList()) {
+ handle(m, true);
+ }
+ }
+
+ private void performClear() {
+ state.clear(); // cancel any other scheduled changes
+ clearAllWriteLock.lock(); // ensure no other tasks concurrently working
+ try {
+ // to acquire clearAllWriteLock we might have had to wait for N AsyncProcessor to have finished
+ // (as they have to release all clearAllReadLock),
+ // so as they might have put back some work to the state map, clear the state map again inside the writeLock:
+ state.clear();
+ if (trace) log.trace("Performed clear operation");
+ int maxRetries = 3;
+ int attemptNumber = 0;
+ boolean successful = false;
+ do {
+ if (attemptNumber > 0 && log.isDebugEnabled())
+ log.debug("Retrying clear() due to previous failure. {0} attempts left.", maxRetries - attemptNumber);
+ successful = applyClear();
+ attemptNumber++;
+ } while (!successful && attemptNumber <= maxRetries);
+ if (!successful) {
+ log.error("Clear() operation in async store could not be performed");
+ }
+ } finally {
+ clearAllWriteLock.unlock();
+ }
+ }
+
+ }
}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStoreConfig.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStoreConfig.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStoreConfig.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -42,6 +42,11 @@
@Dynamic
protected Long flushLockTimeout = 5000L;
+ /** @configRef desc="Timeout to stop the cache store. When the store is stopped it's possible that some modifications still need to be applied;
+ * you likely want to set a very large timeout to make sure not to lose data." */
+ @Dynamic
+ protected Long shutdownTimeout = 7200L;
+
@XmlAttribute
public Boolean isEnabled() {
return enabled;
@@ -70,8 +75,18 @@
public void setFlushLockTimeout(Long stateLockTimeout) {
testImmutability("flushLockTimeout");
this.flushLockTimeout = stateLockTimeout;
- }
+ }
+ @XmlAttribute
+ public Long getShutdownTimeout() {
+ return shutdownTimeout;
+ }
+
+ public void setShutdownTimeout(Long shutdownTimeout) {
+ testImmutability("shutdownTimeout");
+ this.shutdownTimeout = shutdownTimeout;
+ }
+
@Override
public AsyncStoreConfig clone() {
try {
Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Commit.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Commit.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Commit.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -62,5 +62,10 @@
Commit other = (Commit) obj;
return tx.equals(other.tx);
}
+
+ @Override
+ public String toString() {
+ return "Commit: " + tx;
+ }
}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Modification.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Modification.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Modification.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -8,7 +8,7 @@
*/
public interface Modification {
public static enum Type {
- STORE, REMOVE, CLEAR, PURGE_EXPIRED, PREPARE, COMMIT
+ STORE, REMOVE, CLEAR, PURGE_EXPIRED, PREPARE, COMMIT, LIST
}
Type getType();
Added: branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/ModificationsList.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/ModificationsList.java (rev 0)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/ModificationsList.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.loaders.modifications;
+
+import java.util.List;
+
+/**
+ * ModificationsList contains a List<Modification>
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class ModificationsList implements Modification {
+
+ private final List<? extends Modification> list;
+
+ public ModificationsList(List<? extends Modification> list) {
+ this.list = list;
+ }
+
+ @Override
+ public Type getType() {
+ return Modification.Type.LIST;
+ }
+
+ public List<? extends Modification> getList() {
+ return list;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((list == null) ? 0 : list.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ModificationsList other = (ModificationsList) obj;
+ if (list == null) {
+ if (other.list != null)
+ return false;
+ } else if (!list.equals(other.list))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "ModificationsList: [" + list + "]";
+ }
+
+}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -80,5 +80,16 @@
result = 31 * result + (isOnePhase ? 1 : 0);
return result;
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Prepare:");
+ sb.append(tx);
+ sb.append(" isOnePhase:");
+ sb.append(String.valueOf(isOnePhase));
+ sb.append(";[").append(list).append("]");
+ return sb.toString();
+ }
}
Modified: branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -3,7 +3,6 @@
import java.io.IOException;
import java.sql.SQLException;
-import org.apache.commons.math.stat.inference.TestUtils;
import org.infinispan.config.CacheLoaderManagerConfig;
import org.infinispan.config.Configuration;
import org.infinispan.container.entries.InternalCacheEntry;
@@ -13,7 +12,6 @@
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
/**
@@ -45,7 +43,7 @@
return TestCacheManagerFactory.createCacheManager(config);
}
- @Test (timeOut = 10000)
+ @Test(timeOut = 10000)
public void writeOnStorage() throws IOException, ClassNotFoundException, SQLException, InterruptedException {
cache = cacheManager.getCache("AsyncStoreInMemory");
cache.put("key1", "value");
@@ -60,11 +58,6 @@
assert "value".equals(cache.get("key1"));
}
- @AfterClass
- public void removeStore(){
- TestUtils a;
- }
-
public static class SlowCacheStoreConfig extends DummyInMemoryCacheStore.Cfg {
public SlowCacheStoreConfig() {
setCacheLoaderClassName(SlowCacheStore.class.getName());
Modified: branches/4.1.x/core/src/test/java/org/infinispan/loaders/UnnnecessaryLoadingTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/UnnnecessaryLoadingTest.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/UnnnecessaryLoadingTest.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -45,6 +45,7 @@
return cm;
}
+ @Test
public void testRepeatedLoads() throws CacheLoaderException {
CacheLoaderManager clm = TestingUtil.extractComponent(cache, CacheLoaderManager.class);
ChainingCacheStore ccs = (ChainingCacheStore) clm.getCacheLoader();
@@ -66,6 +67,7 @@
assert countingCS.numContains == 0 : "Expected 0, was " + countingCS.numContains;
}
+ @Test
public void testSkipCacheFlagUsage() throws CacheLoaderException {
CacheLoaderManager clm = TestingUtil.extractComponent(cache, CacheLoaderManager.class);
ChainingCacheStore ccs = (ChainingCacheStore) clm.getCacheLoader();
Modified: branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -8,7 +8,6 @@
import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
import org.infinispan.loaders.modifications.Clear;
import org.infinispan.loaders.modifications.Modification;
-import org.infinispan.loaders.modifications.Prepare;
import org.infinispan.loaders.modifications.Remove;
import org.infinispan.loaders.modifications.Store;
import org.infinispan.test.AbstractInfinispanTest;
@@ -37,7 +36,7 @@
import static org.infinispan.test.TestingUtil.k;
import static org.infinispan.test.TestingUtil.v;
-@Test(groups = "unit", testName = "loaders.decorators.AsyncTest")
+@Test(groups = "unit", testName = "loaders.decorators.AsyncTest", sequential=true)
public class AsyncTest extends AbstractInfinispanTest {
private static final Log log = LogFactory.getLog(AsyncTest.class);
AsyncStore store;
@@ -52,7 +51,7 @@
asyncConfig = new AsyncStoreConfig();
asyncConfig.setThreadPoolSize(10);
store = new AsyncStore(underlying, asyncConfig);
- dummyCfg = new DummyInMemoryCacheStore.Cfg();
+ dummyCfg = new DummyInMemoryCacheStore.Cfg("AsyncStoreTests",false);
dummyCfg.setStore(AsyncTest.class.getName());
store.init(dummyCfg, null, null);
store.start();
@@ -64,6 +63,7 @@
if (store != null) store.stop();
}
+ @Test(timeOut=10000)
public void testPutRemove() throws Exception {
final int number = 1000;
String key = "testPutRemove-k-";
@@ -72,6 +72,7 @@
doTestRemove(number, key);
}
+ @Test(timeOut=10000)
public void testPutClearPut() throws Exception {
final int number = 1000;
String key = "testPutClearPut-k-";
@@ -80,10 +81,10 @@
doTestClear(number, key);
value = "testPutClearPut-v[2]-";
doTestPut(number, key, value);
-
doTestRemove(number, key);
}
+ @Test(timeOut=10000)
public void testMultiplePutsOnSameKey() throws Exception {
final int number = 1000;
String key = "testMultiplePutsOnSameKey-k";
@@ -92,6 +93,7 @@
doTestSameKeyRemove(key);
}
+ @Test(timeOut=10000)
public void testRestrictionOnAddingToAsyncQueue() throws Exception {
store.remove("blah");
@@ -174,10 +176,8 @@
mods.add(new Remove(k1));
GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
store.prepare(mods, tx, false);
- barrier.await(5, TimeUnit.SECONDS);
- assert 1 == localMods.size();
- assert localMods.entrySet().iterator().next().getKey() instanceof Prepare;
+ assert 0 == localMods.size();
assert !store.containsKey(k1);
assert !store.containsKey(k2);
@@ -185,6 +185,8 @@
barrier.await(5, TimeUnit.SECONDS);
assert store.load(k2).getValue().equals(v2);
assert !store.containsKey(k1);
+ assert 2 == localMods.size();
+ assert new Remove(k1).equals(localMods.get(k1));
} finally {
store.delegate.clear();
store.stop();
@@ -246,12 +248,12 @@
mods.add(new Remove(k1));
GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
store.prepare(mods, tx, false);
- barrier.await(5, TimeUnit.SECONDS);
+ Thread.sleep(200); //verify that work is not performed until commit
assert 0 == storeCount.get();
assert 0 == removeCount.get();
assert 0 == clearCount.get();
store.commit(tx);
- barrier.await(5, TimeUnit.SECONDS);
+ barrier.await(5, TimeUnit.SECONDS); //modifications applied all at once
assert 1 == storeCount.get() : "Store count was " + storeCount.get();
assert 1 == removeCount.get();
assert 0 == clearCount.get();
@@ -267,14 +269,14 @@
mods.add(new Remove(k2));
tx = gtf.newGlobalTransaction(null, false);
store.prepare(mods, tx, false);
- barrier.await(5, TimeUnit.SECONDS);
+ Thread.sleep(200); //verify that work is not performed until commit
assert 0 == storeCount.get();
assert 0 == removeCount.get();
assert 0 == clearCount.get();
store.commit(tx);
barrier.await(5, TimeUnit.SECONDS);
assert 0 == storeCount.get() : "Store count was " + storeCount.get();
- assert 0 == removeCount.get();
+ assert 1 == removeCount.get();
assert 1 == clearCount.get();
storeCount.set(0);
@@ -288,7 +290,7 @@
mods.add(new Store(InternalEntryFactory.create(k3, v3)));
tx = gtf.newGlobalTransaction(null, false);
store.prepare(mods, tx, false);
- barrier.await(5, TimeUnit.SECONDS);
+ Thread.sleep(200);
assert 0 == storeCount.get();
assert 0 == removeCount.get();
assert 0 == clearCount.get();
@@ -306,14 +308,14 @@
mods.add(new Remove(k1));
tx = gtf.newGlobalTransaction(null, false);
store.prepare(mods, tx, false);
- barrier.await(5, TimeUnit.SECONDS);
+ Thread.sleep(200);
assert 0 == storeCount.get();
assert 0 == removeCount.get();
assert 0 == clearCount.get();
store.commit(tx);
barrier.await(5, TimeUnit.SECONDS);
assert 0 == storeCount.get() : "Store count was " + storeCount.get();
- assert 0 == removeCount.get();
+ assert 1 == removeCount.get();
assert 1 == clearCount.get();
storeCount.set(0);
@@ -324,7 +326,7 @@
mods.add(new Store(InternalEntryFactory.create(k1, v1)));
tx = gtf.newGlobalTransaction(null, false);
store.prepare(mods, tx, false);
- barrier.await(5, TimeUnit.SECONDS);
+ Thread.sleep(200);
assert 0 == storeCount.get();
assert 0 == removeCount.get();
assert 0 == clearCount.get();
@@ -341,9 +343,13 @@
}
private void doTestPut(int number, String key, String value) throws Exception {
- for (int i = 0; i < number; i++) store.store(InternalEntryFactory.create(key + i, value + i));
+ for (int i = 0; i < number; i++) {
+ InternalCacheEntry cacheEntry = InternalEntryFactory.create(key + i, value + i);
+ store.store(cacheEntry);
+ }
- TestingUtil.sleepRandom(1000);
+ store.stop();
+ store.start();
InternalCacheEntry[] entries = new InternalCacheEntry[number];
for (int i = 0; i < number; i++) {
@@ -360,7 +366,7 @@
if (entry != null) {
assert entry.getValue().equals(value + i);
} else {
- TestingUtil.sleepRandom(1000);
+ TestingUtil.sleepThread(20, "still waiting for key to appear: " + key + i);
}
}
}
@@ -368,14 +374,16 @@
}
private void doTestSameKeyPut(int number, String key, String value) throws Exception {
- for (int i = 0; i < number; i++)
+ for (int i = 0; i < number; i++) {
store.store(InternalEntryFactory.create(key, value + i));
+ }
- TestingUtil.sleepThread(5000);
+ store.stop();
+ store.start();
InternalCacheEntry entry;
boolean success = false;
for (int i = 0; i < 120; i++) {
- TestingUtil.sleepRandom(1000);
+ TestingUtil.sleepThread(20);
entry = store.load(key);
success = entry.getValue().equals(value + (number - 1));
if (success) break;
@@ -386,7 +394,8 @@
private void doTestRemove(int number, String key) throws Exception {
for (int i = 0; i < number; i++) store.remove(key + i);
- TestingUtil.sleepRandom(1000);
+ store.stop();//makes sure the store is flushed
+ store.start();
InternalCacheEntry[] entries = new InternalCacheEntry[number];
for (int i = 0; i < number; i++) {
@@ -396,8 +405,7 @@
for (int i = 0; i < number; i++) {
InternalCacheEntry entry = entries[i];
while (entry != null) {
- log.info("Entry still not null {0}", entry);
- TestingUtil.sleepRandom(1000);
+ TestingUtil.sleepThread(20, "still waiting for key to be removed: " + key + i);
entry = store.load(key + i);
}
}
@@ -407,14 +415,15 @@
store.remove(key);
InternalCacheEntry entry;
do {
- TestingUtil.sleepRandom(1000);
+ TestingUtil.sleepThread(20, "still waiting for key to be removed: " + key);
entry = store.load(key);
} while (entry != null);
}
private void doTestClear(int number, String key) throws Exception {
store.clear();
- TestingUtil.sleepRandom(1000);
+ store.stop();
+ store.start();
InternalCacheEntry[] entries = new InternalCacheEntry[number];
for (int i = 0; i < number; i++) {
@@ -424,8 +433,7 @@
for (int i = 0; i < number; i++) {
InternalCacheEntry entry = entries[i];
while (entry != null) {
- log.info("Entry still not null {0}", entry);
- TestingUtil.sleepRandom(1000);
+ TestingUtil.sleepThread(20, "still waiting for key to be removed: " + key + i);
entry = store.load(key + i);
}
}
Added: branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/BatchAsyncCacheStoreTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/BatchAsyncCacheStoreTest.java (rev 0)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/decorators/BatchAsyncCacheStoreTest.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -0,0 +1,142 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.loaders.decorators;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.HashMap;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.loaders.CacheStoreConfig;
+import org.infinispan.loaders.decorators.AsyncStoreConfig;
+import org.infinispan.loaders.file.FileCacheStoreConfig;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Optional;
+import org.testng.annotations.Parameters;
+import org.testng.annotations.Test;
+
+/**
+ * BatchAsyncCacheStoreTest performs some additional tests on the AsyncStore
+ * but using batches.
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "loaders.AsyncCacheStoreTest")
+public class BatchAsyncCacheStoreTest extends SingleCacheManagerTest {
+
+ private final HashMap cacheCopy = new HashMap();
+
+ public BatchAsyncCacheStoreTest() {
+ cleanup = CleanupPhase.AFTER_METHOD;
+ }
+
+ @Override
+ protected EmbeddedCacheManager createCacheManager() throws Exception {
+ Configuration configuration = new Configuration();
+ configuration.setCacheMode(Configuration.CacheMode.LOCAL);
+ configuration.setInvocationBatchingEnabled(true);
+ enableTestJdbcStorage(configuration);
+ return TestCacheManagerFactory.createCacheManager(configuration);
+ }
+
+ private void enableTestJdbcStorage(Configuration configuration) throws Exception {
+ CacheStoreConfig fileStoreConfiguration = createCacheStoreConfig();
+ AsyncStoreConfig asyncStoreConfig = new AsyncStoreConfig();
+ asyncStoreConfig.setEnabled(true);
+ asyncStoreConfig.setThreadPoolSize(1);
+ fileStoreConfiguration.setAsyncStoreConfig(asyncStoreConfig);
+ CacheLoaderManagerConfig loaderManagerConfig = configuration.getCacheLoaderManagerConfig();
+ loaderManagerConfig.setPassivation(false);
+ loaderManagerConfig.setPreload(false);
+ loaderManagerConfig.setShared(true);
+ loaderManagerConfig.addCacheLoaderConfig(fileStoreConfiguration);
+ }
+
+ @Test
+ public void sequantialOvewritingInBatches() throws IOException, ClassNotFoundException, SQLException, InterruptedException {
+ cache = cacheManager.getCache();
+ AdvancedCache<Object,Object> advancedCache = cache.getAdvancedCache();
+ for (int i = 0; i < 2000;) {
+ advancedCache.startBatch();
+ putAValue(advancedCache, i++);
+ putAValue(advancedCache, i++);
+ advancedCache.endBatch(true);
+ }
+ cacheCopy.putAll(cache);
+ cache.stop();
+ cacheManager.stop();
+ }
+
+ private void putAValue(AdvancedCache<Object, Object> advancedCache, int i) {
+ String key = "k" + (i % 13);
+ String value = "V" + i;
+ advancedCache.put(key, value);
+ }
+
+ @Test(dependsOnMethods = "sequantialOvewritingInBatches")
+ public void indexWasStored() throws IOException {
+ cache = cacheManager.getCache();
+ assert cache.isEmpty();
+ boolean failed = false;
+ for (Object key : cacheCopy.keySet()) {
+ Object expected = cacheCopy.get(key);
+ Object actual = cache.get(key);
+ if (!expected.equals(actual)) {
+ System.out.println("Failure on key '" + key.toString() + "' expected value: '" + expected + "' actual value: '" + actual + "'");
+ failed = true;
+ }
+ }
+ Assert.assertFalse(failed);
+ Assert.assertEquals(cacheCopy.keySet().size(), cache.keySet().size(), "have a different number of keys");
+ }
+
+ private String tmpDirectory;
+
+ @BeforeClass
+ @Parameters( { "basedir" })
+ protected void setUpTempDir(@Optional(value = "/tmp") String basedir) {
+ tmpDirectory = TestingUtil.tmpDirectory(basedir, this);
+ new File(tmpDirectory).mkdirs();
+ }
+
+ @AfterClass
+ protected void clearTempDir() {
+ TestingUtil.recursiveFileRemove(tmpDirectory);
+ }
+
+ protected CacheStoreConfig createCacheStoreConfig() throws Exception {
+ FileCacheStoreConfig cfg = new FileCacheStoreConfig();
+ cfg.setLocation(tmpDirectory);
+ return cfg;
+ }
+
+}
Modified: branches/4.1.x/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -73,10 +73,10 @@
@BeforeTest
@Parameters({"basedir"})
protected void setUpTempDir(@Optional(value = "/tmp") String basedir) {
- tmpDirectory1 = basedir + TestingUtil.TEST_PATH + File.separator + "1" + File.separator + getClass().getSimpleName();
- tmpDirectory2 = basedir + TestingUtil.TEST_PATH + File.separator + "2" + File.separator + getClass().getSimpleName();
- tmpDirectory3 = basedir + TestingUtil.TEST_PATH + File.separator + "3" + File.separator + getClass().getSimpleName();
- tmpDirectory4 = basedir + TestingUtil.TEST_PATH + File.separator + "4" + File.separator + getClass().getSimpleName();
+ tmpDirectory1 = basedir + File.separator + TestingUtil.TEST_PATH + File.separator + "1" + File.separator + getClass().getSimpleName();
+ tmpDirectory2 = basedir + File.separator + TestingUtil.TEST_PATH + File.separator + "2" + File.separator + getClass().getSimpleName();
+ tmpDirectory3 = basedir + File.separator + TestingUtil.TEST_PATH + File.separator + "3" + File.separator + getClass().getSimpleName();
+ tmpDirectory4 = basedir + File.separator + TestingUtil.TEST_PATH + File.separator + "4" + File.separator + getClass().getSimpleName();
}
@AfterMethod(alwaysRun = true)
Modified: branches/4.1.x/core/src/test/java/org/infinispan/test/TestingUtil.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/test/TestingUtil.java 2010-08-31 22:18:46 UTC (rev 2289)
+++ branches/4.1.x/core/src/test/java/org/infinispan/test/TestingUtil.java 2010-08-31 22:20:02 UTC (rev 2290)
@@ -373,10 +373,16 @@
* @param sleeptime number of ms to sleep
*/
public static void sleepThread(long sleeptime) {
+ sleepThread(sleeptime, null);
+ }
+
+ public static void sleepThread(long sleeptime, String messageOnInterrupt) {
try {
Thread.sleep(sleeptime);
}
catch (InterruptedException ie) {
+ if (messageOnInterrupt != null)
+ log.error(messageOnInterrupt);
}
}
More information about the infinispan-commits
mailing list