Author: jason.greene(a)jboss.com
Date: 2009-02-11 01:00:34 -0500 (Wed, 11 Feb 2009)
New Revision: 7678
Added:
core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java
core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManager.java
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
Log:
Eliminate known race conditions in NBST
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2009-02-10 20:39:38 UTC (rev
7677)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2009-02-11 06:00:34 UTC (rev
7678)
@@ -21,16 +21,16 @@
*/
package org.jboss.cache;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.jboss.cache.RPCManagerImpl.FlushTracker;
import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.lock.TimeoutException;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.blocks.RspFilter;
-import java.util.List;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
-
/**
* Provides a mechanism for communicating with other caches in the cluster. For now this
is based on JGroups as an underlying
* transport, and in future more transport options may become available.
@@ -153,7 +153,17 @@
*/
Channel getChannel();
- public void waitForFlush(long timeout);
+ /**
+ * Returns the last state transfer source address.
+ *
+ * @return the last state transfer source address
+ */
+ public Address getLastStateTransferSource();
- public Address getLastStateTransferSource();
+ /**
+ * Returns the flush tracker associated with this manager.
+ *
+ * @return the current flush tracker
+ */
+ public FlushTracker getFlushTracker();
}
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-10 20:39:38 UTC
(rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-11 06:00:34 UTC
(rev 7678)
@@ -21,12 +21,29 @@
*/
package org.jboss.cache;
+import java.net.URL;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import javax.transaction.TransactionManager;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.RuntimeConfig;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
-import org.jboss.cache.config.RuntimeConfig;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
@@ -65,18 +82,6 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import javax.transaction.TransactionManager;
-import java.net.URL;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
-
/**
* Manager that handles all RPC calls between JBoss Cache instances
*
@@ -97,17 +102,8 @@
* True if this Cache is the coordinator.
*/
private volatile boolean coordinator = false;
- /**
- * Thread gate used to block Dispatcher during JGroups FLUSH protocol
- */
- private final ReclosableLatch flushBlockGate = new ReclosableLatch();
/**
- * Thread gate used by NBST to wait for a flush
- */
- private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
-
- /**
* The most recent state transfer source
*/
volatile Address lastStateTransferSource;
@@ -135,7 +131,9 @@
private volatile boolean isInLocalMode;
private ComponentRegistry componentRegistry;
private LockManager lockManager;
+ private FlushTracker flushTracker = new FlushTracker();
+
@Inject
public void setupDependencies(ChannelMessageListener messageListener, Configuration
configuration, Notifier notifier,
CacheSPI spi, Marshaller marshaller, TransactionTable
txTable,
@@ -155,6 +153,129 @@
this.lockManager = lockManager;
}
+ public class FlushTracker
+ {
+ private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+ private final AtomicInteger flushCompletionCount = new AtomicInteger();
+ private final ReentrantReadWriteLock coordinationLock = new
ReentrantReadWriteLock();
+ private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+
+ public void block()
+ {
+ flushBlockGate.close();
+ flushWaitGate.open();
+ }
+
+ public void unblock()
+ {
+ flushWaitGate.close();
+ flushCompletionCount.incrementAndGet();
+ flushBlockGate.open();
+ }
+
+ public int getFlushCompletionCount()
+ {
+ return flushCompletionCount.get();
+ }
+
+ public void lockProcessingLock()
+ {
+ if (! configuration.isNonBlockingStateTransfer())
+ return;
+
+ for (;;)
+ {
+ try
+ {
+ if
(!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
+ throw new TimeoutException("Could not obtain processing
lock");
+
+ return;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void unlockProcessingLock()
+ {
+ if (! configuration.isNonBlockingStateTransfer())
+ return;
+
+ coordinationLock.readLock().unlock();
+ }
+
+ public void lockSuspendProcessingLock()
+ {
+ if (! configuration.isNonBlockingStateTransfer())
+ return;
+
+ for (;;)
+ {
+ try
+ {
+ if
(!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
+ throw new TimeoutException("Could not obtain processing
lock");
+
+ return;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void unlockSuspendProcessingLock()
+ {
+ if (! configuration.isNonBlockingStateTransfer())
+ return;
+
+ if (coordinationLock.isWriteLockedByCurrentThread())
+ coordinationLock.writeLock().unlock();
+ }
+
+ public void waitForFlushCompletion(long timeout)
+ {
+ for (; ;)
+ {
+ try
+ {
+ if (channel.flushSupported() && !flushBlockGate.await(timeout,
TimeUnit.MILLISECONDS))
+ {
+ throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + timeout + " millis) ");
+ }
+ return;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void waitForFlushStart(long timeout)
+ {
+ for (; ;)
+ {
+ try
+ {
+ if (channel.flushSupported() && !flushWaitGate.await(timeout,
TimeUnit.MILLISECONDS))
+ {
+ throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + timeout + " millis) ");
+ }
+ return;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
// ------------ START: Lifecycle methods ------------
@Start(priority = 15)
@@ -243,9 +364,9 @@
}
}
- if (log.isDebugEnabled())
+ if (log.isInfoEnabled())
{
- log.debug("state was retrieved successfully (in " +
(System.currentTimeMillis() - start) + " milliseconds)");
+ log.info("state was retrieved successfully (in " +
(System.currentTimeMillis() - start) + " milliseconds)");
}
}
@@ -273,9 +394,9 @@
try
{
- if (log.isTraceEnabled())
+ if (log.isInfoEnabled())
{
- log.trace("Trying to fetch state from: " + member);
+ log.info("Trying to fetch state from: " + member);
}
if (getState(null, member))
{
@@ -286,18 +407,18 @@
}
catch (Exception e)
{
- if (log.isTraceEnabled())
+ if (log.isDebugEnabled())
{
- log.trace("Error while fetching state", e);
+ log.debug("Error while fetching state", e);
}
}
}
if (!success)
{
- if (trace)
+ if (log.isWarnEnabled())
{
- log.trace("Could not find available peer for state, backing off and
retrying");
+ log.warn("Could not find available peer for state, backing off and
retrying");
}
try
@@ -428,12 +549,12 @@
if (configuration.isUseRegionBasedMarshalling())
{
rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener,
new MembershipListenerAdaptor(),
- spi, invocationContextContainer, interceptorChain, componentRegistry,
flushBlockGate);
+ spi, invocationContextContainer, interceptorChain, componentRegistry,
this);
}
else
{
rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new
MembershipListenerAdaptor(),
- invocationContextContainer, invocationContextContainer, interceptorChain,
componentRegistry, flushBlockGate);
+ invocationContextContainer, invocationContextContainer, interceptorChain,
componentRegistry, this);
}
checkAppropriateConfig();
rpcDispatcher.setRequestMarshaller(marshaller);
@@ -552,6 +673,7 @@
public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, int mode, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception
{
boolean success = true;
+ boolean unlock = false;
try
{
// short circuit if we don't have an RpcDispatcher!
@@ -566,10 +688,11 @@
{
log.trace("callRemoteMethods(): valid members are " + recipients +
" methods: " + command + " Using OOB? " + useOutOfBandMessage + "
modeToUse: " + modeToUse);
}
- if (channel.flushSupported() &&
!flushBlockGate.await(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
- {
- throw new TimeoutException("State retrieval timed out waiting for flush
unblock. (timeout = " + configuration.getStateRetrievalTimeout() + " millis)
");
- }
+
+ flushTracker.lockProcessingLock();
+ unlock = true;
+ flushTracker.waitForFlushCompletion(configuration.getStateRetrievalTimeout());
+
useOutOfBandMessage = false;
RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command,
modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
@@ -619,6 +742,8 @@
finally
{
computeStats(success);
+ if (unlock)
+ flushTracker.unlockProcessingLock();
}
}
@@ -722,24 +847,6 @@
return ((JChannel) channel).getState(target, stateId,
configuration.getStateRetrievalTimeout(), !configuration.isNonBlockingStateTransfer());
}
- public void waitForFlush(long timeout)
- {
- for (; ;)
- {
- try
- {
- if (channel.flushSupported() && !flushWaitGate.await(timeout,
TimeUnit.MILLISECONDS))
- {
- throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + timeout + " millis) ");
- }
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
// ------------ END: Partial state transfer methods ------------
@@ -895,9 +1002,9 @@
{
try
{
- flushBlockGate.close();
- flushWaitGate.open();
if (log.isDebugEnabled()) log.debug("Block received at " +
getLocalAddress());
+
+ flushTracker.block();
notifier.notifyCacheBlocked(true);
notifier.notifyCacheBlocked(false);
@@ -917,14 +1024,13 @@
{
try
{
- flushWaitGate.close();
if (log.isDebugEnabled()) log.debug("UnBlock received at " +
getLocalAddress());
notifier.notifyCacheUnblocked(true);
notifier.notifyCacheUnblocked(false);
+ flushTracker.unblock();
if (log.isDebugEnabled()) log.debug("UnBlock processed at " +
getLocalAddress());
- flushBlockGate.open();
}
catch (Throwable e)
{
@@ -1034,4 +1140,9 @@
}
}
}
+
+ public FlushTracker getFlushTracker()
+ {
+ return flushTracker;
+ }
}
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-02-10
20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -93,7 +93,7 @@
// component and method containers
final Map<String, Component> componentLookup = new HashMap<String,
Component>();
- CacheStatus state = CacheStatus.INSTANTIATED;
+ volatile CacheStatus state = CacheStatus.INSTANTIATED;
/**
* Hook to shut down the cache when the JVM exits.
@@ -899,10 +899,11 @@
Thread.currentThread().interrupt();
}
}
- else
+ else if (blockInStarting)
{
log.warn("Received a remote call but the cache is not in STARTED state -
ignoring call.");
}
+
return false;
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2009-02-10
20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -38,6 +38,7 @@
import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.commons.httpclient.cookie.IgnoreCookiesSpec;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
@@ -93,6 +94,8 @@
protected static final int MAGICNUMBER_DOUBLE = 28;
protected static final int MAGICNUMBER_OBJECT = 29;
protected static final int MAGICNUMBER_TXLOG_ENTRY = 50;
+ protected static final int MAGICNUMBER_REQUEST_IGNORED_RESPONSE = 51;
+ protected static final int MAGICNUMBER_EXTENDED_RESPONSE = 52;
protected static final int MAGICNUMBER_NULL = 99;
protected static final int MAGICNUMBER_SERIALIZABLE = 100;
protected static final int MAGICNUMBER_REF = 101;
@@ -448,6 +451,15 @@
out.writeByte(MAGICNUMBER_GRAVITATERESULT);
marshallGravitateResult((GravitateResult) o, out, refMap);
}
+ else if (o instanceof RequestIgnoredResponse)
+ {
+ out.writeByte(MAGICNUMBER_REQUEST_IGNORED_RESPONSE);
+ }
+ else if (o instanceof ExtendedResponse)
+ {
+ out.writeByte(MAGICNUMBER_EXTENDED_RESPONSE);
+ marshallExtendedResponse((ExtendedResponse)o, out, refMap);
+ }
else if (o instanceof Serializable)
{
if (trace)
@@ -464,6 +476,12 @@
}
}
+ private void marshallExtendedResponse(ExtendedResponse response, ObjectOutputStream
out, Map<Object, Integer> refMap) throws Exception
+ {
+ out.writeBoolean(response.isReplayIgnoredRequests());
+ marshallObject(response.getResponse(), out, refMap);
+ }
+
private void marshallLogEntry(LogEntry log, ObjectOutputStream out, Map<Object,
Integer> refMap) throws Exception
{
marshallObject(log.getTransaction(), out, refMap);
@@ -673,6 +691,10 @@
return retVal;
case MAGICNUMBER_GRAVITATERESULT:
return unmarshallGravitateResult(in, refMap);
+ case MAGICNUMBER_REQUEST_IGNORED_RESPONSE:
+ return new RequestIgnoredResponse();
+ case MAGICNUMBER_EXTENDED_RESPONSE:
+ return unmarshallExtendedResponse(in, refMap);
default:
if (log.isErrorEnabled())
{
@@ -683,8 +705,17 @@
throw new Exception("Unknown magic number " + magicNumber);
}
+ private ExtendedResponse unmarshallExtendedResponse(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception
+ {
+ boolean replayIgnoredRequests = in.readBoolean();
+ ExtendedResponse response = new ExtendedResponse(unmarshallObject(in, refMap));
+ response.setReplayIgnoredRequests(replayIgnoredRequests);
+
+ return response;
+ }
+
@SuppressWarnings("unchecked")
- private Object unmarshallLogEntry(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
+ private LogEntry unmarshallLogEntry(ObjectInputStream in, UnmarshalledReferences
refMap) throws Exception
{
GlobalTransaction gtx = (GlobalTransaction)unmarshallObject(in, refMap);
List<WriteCommand> mods = (List<WriteCommand>)unmarshallObject(in,
refMap);
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-02-10
20:39:38 UTC (rev 7677)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -22,16 +22,21 @@
package org.jboss.cache.marshall;
import java.io.NotSerializableException;
+import java.util.Map;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.RPCManagerImpl.FlushTracker;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.VisitableCommand;
import org.jboss.cache.commands.remote.AnnounceBuddyPoolNameCommand;
@@ -50,6 +55,7 @@
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
+import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RspFilter;
import org.jgroups.util.Buffer;
@@ -72,20 +78,20 @@
private AtomicInteger replicationProcessorCount;
private boolean asyncSerial;
private Configuration configuration;
- private ReclosableLatch flushGate;
+ private RPCManager rpcManager;
private ReplicationObserver replicationObserver;
public CommandAwareRpcDispatcher() {}
public CommandAwareRpcDispatcher(Channel channel, MessageListener l,
MembershipListener l2,
Object serverObj, InvocationContextContainer
container, InterceptorChain interceptorChain,
- ComponentRegistry componentRegistry, ReclosableLatch
flushGate)
+ ComponentRegistry componentRegistry, RPCManager
manager)
{
super(channel, l, l2, serverObj);
this.invocationContextContainer = container;
this.componentRegistry = componentRegistry;
this.interceptorChain = interceptorChain;
- this.flushGate = flushGate;
+ this.rpcManager = manager;
trace = log.isTraceEnabled();
@@ -198,7 +204,8 @@
log.trace(new StringBuilder("dests=").append(dests).append(",
command=").append(command).
append(", mode=").append(mode).append(",
timeout=").append(timeout));
- ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode,
timeout, anycasting, filter);
+ boolean supportReplay = configuration.isNonBlockingStateTransfer();
+ ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode,
timeout, anycasting, supportReplay, filter);
Future<RspList> response = replicationProcessor.submit(replicationTask);
if (asyncSerial)
{
@@ -253,25 +260,41 @@
protected Object executeCommand(ReplicableCommand cmd, Message req) throws Throwable
{
+ boolean unlock = false;
+ FlushTracker flushTracker = rpcManager.getFlushTracker();
+
try
{
if (cmd == null) throw new NullPointerException("Unable to execute a null
command! Message was " + req);
if (trace) log.trace("Executing command: " + cmd + "
[sender=" + req.getSrc() + "]");
- if (channel.flushSupported() &&
!flushGate.await(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
+ boolean replayIgnored = false;
+
+
+ if (configuration.isNonBlockingStateTransfer())
{
- throw new TimeoutException("State retrieval timed out waiting for flush
unblock. (timeout = " + configuration.getStateRetrievalTimeout() + " millis)
");
+ int flushCount = flushTracker.getFlushCompletionCount();
+ flushTracker.lockProcessingLock();
+ unlock = true;
+
+
flushTracker.waitForFlushCompletion(configuration.getStateRetrievalTimeout());
+
+ // If this thread blocked during a NBST flush, then inform the sender
+ // it needs to replay ignored messages
+ replayIgnored = flushTracker.getFlushCompletionCount() != flushCount;
}
+ Object ret;
+
if (cmd instanceof VisitableCommand)
{
InvocationContext ctx = invocationContextContainer.get();
ctx.setOriginLocal(false);
if (!componentRegistry.invocationsAllowed(false))
{
- return null;
+ return new RequestIgnoredResponse();
}
- return interceptorChain.invoke(ctx, (VisitableCommand) cmd);
+ ret = interceptorChain.invoke(ctx, (VisitableCommand) cmd);
}
else
{
@@ -283,15 +306,27 @@
cmd instanceof RemoveFromBuddyGroupCommand)
&& !componentRegistry.invocationsAllowed(false))
{
- return null;
+ return new RequestIgnoredResponse();
}
- return cmd.perform(null);
+ ret = cmd.perform(null);
}
+
+ if (replayIgnored)
+ {
+ ExtendedResponse extended = new ExtendedResponse(ret);
+ extended.setReplayIgnoredRequests(true);
+ ret = extended;
+ }
+
+ return ret;
}
finally
{
if (replicationObserver != null)
replicationObserver.afterExecutingCommand(cmd);
+
+ if (unlock)
+ flushTracker.unlockProcessingLock();
}
}
@@ -309,9 +344,10 @@
private int mode;
private long timeout;
private boolean anycasting;
+ private boolean supportReplay;
private RspFilter filter;
- private ReplicationTask(ReplicableCommand command, boolean oob,
Vector<Address> dests, int mode, long timeout, boolean anycasting, RspFilter
filter)
+ private ReplicationTask(ReplicableCommand command, boolean oob,
Vector<Address> dests, int mode, long timeout, boolean anycasting, boolean
supportReplay, RspFilter filter)
{
this.command = command;
this.oob = oob;
@@ -319,6 +355,7 @@
this.mode = mode;
this.timeout = timeout;
this.anycasting = anycasting;
+ this.supportReplay = supportReplay;
this.filter = filter;
}
@@ -338,6 +375,9 @@
Message msg = new Message();
msg.setBuffer(buf);
if (oob) msg.setFlag(Message.OOB);
+
+ // Replay capability requires responses from all members!
+ int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
if (trace) log.trace("responses: " + retval);
@@ -347,6 +387,36 @@
if (retval == null)
throw new NotSerializableException("RpcDispatcher returned a null. This
is most often caused by args for " + command.getClass().getSimpleName() + " not
being serializable.");
+
+ if (supportReplay)
+ {
+ boolean replay = false;
+ Vector<Address> ignorers = new Vector<Address>();
+ for (Map.Entry<Address, Rsp> entry : retval.entrySet())
+ {
+ Object value = entry.getValue().getValue();
+ if (value instanceof RequestIgnoredResponse)
+ {
+ ignorers.add(entry.getKey());
+ }
+ else if (value instanceof ExtendedResponse)
+ {
+ ExtendedResponse extended = (ExtendedResponse) value;
+ replay |= extended.isReplayIgnoredRequests();
+ entry.getValue().setValue(extended.getResponse());
+ }
+ }
+
+ if (replay && ignorers.size() > 0)
+ {
+ if (trace)
+ log.trace("Replaying message to ignoring senders: " +
ignorers);
+ RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL,
timeout, anycasting, filter);
+ if (responses != null)
+ retval.putAll(responses);
+ }
+ }
+
return retval;
}
}
Added: core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java
(rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -0,0 +1,53 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+*/
+package org.jboss.cache.marshall;
+
+/**
+ * A response with extended information
+ *
+ * @author Jason T. Greene
+ */
+public class ExtendedResponse
+{
+ private boolean replayIgnoredRequests;
+ private final Object response;
+
+ public ExtendedResponse(Object response)
+ {
+ this.response = response;
+ }
+
+ public boolean isReplayIgnoredRequests()
+ {
+ return replayIgnoredRequests;
+ }
+
+ public void setReplayIgnoredRequests(boolean replayIgnoredRequests)
+ {
+ this.replayIgnoredRequests = replayIgnoredRequests;
+ }
+
+ public Object getResponse()
+ {
+ return response;
+ }
+}
Property changes on:
core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2009-02-10
20:39:38 UTC (rev 7677)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -21,6 +21,7 @@
*/
package org.jboss.cache.marshall;
+import org.jboss.cache.RPCManager;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.interceptors.InterceptorChain;
@@ -47,9 +48,9 @@
*/
public InactiveRegionAwareRpcDispatcher(Channel channel, MessageListener l,
MembershipListener l2, Object serverObj,
InvocationContextContainer container,
InterceptorChain interceptorChain,
- ComponentRegistry componentRegistry,
ReclosableLatch flushBlockGate)
+ ComponentRegistry componentRegistry,
RPCManager manager)
{
- super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry,
flushBlockGate);
+ super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry,
manager);
}
@Override
Added: core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -0,0 +1,32 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+*/
+package org.jboss.cache.marshall;
+
+/**
+ * Indicates that the request was ignored,
+ *
+ * @author Jason T. Greene
+ */
+public class RequestIgnoredResponse
+{
+
+}
Property changes on:
core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2009-02-10
20:39:38 UTC (rev 7677)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,7 @@
import org.jboss.cache.Node;
import org.jboss.cache.RPCManager;
import org.jboss.cache.Version;
+import org.jboss.cache.RPCManagerImpl.FlushTracker;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
@@ -136,68 +138,93 @@
CacheLoader cacheLoader = cache.getCacheLoaderManager() == null ? null :
cache.getCacheLoaderManager().getCacheLoader();
if (cacheLoader != null && generatePersistent)
{
- if (log.isTraceEnabled())
- {
- log.trace("writing persistent state for " + fqn + ",using
" + cache.getCacheLoaderManager().getCacheLoader().getClass());
- }
-
- if (fqn.isRoot())
- {
- cacheLoader.loadEntireState(out);
- }
- else
- {
- cacheLoader.loadState(fqn, out);
- }
-
- if (log.isTraceEnabled())
- {
- log.trace("persistent state succesfully written");
- }
+ writePersistentData(out, fqn, cacheLoader);
}
delimitStream(out);
if (nonBlocking && generateTransient)
{
- for (int nonProgress = 0, size = txLog.size(); size > 0;)
- {
- if (log.isTraceEnabled())
- log.trace("Tx Log remaining entries = " + size);
- txLog.writeCommitLog(cache.getMarshaller(), out);
- int newSize = txLog.size();
+ writeTxLog(out);
+ }
- // If size did not decrease then we did not make progress, and could be
wasting
- // our time. Limit this to the specified max.
- if (newSize >= size && ++nonProgress >=
maxNonProgressingLogWrites)
- break;
+ }
+ catch (Exception e)
+ {
+ cache.getMarshaller().objectToObjectStream(new NodeDataExceptionMarker(e,
cache.getLocalAddress()), out);
+ throw e;
+ }
+ finally
+ {
+ if (nonBlocking)
+ txLog.deactivate();
+ }
+ }
- size = newSize;
- }
+ private void writePersistentData(ObjectOutputStream out, Fqn fqn, CacheLoader
cacheLoader) throws Exception
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("writing persistent state for " + fqn + ",using "
+ cache.getCacheLoaderManager().getCacheLoader().getClass());
+ }
- // Signal to sender that we need a flush to get a consistent view
- // of the remaining transactions.
- delimitStream(out);
- out.flush();
- rpcManager.waitForFlush(flushTimeout);
+ if (fqn.isRoot())
+ {
+ cacheLoader.loadEntireState(out);
+ }
+ else
+ {
+ cacheLoader.loadState(fqn, out);
+ }
- // Write remaining transactions
+ if (log.isTraceEnabled())
+ {
+ log.trace("persistent state succesfully written");
+ }
+ }
+
+ private void writeTxLog(ObjectOutputStream out) throws Exception
+ {
+ FlushTracker flushTracker = rpcManager.getFlushTracker();
+
+ try
+ {
+ for (int nonProgress = 0, size = txLog.size(); size > 0;)
+ {
+ if (log.isTraceEnabled())
+ log.trace("Tx Log remaining entries = " + size);
txLog.writeCommitLog(cache.getMarshaller(), out);
- delimitStream(out);
+ int newSize = txLog.size();
- // Write all non-completed prepares
- txLog.writePendingPrepares(cache.getMarshaller(), out);
- delimitStream(out);
+ // If size did not decrease then we did not make progress, and could be
wasting
+ // our time. Limit this to the specified max.
+ if (newSize >= size && ++nonProgress >=
maxNonProgressingLogWrites)
+ break;
+
+ size = newSize;
}
+ // Wait on incoming and outgoing threads to line-up in front of
+ // the flush gate.
+ flushTracker.lockSuspendProcessingLock();
+
+ // Signal to sender that we need a flush to get a consistent view
+ // of the remaining transactions.
+ delimitStream(out);
+ out.flush();
+ flushTracker.waitForFlushStart(flushTimeout);
+
+ // Write remaining transactions
+ txLog.writeCommitLog(cache.getMarshaller(), out);
+ delimitStream(out);
+
+ // Write all non-completed prepares
+ txLog.writePendingPrepares(cache.getMarshaller(), out);
+ delimitStream(out);
+ out.flush();
}
- catch (Exception e)
- {
- cache.getMarshaller().objectToObjectStream(new NodeDataExceptionMarker(e,
cache.getLocalAddress()), out);
- throw e;
- }
finally
{
- txLog.deactivate();
+ flushTracker.unlockSuspendProcessingLock();
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-02-10
20:39:38 UTC (rev 7677)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -230,6 +230,10 @@
notifyAllNodesCreated(cache.getInvocationContext(), target);
}
+ catch (CacheException ce)
+ {
+ throw ce;
+ }
catch (Exception e)
{
throw new CacheException(e);
@@ -387,6 +391,14 @@
private List<NodeData> readNodesAsList(ObjectInputStream in) throws Exception
{
Object obj = cache.getMarshaller().objectFromObjectStream(in);
+ if (obj instanceof NodeDataExceptionMarker)
+ {
+ Throwable cause = ((NodeDataExceptionMarker)obj).getCause();
+ if (cause instanceof Exception)
+ throw (Exception) cause;
+
+ throw new CacheException(cause);
+ }
if (obj instanceof NodeDataMarker) return null;
return (List<NodeData>) obj;
Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java 2009-02-10
20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -69,7 +69,7 @@
}
}
- private Log log = LogFactory.getLog(getClass().getName());
+ private static Log log = LogFactory.getLog(TransactionLog.class);
public void logPrepare(PrepareCommand command)
{
@@ -145,6 +145,8 @@
public void deactivate()
{
active.set(false);
+ if (entries.size() > 0)
+ log.error("Unprocessed Transaction Log Entries! = " +
entries.size());
entries.clear();
}
Modified:
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java 2009-02-10
20:39:38 UTC (rev 7677)
+++
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -13,11 +13,10 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
@@ -27,7 +26,6 @@
import org.jboss.cache.factories.UnitTestConfigurationFactory;
import org.jboss.cache.util.TestingUtil;
import org.testng.annotations.Test;
-import org.testng.annotations.AfterMethod;
@Test(groups="functional", testName =
"statetransfer.NonBlockingStateTransferTest")
public class NonBlockingStateTransferTest
@@ -46,17 +44,9 @@
public static final Integer TWENTY = 20;
public static final Integer FORTY = 40;
- private List<Cache> createdCaches = new ArrayList<Cache>();
+ private volatile int testCount = 0;
- @AfterMethod
- public void clearCaches()
- {
- for (Cache c : createdCaches)
- {
- TestingUtil.killCaches(c);
- }
- createdCaches.clear();
- }
+ private static final Log log = LogFactory.getLog(NonBlockingStateTransferTest.class);
public static class DelayTransfer implements Serializable
{
@@ -121,6 +111,8 @@
}
catch (Exception e)
{
+ e.printStackTrace();
+ log.error(e);
}
}
result = c;
@@ -134,84 +126,170 @@
private CacheSPI<Object, Object> createCache(String name)
{
+ return createCache(name, true);
+
+ }
+
+ private CacheSPI<Object, Object> createCache(String name, boolean start)
+ {
Configuration config =
UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC);
+ config.setSyncCommitPhase(true);
config.setClusterName(name + "-" + Thread.currentThread().getName());
config.setNonBlockingStateTransfer(true);
+ config.setSyncReplTimeout(30000);
CacheSPI<Object, Object> cache = (CacheSPI<Object, Object>) new
UnitTestCacheFactory<Object, Object>().createCache(config, false, getClass());
- // Use marshaller
-
cache.create();
- cache.start();
- createdCaches.add(cache);
+ if (start)
+ cache.start();
return cache;
-
}
public void testInitialStateTransfer() throws Exception
{
- CacheSPI<Object, Object> cache1 = createCache("nbst");
+ testCount++;
+ log.info("testInitialStateTransfer start - " + testCount);
+ CacheSPI<Object, Object> cache1 = null, cache2 = null;
+ try
+ {
+ cache1 = createCache("nbst");
+ writeInitialData(cache1);
- writeInitialData(cache1);
+ cache2 = createCache("nbst");
- CacheSPI<Object, Object> cache2 = createCache("nbst");
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2 }, 60000);
- // Pause to give caches time to see each other
- TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+ verifyInitialData(cache2);
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache1, cache2);
+ }
+ log.info("testInitialStateTransfer end - " + testCount);
+ }
- verifyInitialData(cache2);
+ public void testConcurrentStateTransfer() throws Exception
+ {
+ testCount++;
+ log.info("testConcurrentStateTransfer start - " + testCount);
+ CacheSPI<Object, Object> cache1 = null, cache2 = null, cache3 = null, cache4
= null;
+ try
+ {
+ cache1 = createCache("nbst");
+ writeInitialData(cache1);
+
+ cache2 = createCache("nbst");
+
+ cache1.put("/delay", "delay", new DelayTransfer());
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2 }, 60000);
+ verifyInitialData(cache2);
+
+ final CacheSPI<Object, Object >c3 = cache3 = createCache("nbst",
false);
+ final CacheSPI<Object, Object >c4 = cache4 = createCache("nbst",
false);
+
+ Thread t1 = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ c3.start();
+ }
+ });
+ t1.start();
+
+ Thread t2 = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ c4.start();
+ }
+ });
+ t2.start();
+
+ t1.join();
+ t2.join();
+
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2, cache3,
cache4 }, 60000);
+ verifyInitialData(cache3);
+ verifyInitialData(cache4);
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache1, cache2, cache3, cache4);
+ }
+ log.info("testConcurrentStateTransfer end - " + testCount);
}
-
public void testSTWithThirdWritingNonTxCache() throws Exception
{
+ testCount++;
+ log.info("testSTWithThirdWritingNonTxCache start - " + testCount);
thirdWritingCacheTest(false, "nbst1");
+ log.info("testSTWithThirdWritingNonTxCache end - " + testCount);
}
public void testSTWithThirdWritingTxCache() throws Exception
{
+ testCount++;
+ log.info("testSTWithThirdWritingTxCache start - " + testCount);
thirdWritingCacheTest(true, "nbst2");
+ log.info("testSTWithThirdWritingTxCache end - " + testCount);
}
public void testSTWithWritingNonTxThread() throws Exception
{
+ testCount++;
+ log.info("testSTWithWritingNonTxThread start - " + testCount);
writingThreadTest(false, "nbst3");
+ log.info("testSTWithWritingNonTxThread end - " + testCount);
}
public void testSTWithWritingTxThread() throws Exception
{
+ testCount++;
+ log.info("testSTWithWritingTxThread start - " + testCount);
writingThreadTest(true, "nbst4");
+ log.info("testSTWithWritingTxThread end - " + testCount);
}
-
private void thirdWritingCacheTest(boolean tx, String name) throws
InterruptedException
{
- final CacheSPI<Object, Object> cache1 = createCache(name);
- final CacheSPI<Object, Object> cache3 = createCache(name);
+ CacheSPI<Object, Object> cache1 = null, cache2 = null, cache3 = null;
+ try
+ {
+ cache1 = createCache(name);
+ cache3 = createCache(name);
- writeInitialData(cache1);
+ writeInitialData(cache1);
- // Delay the transient copy, so that we get a more thorough log test
- cache1.put("/delay", "delay", new DelayTransfer());
+ // Delay the transient copy, so that we get a more thorough log test
+ cache1.put("/delay", "delay", new DelayTransfer());
- WritingRunner writer = new WritingRunner(cache3, tx);
- Thread writerThread = new Thread(writer);
- writerThread.start();
+ WritingRunner writer = new WritingRunner(cache3, tx);
+ Thread writerThread = new Thread(writer);
+ writerThread.start();
- CacheSPI<Object, Object> cache2 = createCache(name);
+ cache2 = createCache(name);
- // Pause to give caches time to see each other
- TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2, cache3},
60000);
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2, cache3 },
60000);
- writer.stop();
- writerThread.join();
+ writer.stop();
+ writerThread.join();
- verifyInitialData(cache2);
+ verifyInitialData(cache2);
- int count = writer.result();
+ int count = writer.result();
- for (int c = 0; c < count; c++)
- assertEquals(c, cache2.get("/test" + c, "test"));
+ for (int c = 0; c < count; c++)
+ assertEquals(c, cache2.get("/test" + c, "test"));
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache1, cache2, cache3);
+ }
}
private void verifyInitialData(CacheSPI<Object, Object> cache2)
@@ -232,30 +310,38 @@
private void writingThreadTest(boolean tx, String name) throws InterruptedException
{
- final CacheSPI<Object, Object> cache1 = createCache(name);
+ CacheSPI<Object, Object> cache1 = null, cache2 = null;
+ try
+ {
+ cache1 = createCache(name);
- writeInitialData(cache1);
+ writeInitialData(cache1);
- // Delay the transient copy, so that we get a more thorough log test
- cache1.put("/delay", "delay", new DelayTransfer());
+ // Delay the transient copy, so that we get a more thorough log test
+ cache1.put("/delay", "delay", new DelayTransfer());
- WritingRunner writer = new WritingRunner(cache1, tx);
- Thread writerThread = new Thread(writer);
- writerThread.start();
+ WritingRunner writer = new WritingRunner(cache1, tx);
+ Thread writerThread = new Thread(writer);
+ writerThread.start();
- CacheSPI<Object, Object> cache2 = createCache(name);
+ cache2 = createCache(name);
- // Pause to give caches time to see each other
- TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2 }, 60000);
- writer.stop();
- writerThread.join();
+ writer.stop();
+ writerThread.join();
- verifyInitialData(cache2);
+ verifyInitialData(cache2);
- int count = writer.result();
+ int count = writer.result();
- for (int c = 0; c < count; c++)
- assertEquals(c, cache2.get("/test" + c, "test"));
+ for (int c = 0; c < count; c++)
+ assertEquals(c, cache2.get("/test" + c, "test"));
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache1, cache2);
+ }
}
}
Modified:
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java 2009-02-10
20:39:38 UTC (rev 7677)
+++
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java 2009-02-11
06:00:34 UTC (rev 7678)
@@ -4,6 +4,7 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.RPCManager;
import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.RPCManagerImpl.FlushTracker;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
import org.jboss.cache.commands.tx.CommitCommand;
@@ -23,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Vector;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This is to test the scenario described in
http://jira.jboss.org/jira/browse/JBCACHE-1270
@@ -162,9 +164,9 @@
return delegate.getLastStateTransferSource();
}
- public void waitForFlush(long timeout)
+ public FlushTracker getFlushTracker()
{
- delegate.waitForFlush(timeout);
+ return delegate.getFlushTracker();
}
}
}