Author: manik.surtani(a)jboss.com
Date: 2009-03-16 16:37:52 -0400 (Mon, 16 Mar 2009)
New Revision: 7905
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
Log:
Re-implemented NBST (non-blocking state transfer) using RPC instead of a partial FLUSH
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-03-16 20:37:26 UTC
(rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-03-16 20:37:52 UTC
(rev 7905)
@@ -63,7 +63,6 @@
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RspFilter;
import org.jgroups.protocols.TP;
-import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Rsp;
@@ -156,11 +155,12 @@
public abstract class FlushTracker
{
- private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+ // closed whenever a FLUSH is in progress. Open by default.
+ final ReclosableLatch flushBlockGate = new ReclosableLatch(true);
private final AtomicInteger flushCompletionCount = new AtomicInteger();
+ // closed whenever a FLUSH is NOT in progress. Closed by default.
+ final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
- private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
-
public void block()
{
flushBlockGate.close();
@@ -179,46 +179,27 @@
return flushCompletionCount.get();
}
- public abstract void lockProcessingLock();
+ public abstract void lockProcessingLock() throws InterruptedException;
+
public abstract void unlockProcessingLock();
- public abstract void lockSuspendProcessingLock();
+
+ public abstract void lockSuspendProcessingLock() throws InterruptedException;
+
public abstract void unlockSuspendProcessingLock();
- public void waitForFlushCompletion(long timeout)
+ public void waitForFlushCompletion(long timeout) throws InterruptedException
{
- for (; ;)
+ if (channel.flushSupported() && !flushBlockGate.await(timeout,
TimeUnit.MILLISECONDS))
{
- try
- {
- if (channel.flushSupported() && !flushBlockGate.await(timeout,
TimeUnit.MILLISECONDS))
- {
- throw new TimeoutException("State retrieval timed out waiting for
flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
- }
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ throw new TimeoutException("State retrieval timed out waiting for flush
to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
}
}
- public void waitForFlushStart(long timeout)
+ public void waitForFlushStart(long timeout) throws InterruptedException
{
- for (; ;)
+ if (channel.flushSupported() && !flushWaitGate.await(timeout,
TimeUnit.MILLISECONDS))
{
- try
- {
- if (channel.flushSupported() && !flushWaitGate.await(timeout,
TimeUnit.MILLISECONDS))
- {
- throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
- }
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ throw new TimeoutException("State retrieval timed out waiting for flush
to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
}
}
}
@@ -247,21 +228,11 @@
{
private final ReentrantReadWriteLock coordinationLock = new
ReentrantReadWriteLock();
- public void lockProcessingLock()
+ public void lockProcessingLock() throws InterruptedException
{
- for (;;)
+ if
(!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
{
- try
- {
- if
(!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
- throw new TimeoutException("Could not obtain processing
lock");
-
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ throw new TimeoutException("Could not obtain processing lock");
}
}
@@ -270,30 +241,38 @@
coordinationLock.readLock().unlock();
}
- public void lockSuspendProcessingLock()
+ public void lockSuspendProcessingLock() throws InterruptedException
{
- for (;;)
+ if
(!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
{
- try
- {
- if
(!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
- throw new TimeoutException("Could not obtain processing
lock");
-
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ throw new TimeoutException("Could not obtain processing lock");
}
}
public void unlockSuspendProcessingLock()
{
if (coordinationLock.isWriteLockedByCurrentThread())
+ {
coordinationLock.writeLock().unlock();
+ }
}
+ public void waitForFlushCompletion(long timeout) throws InterruptedException
+ {
+ if (!flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
+ {
+ throw new TimeoutException("State retrieval timed out waiting for flush
to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
+ }
+ }
+
+ public void waitForFlushStart(long timeout) throws InterruptedException
+ {
+ if (!flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
+ {
+ throw new TimeoutException("State retrieval timed out waiting for flush
to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
+ }
+ }
+
}
// ------------ START: Lifecycle methods ------------
@@ -332,7 +311,7 @@
// Allow commands to be ACKed during state transfer
if (nonBlocking)
{
- componentRegistry.setBlockInStarting(false);
+ componentRegistry.setStatusCheckNecessary(false);
}
channel.connect(configuration.getClusterName());
if (log.isInfoEnabled()) log.info("Cache local address is " +
getLocalAddress());
@@ -376,17 +355,19 @@
}
}
- if (log.isInfoEnabled()) log.info("state was retrieved successfully (in
" + CachePrinter.prettyPrint((System.currentTimeMillis() - start)) + ")");
+ if (log.isInfoEnabled())
+ {
+ log.info("state was retrieved successfully (in " +
CachePrinter.prettyPrint((System.currentTimeMillis() - start)) + ")");
+ }
}
}
private void sanityCheckJGroupsStack(JChannel channel)
{
if (channel.getProtocolStack().findProtocol(STREAMING_STATE_TRANSFER.class) ==
null)
+ {
throw new ConfigurationException("JGroups channel does not use
STREAMING_STATE_TRANSFER! This is a requirement for non-blocking state transfer. Either
make sure your JGroups configuration uses STREAMING_STATE_TRANSFER or disable non-blocking
state transfer.");
-
- if (channel.getProtocolStack().findProtocol(FLUSH.class) == null)
- throw new ConfigurationException("JGroups channel does not use FLUSH! This
is a requirement for non-blocking state transfer. Either make sure your JGroups
configuration uses FLUSH or disable non-blocking state transfer.");
+ }
}
private void sanityCheckConfiguration(boolean nonBlockingStateTransfer, boolean
fetchStateOnStart)
@@ -394,12 +375,16 @@
if (isInLocalMode || !nonBlockingStateTransfer || !fetchStateOnStart) return; //
don't care about these cases!
if (configuration.getNodeLockingScheme() != NodeLockingScheme.MVCC)
+ {
throw new ConfigurationException("Non-blocking state transfer is only
supported with the MVCC node locking scheme. Please change your node locking scheme to
MVCC or disable non-blocking state transfer.");
+ }
if (isUsingBuddyReplication)
+ {
throw new ConfigurationException("Non-blocking state transfer cannot be
used with buddy replication at this time. Please disable either buddy replication or
non-blocking state transfer.");
+ }
}
-
+
private void startNonBlockStateTransfer(List<Address> members)
{
if (members.size() < 2)
@@ -440,7 +425,10 @@
if (!success)
{
wait <<= 2;
- if (log.isWarnEnabled()) log.warn("Could not find available peer for
state, backing off and retrying after "+wait+" millis. Retries left: " +
(numRetries -1 -i) );
+ if (log.isWarnEnabled())
+ {
+ log.warn("Could not find available peer for state, backing off and
retrying after " + wait + " millis. Retries left: " + (numRetries - 1 -
i));
+ }
try
{
@@ -460,7 +448,7 @@
throw new CacheException("Unable to fetch state on startup");
}
- componentRegistry.setBlockInStarting(true);
+ componentRegistry.setStatusCheckNecessary(true);
}
public void disconnect()
@@ -522,7 +510,10 @@
{
ReflectionUtil.setValue(configuration, "accessible", true);
configuration.setUsingMultiplexer(true);
- if (log.isDebugEnabled()) log.debug("Created Multiplexer Channel for
cache cluster " + configuration.getClusterName() + " using stack " +
configuration.getMultiplexerStack());
+ if (log.isDebugEnabled())
+ {
+ log.debug("Created Multiplexer Channel for cache cluster " +
configuration.getClusterName() + " using stack " +
configuration.getMultiplexerStack());
+ }
}
else
{
@@ -697,8 +688,14 @@
if (rpcDispatcher == null) return null;
int modeToUse = mode;
int preferredMode;
- if ((preferredMode =
spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1) modeToUse
= preferredMode;
- if (trace) log.trace("callRemoteMethods(): valid members are " +
recipients + " methods: " + command + " Using OOB? " +
useOutOfBandMessage + " modeToUse: " + modeToUse);
+ if ((preferredMode =
spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
+ {
+ modeToUse = preferredMode;
+ }
+ if (trace)
+ {
+ log.trace("callRemoteMethods(): valid members are " + recipients +
" methods: " + command + " Using OOB? " + useOutOfBandMessage + "
modeToUse: " + modeToUse);
+ }
flushTracker.lockProcessingLock();
unlock = true;
@@ -707,7 +704,10 @@
useOutOfBandMessage = false;
RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command,
modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
- if (trace) log.trace("(" + getLocalAddress() + "): responses for
method " + command.getClass().getSimpleName() + ":\n" + rsps);
+ if (trace)
+ {
+ log.trace("(" + getLocalAddress() + "): responses for method
" + command.getClass().getSimpleName() + ":\n" + rsps);
+ }
// short-circuit no-return-value calls.
if (rsps == null) return Collections.emptyList();
List<Object> retval = new ArrayList<Object>(rsps.size());
@@ -751,7 +751,9 @@
{
computeStats(success);
if (unlock)
+ {
flushTracker.unlockProcessingLock();
+ }
}
}
@@ -779,7 +781,10 @@
// should this really be throwing an exception? Are there valid use cases where
partial state may not be available? - Manik
// Yes -- cache is configured LOCAL but app doesn't know it -- Brian
//throw new IllegalArgumentException("Cannot fetch partial state, targets
are " + sources + " and stateId is " + stateId);
- if (log.isWarnEnabled()) log.warn("Cannot fetch partial state, targets are
" + sources + " and stateId is " + stateId);
+ if (log.isWarnEnabled())
+ {
+ log.warn("Cannot fetch partial state, targets are " + sources +
" and stateId is " + stateId);
+ }
return;
}
@@ -796,13 +801,19 @@
return;
}
- if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + "
fetching partial state " + stateId + " from members " + targets);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Node " + getLocalAddress() + " fetching partial state
" + stateId + " from members " + targets);
+ }
boolean successfulTransfer = false;
for (Address target : targets)
{
try
{
- if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() +
" fetching partial state " + stateId + " from member " + target);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Node " + getLocalAddress() + " fetching partial
state " + stateId + " from member " + target);
+ }
messageListener.setStateSet(false);
successfulTransfer = getState(stateId, target);
if (successfulTransfer)
@@ -817,17 +828,26 @@
successfulTransfer = false;
}
}
- if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() +
" fetching partial state " + stateId + " from member " + target +
(successfulTransfer ? " successful" : " failed"));
+ if (log.isDebugEnabled())
+ {
+ log.debug("Node " + getLocalAddress() + " fetching partial
state " + stateId + " from member " + target + (successfulTransfer ? "
successful" : " failed"));
+ }
if (successfulTransfer) break;
}
catch (IllegalStateException ise)
{
// thrown by the JGroups channel if state retrieval fails.
- if (log.isInfoEnabled()) log.info("Channel problems fetching state.
Continuing on to next provider. ", ise);
+ if (log.isInfoEnabled())
+ {
+ log.info("Channel problems fetching state. Continuing on to next
provider. ", ise);
+ }
}
}
- if (!successfulTransfer && log.isDebugEnabled()) log.debug("Node
" + getLocalAddress() + " could not fetch partial state " + stateId +
" from any member " + targets);
+ if (!successfulTransfer && log.isDebugEnabled())
+ {
+ log.debug("Node " + getLocalAddress() + " could not fetch partial
state " + stateId + " from any member " + targets);
+ }
}
private boolean getState(String stateId, Address target) throws
ChannelNotConnectedException, ChannelClosedException
@@ -989,21 +1009,24 @@
*/
public void block()
{
- try
+ if (!configuration.isNonBlockingStateTransfer())
{
- if (log.isDebugEnabled()) log.debug("Block received at " +
getLocalAddress());
+ try
+ {
+ if (log.isDebugEnabled()) log.debug("Block received at " +
getLocalAddress());
- flushTracker.block();
- notifier.notifyCacheBlocked(true);
- notifier.notifyCacheBlocked(false);
+ flushTracker.block();
+ notifier.notifyCacheBlocked(true);
+ notifier.notifyCacheBlocked(false);
- if (log.isDebugEnabled()) log.debug("Block processed at " +
getLocalAddress());
+ if (log.isDebugEnabled()) log.debug("Block processed at " +
getLocalAddress());
+ }
+ catch (Throwable e)
+ {
+ //do not rethrow! jgroups might behave funny, resulting even in deadlock
+ log.error("Error found while processing block()", e);
+ }
}
- catch (Throwable e)
- {
- //do not rethrow! jgroups might behave funny, resulting even in deadlock
- log.error("Error found while processing block()", e);
- }
}
/**
@@ -1011,23 +1034,25 @@
*/
public void unblock()
{
- try
+ if (!configuration.isNonBlockingStateTransfer())
{
- if (log.isDebugEnabled()) log.debug("UnBlock received at " +
getLocalAddress());
+ try
+ {
+ if (log.isDebugEnabled()) log.debug("UnBlock received at " +
getLocalAddress());
- notifier.notifyCacheUnblocked(true);
- notifier.notifyCacheUnblocked(false);
- flushTracker.unblock();
+ notifier.notifyCacheUnblocked(true);
+ notifier.notifyCacheUnblocked(false);
+ flushTracker.unblock();
- if (log.isDebugEnabled()) log.debug("UnBlock processed at " +
getLocalAddress());
+ if (log.isDebugEnabled()) log.debug("UnBlock processed at " +
getLocalAddress());
+ }
+ catch (Throwable e)
+ {
+ //do not rethrow! jgroups might behave funny, resulting even in deadlock
+ log.error("Error found while processing unblock", e);
+ }
}
- catch (Throwable e)
- {
- //do not rethrow! jgroups might behave funny, resulting even in deadlock
- log.error("Error found while processing unblock", e);
- }
}
-
}
//jmx operations
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-03-16
20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-03-16
20:37:52 UTC (rev 7905)
@@ -105,7 +105,7 @@
*/
private boolean invokedFromShutdownHook;
- private volatile boolean blockInStarting = true;
+ private volatile boolean statusCheckNecessary = true;
/**
* Creates an instance of the component registry. The configuration passed in is
automatically registered.
@@ -886,23 +886,26 @@
if (trace) log.trace("Is remotely originating.");
// else if this is a remote call and the status is STARTING, wait until the cache
starts.
- if (state == CacheStatus.STARTING && blockInStarting)
+ if (statusCheckNecessary)
{
- if (trace) log.trace("Cache is starting; block.");
- try
+ if (state == CacheStatus.STARTING)
{
- blockUntilCacheStarts();
- return true;
+ if (trace) log.trace("Cache is starting; block.");
+ try
+ {
+ blockUntilCacheStarts();
+ return true;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
- catch (InterruptedException e)
+ else
{
- Thread.currentThread().interrupt();
+ log.warn("Received a remote call but the cache is not in STARTED state -
ignoring call.");
}
}
- else if (blockInStarting)
- {
- log.warn("Received a remote call but the cache is not in STARTED state -
ignoring call.");
- }
return false;
}
@@ -1022,14 +1025,14 @@
/**
* Returns an immutable set containing all the components that exist in the
repository at this moment.
*/
- public Set<Component> getRegiteredComponents()
+ public Set<Component> getRegisteredComponents()
{
HashSet<Component> defensiveCopy = new
HashSet<Component>(componentLookup.values());
return Collections.unmodifiableSet(defensiveCopy);
}
- public void setBlockInStarting(boolean blockInStarting)
+ public void setStatusCheckNecessary(boolean statusCheckNecessary)
{
- this.blockInStarting = blockInStarting;
+ this.statusCheckNecessary = statusCheckNecessary;
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java 2009-03-16
20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java 2009-03-16
20:37:52 UTC (rev 7905)
@@ -196,7 +196,7 @@
private List<ResourceDMBean> getResourceDMBeans()
{
List<ResourceDMBean> resourceDMBeans = new
ArrayList<ResourceDMBean>();
- for (ComponentRegistry.Component component :
cacheSpi.getComponentRegistry().getRegiteredComponents())
+ for (ComponentRegistry.Component component :
cacheSpi.getComponentRegistry().getRegisteredComponents())
{
ResourceDMBean resourceDMBean = new ResourceDMBean(component.getInstance());
if (resourceDMBean.isManagedResource())
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-03-16
20:37:26 UTC (rev 7904)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-03-16
20:37:52 UTC (rev 7905)
@@ -21,17 +21,6 @@
*/
package org.jboss.cache.marshall;
-import java.io.NotSerializableException;
-import java.util.Map;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.jboss.cache.InvocationContext;
import org.jboss.cache.RPCManager;
import org.jboss.cache.RPCManagerImpl.FlushTracker;
@@ -40,6 +29,7 @@
import org.jboss.cache.commands.remote.AnnounceBuddyPoolNameCommand;
import org.jboss.cache.commands.remote.AssignToBuddyGroupCommand;
import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.interceptors.InterceptorChain;
@@ -58,6 +48,17 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
+import java.io.NotSerializableException;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* A JGroups RPC dispatcher that knows how to deal with {@link
org.jboss.cache.commands.ReplicableCommand}s.
*
@@ -267,7 +268,7 @@
boolean replayIgnored = false;
- if (configuration.isNonBlockingStateTransfer())
+ if (configuration.isNonBlockingStateTransfer() && !(cmd instanceof
StateTransferControlCommand))
{
int flushCount = flushTracker.getFlushCompletionCount();
flushTracker.lockProcessingLock();
@@ -297,10 +298,7 @@
if (trace) log.trace("This is a non-visitable command - so performing
directly and not via the invoker.");
// need to check cache status for all except buddy replication commands.
- if (!(cmd instanceof AnnounceBuddyPoolNameCommand ||
- cmd instanceof AssignToBuddyGroupCommand ||
- cmd instanceof RemoveFromBuddyGroupCommand)
- && !componentRegistry.invocationsAllowed(false))
+ if (requiresRunningCache(cmd) &&
!componentRegistry.invocationsAllowed(false))
{
return new RequestIgnoredResponse();
}
@@ -326,6 +324,14 @@
}
}
+ private boolean requiresRunningCache(ReplicableCommand cmd)
+ {
+ return !(cmd instanceof AnnounceBuddyPoolNameCommand ||
+ cmd instanceof AssignToBuddyGroupCommand ||
+ cmd instanceof RemoveFromBuddyGroupCommand ||
+ cmd instanceof StateTransferControlCommand);
+ }
+
@Override
public String toString()
{
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-03-16
20:37:26 UTC (rev 7904)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-03-16
20:37:52 UTC (rev 7905)
@@ -32,7 +32,9 @@
import org.jboss.cache.NodeSPI;
import org.jboss.cache.RPCManager;
import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.commands.CommandsFactory;
import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.ComponentRegistry;
@@ -49,17 +51,16 @@
import org.jboss.cache.transaction.TransactionLog;
import org.jboss.cache.transaction.TransactionLog.LogEntry;
import org.jgroups.Address;
-import org.jgroups.Channel;
import java.io.IOException;
import java.io.ObjectInputStream;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Vector;
public class DefaultStateTransferIntegrator implements StateTransferIntegrator
{
@@ -71,25 +72,29 @@
private Set<Fqn> internalFqns;
private Configuration cfg;
- private RPCManager manager;
+ private RPCManager rpcManager;
private TransactionLog txLog;
private boolean needToPersistState; // for JBCACHE-131
private boolean nonBlocking;
private InvocationContextContainer container;
private InterceptorChain chain;
private ComponentRegistry registry;
+ private CommandsFactory commandsFactory;
@Inject
- public void inject(CacheSPI<?, ?> cache, Configuration cfg, RPCManager
rpcManager, TransactionLog txLog, InvocationContextContainer container, InterceptorChain
chain, ComponentRegistry registry)
+ public void inject(CacheSPI<?, ?> cache, Configuration cfg, RPCManager
rpcManager, TransactionLog txLog,
+ InvocationContextContainer container, InterceptorChain chain,
ComponentRegistry registry,
+ CommandsFactory commandsFactory)
{
this.cache = cache;
this.cfg = cfg;
- this.manager = rpcManager;
+ this.rpcManager = rpcManager;
this.nonBlocking = cfg.isNonBlockingStateTransfer();
this.txLog = txLog;
this.container = container;
this.chain = chain;
this.registry = registry;
+ this.commandsFactory = commandsFactory;
}
@Start(priority = 14)
@@ -120,47 +125,21 @@
integrateTxLog(ois);
}
- private void doPartialFlush(Channel channel, List<Address> members)
+ /**
+ * Mimics a partial flush between the current instance and the address to flush, by
opening and closing the necessary
+ * latches on both ends.
+ * @param addressToFlush address to flush in addition to the current address
+ * @param block if true, mimics setting a flush. Otherwise, mimics un-setting a
flush.
+ * @throws Exception if there are issues
+ */
+ private void mimicPartialFlushViaRPC(Address addressToFlush, boolean block) throws
Exception
{
- int retries = 5;
- int sleepBetweenRetries = 250;
- int sleepIncreaseFactor = 2;
- if (trace) log.trace("Attempting a partial flush on members " + members +
" with up to " + retries + " retries.");
-
- boolean success = false;
- int i;
- for (i=1; i<=retries; i++)
- {
- if (trace) log.trace("Attempt number " + i);
- try
- {
- if (success = channel.startFlush(members, false)) break;
- if (trace) log.trace("Channel.startFlush() returned false!");
- }
- catch (Exception e)
- {
- if (trace) log.trace("Caught exception attempting a partial flush",
e);
- }
- try
- {
- if (trace) log.trace("Partial state transfer failed. Backing off for
" + sleepBetweenRetries + " millis and retrying");
- Thread.sleep(sleepBetweenRetries);
- sleepBetweenRetries *= sleepIncreaseFactor;
- }
- catch (InterruptedException ie)
- {
- Thread.currentThread().interrupt();
- }
- }
-
- if (success)
- {
- if (log.isDebugEnabled()) log.debug("Partial flush between " + members
+ " succeeded!");
- }
- else
- {
- throw new CacheException("Could initiate partial flush between "
+members+ "! State-transfer failed!");
- }
+ StateTransferControlCommand cmd =
commandsFactory.buildStateTransferControlCommand(block);
+ Vector<Address> recipient = new Vector<Address>();
+ recipient.add(addressToFlush);
+ if (!block) rpcManager.getFlushTracker().unblock();
+ rpcManager.callRemoteMethods(recipient, cmd, true, cfg.getStateRetrievalTimeout(),
true);
+ if (block) rpcManager.getFlushTracker().block();
}
private void integrateTxLog(ObjectInputStream ois) throws Exception
@@ -170,11 +149,8 @@
processCommitLog(ois);
- Channel channel = manager.getChannel();
+ mimicPartialFlushViaRPC(rpcManager.getLastStateTransferSource(), true);
- List<Address> targets = Arrays.asList(channel.getLocalAddress(),
manager.getLastStateTransferSource());
- doPartialFlush(channel, targets);
-
try
{
if (trace)
@@ -201,12 +177,13 @@
// Block all remote commands once transfer is complete,
// and before FLUSH completes
- registry.setBlockInStarting(true);
+ registry.setStatusCheckNecessary(true);
}
finally
{
if (trace) log.trace("Stopping partial flush");
- channel.stopFlush(targets);
+// channel.stopFlush(targets);
+ mimicPartialFlushViaRPC(rpcManager.getLastStateTransferSource(), false);
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java 2009-03-16
20:37:26 UTC (rev 7904)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java 2009-03-16
20:37:52 UTC (rev 7905)
@@ -1,12 +1,14 @@
package org.jboss.cache.statetransfer;
+import org.jboss.cache.CacheException;
+
/**
* Thrown when a state provider is busy
*
* @author Manik Surtani
* @since 3.1
*/
-public class StateProviderBusyException extends Exception
+public class StateProviderBusyException extends CacheException
{
public StateProviderBusyException()
{