Author: manik.surtani(a)jboss.com
Date: 2008-01-17 17:34:36 -0500 (Thu, 17 Jan 2008)
New Revision: 5161
Added:
core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java
Modified:
core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
core/trunk/src/main/java/org/jboss/cache/RPCManager.java
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/RegionManager.java
core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java
core/trunk/src/test/java/org/jboss/cache/api/CacheSPITest.java
core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
Log:
Refactored RPCManager and CacheImpl so that the channel is now created and owned solely by
the RPCManager
Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2008-01-17 22:34:36 UTC (rev 5161)
@@ -11,7 +11,6 @@
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.buddyreplication.GravitateResult;
import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.RuntimeConfig;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.InterceptorChainFactory;
import org.jboss.cache.factories.annotations.ComponentName;
@@ -22,11 +21,9 @@
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.LockStrategyFactory;
-import org.jboss.cache.lock.LockUtil;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.NodeLock;
import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
@@ -35,29 +32,27 @@
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
import org.jboss.cache.optimistic.DataVersion;
-import org.jboss.cache.remoting.jgroups.CacheMessageListener;
-import org.jboss.cache.statetransfer.StateTransferManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
import org.jboss.cache.util.CachePrinter;
-import org.jboss.cache.util.ThreadGate;
-import org.jboss.cache.util.reflect.ReflectionUtil;
-import org.jgroups.*;
-import org.jgroups.blocks.GroupRequest;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.blocks.RspFilter;
-import org.jgroups.util.Rsp;
-import org.jgroups.util.RspList;
+import org.jgroups.Address;
import javax.management.MBeanServerFactory;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import java.io.NotSerializableException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* The default implementation class of {@link org.jboss.cache.Cache} and {@link
org.jboss.cache.CacheSPI}. This class
@@ -82,11 +77,6 @@
private Log log = LogFactory.getLog(CacheImpl.class);
/**
- * Thread gate used to block Dispatcher during JGroups FLUSH protocol
- */
- private final ThreadGate flushBlockGate = new ThreadGate();
-
- /**
* Root node.
*/
private NodeSPI root;
@@ -97,31 +87,6 @@
private RegionManager regionManager = null;
/**
- * The JGroups JChannel in use.
- */
- protected Channel channel = null;
-
- /**
- * True if this CacheImpl is the coordinator.
- */
- private volatile boolean coordinator = false;
-
- /**
- * List of cluster group members.
- */
- private final Vector<Address> members = new Vector<Address>();
-
- /**
- * JGroups RpcDispatcher in use.
- */
- private RpcDispatcher disp = null;
-
- /**
- * JGroups message listener.
- */
- private CacheMessageListener messageListener;
-
- /**
* Maintains mapping of transactions (keys) and Modifications/Undo-Operations
*/
private TransactionTable transactionTable;
@@ -148,11 +113,6 @@
private CacheLoaderManager cacheLoaderManager;
/**
- * Queue used to replicate updates when mode is repl-async
- */
- private ReplicationQueue repl_queue = null;
-
- /**
* The current lifecycle state.
*/
CacheStatus cacheStatus;
@@ -185,6 +145,7 @@
* from a shutdown hook.
*/
private boolean invokedFromShutdownHook;
+ private RPCManager rpcManager;
/**
* Constructs an uninitialized CacheImpl.
@@ -218,8 +179,8 @@
@Inject
private void injectDependencies(Notifier notifier, RegionManager regionManager,
TransactionManager transactionManager, Marshaller marshaller,
TransactionTable transactionTable, NodeFactory
nodeFactory,
- CacheSPI spi, CacheMessageListener messageListener,
@ComponentName("remoteDelegate")RemoteCacheInvocationDelegate remoteDelegate,
- Interceptor interceptorChain, BuddyManager
buddyManager)
+ CacheSPI spi,
@ComponentName("remoteDelegate")RemoteCacheInvocationDelegate remoteDelegate,
+ Interceptor interceptorChain, BuddyManager
buddyManager, RPCManager rpcManager)
{
this.notifier = notifier;
this.regionManager = regionManager;
@@ -227,11 +188,11 @@
this.transactionTable = transactionTable;
this.nodeFactory = nodeFactory;
this.spi = spi;
- this.messageListener = messageListener;
this.remoteDelegate = remoteDelegate;
this.marshaller = marshaller;
this.interceptorChain = interceptorChain;
this.buddyManager = buddyManager;
+ this.rpcManager = rpcManager;
}
public Configuration getConfiguration()
@@ -256,34 +217,6 @@
}
/**
- * Returns the local channel address.
- */
- public Address getLocalAddress()
- {
- return channel != null ? channel.getLocalAddress() : null;
- }
-
- /**
- * Returns the members as a List.
- * This list may be concurrently modified.
- */
- public List<Address> getMembers()
- {
- synchronized (members)
- {
- return new ArrayList<Address>(members);
- }
- }
-
- /**
- * Returns <code>true</code> if this node is the group coordinator.
- */
- public boolean isCoordinator()
- {
- return coordinator;
- }
-
- /**
* Returns the transaction table.
*/
public TransactionTable getTransactionTable()
@@ -291,38 +224,7 @@
return transactionTable;
}
- private void setUseReplQueue(boolean flag)
- {
- if (flag)
- {
- if (repl_queue == null)
- {
- repl_queue = new ReplicationQueue(this, configuration.getReplQueueInterval(),
configuration.getReplQueueMaxElements());
- if (configuration.getReplQueueInterval() >= 0)
- {
- repl_queue.start();
- }
- }
- }
- else
- {
- if (repl_queue != null)
- {
- repl_queue.stop();
- repl_queue = null;
- }
- }
- }
-
/**
- * Returns the replication queue.
- */
- public ReplicationQueue getReplicationQueue()
- {
- return repl_queue;
- }
-
- /**
* Sets the cache locking isolation level.
*/
private void setIsolationLevel(IsolationLevel level)
@@ -339,99 +241,6 @@
}
/**
- * Fetches the group state from the current coordinator. If successful, this
- * will trigger JChannel setState() call.
- */
- public void fetchState(long timeout) throws ChannelClosedException,
ChannelNotConnectedException
- {
- if (channel == null)
- {
- throw new ChannelNotConnectedException();
- }
- boolean rc = channel.getState(null, timeout);
- if (rc)
- {
- log.debug("fetchState(): state was retrieved successfully");
- }
- else
- {
- log.debug("fetchState(): state could not be retrieved (first
member)");
- }
- }
-
- public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn
integrationTarget) throws Exception
- {
- String encodedStateId = sourceTarget + StateTransferManager.PARTIAL_STATE_DELIMITER
+ integrationTarget;
- fetchPartialState(sources, encodedStateId);
- }
-
- public void fetchPartialState(List<Address> sources, Fqn subtree) throws
Exception
- {
- if (subtree == null)
- {
- throw new IllegalArgumentException("Cannot fetch partial state. Null
subtree.");
- }
- fetchPartialState(sources, subtree.toString());
- }
-
- private void fetchPartialState(List<Address> sources, String stateId) throws
Exception
- {
- if (sources == null || sources.isEmpty() || stateId == null)
- {
- // should this really be throwing an exception? Are there valid use cases where
partial state may not be available? - Manik
- // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
- //throw new IllegalArgumentException("Cannot fetch partial state, targets
are " + sources + " and stateId is " + stateId);
- if (log.isWarnEnabled())
- {
- log.warn("Cannot fetch partial state, targets are " + sources +
- " and stateId is " + stateId);
- }
- return;
- }
-
- List<Address> targets = new LinkedList<Address>(sources);
-
- //skip *this* node as a target
- targets.remove(getLocalAddress());
-
- if (targets.isEmpty())
- {
- // Definitely no exception here -- this happens every time the 1st node in the
- // cluster activates a region!! -- Brian
- log.debug("Cannot fetch partial state. There are no target members
specified");
- return;
- }
-
- log.debug("Node " + getLocalAddress() + " fetching partial state
" + stateId + " from members " + targets);
- boolean successfulTransfer = false;
- for (Address target : targets)
- {
- log.debug("Node " + getLocalAddress() + " fetching partial state
" + stateId + " from member " + target);
- messageListener.setStateSet(false);
- successfulTransfer = channel.getState(target, stateId,
configuration.getStateRetrievalTimeout());
- if (successfulTransfer)
- {
- try
- {
- messageListener.waitForState();
- }
- catch (Exception transferFailed)
- {
- successfulTransfer = false;
- }
- }
- log.debug("Node " + getLocalAddress() + " fetching partial state
" + stateId + " from member " + target + (successfulTransfer ? "
successful" : " failed"));
- if (successfulTransfer)
- break;
- }
-
- if (!successfulTransfer)
- {
- log.debug("Node " + getLocalAddress() + " could not fetch partial
state " + stateId + " from any member " + targets);
- }
- }
-
- /**
* Lifecycle method. This is like initialize.
*
* @throws Exception
@@ -493,18 +302,11 @@
componentRegistry.wire();
correctRootNodeType();
- setUseReplQueue(configuration.isUseReplQueue());
setIsolationLevel(configuration.getIsolationLevel());
cacheStatus = CacheStatus.CREATED;
}
- protected boolean shouldFetchStateOnStartup()
- {
- boolean loaderFetch = cacheLoaderManager != null &&
cacheLoaderManager.isFetchPersistentState();
- return !configuration.isInactiveOnStartup() && buddyManager == null
&& (configuration.isFetchInMemoryState() || loaderFetch);
- }
-
/**
* Creates a new root node if one does not exist, or if the existing one does not
match the type according to the configuration.
*/
@@ -602,81 +404,17 @@
case REPL_ASYNC:
case INVALIDATION_ASYNC:
case INVALIDATION_SYNC:
- if (log.isDebugEnabled()) log.debug("cache mode is " +
configuration.getCacheMode());
- initialiseChannelAndRpcDispatcher();
-
- //connect and transfer state
- if (shouldFetchStateOnStartup())
- {
- try
- {
- long start = System.currentTimeMillis();
- channel.connect(configuration.getClusterName(), null, null,
configuration.getStateRetrievalTimeout());
- // reconfigure log category so that the instance name is reflected as
well.
- configureLogCategory();
- //if I am not the only and the first member than wait for a state to
arrive
- if (getMembers().size() > 1)
- {
- messageListener.waitForState();
- }
-
- if (log.isDebugEnabled())
- {
- log.debug("connected, state was retrieved successfully (in
" + (System.currentTimeMillis() - start)
- + " milliseconds)");
- }
- }
- catch (StateTransferException ste)
- {
- // make sure we disconnect from the channel before we throw this
exception!
- // JBCACHE-761
- channel.disconnect();
- channel.close();
- throw new CacheException("Unable to fetch state on startup",
ste);
- }
- catch (ChannelException e)
- {
- throw new CacheException("Unable to connect to JGroups
channel", e);
- }
- catch (Exception ex)
- {
- throw new CacheException("Unable to fetch state on startup",
ex);
- }
- }
- //otherwise just connect
- else
- {
- try
- {
- channel.connect(configuration.getClusterName());
- // reconfigure log category so that the instance name is reflected as
well.
- configureLogCategory();
- }
- catch (ChannelException e)
- {
- throw new CacheException("Unable to connect to JGroups
channel", e);
- }
- }
- if (log.isInfoEnabled())
- {
- log.info("CacheImpl local address is " +
channel.getLocalAddress());
- }
- if (buddyManager != null && buddyManager.isEnabled())
- {
- //buddyManager.init(this);
- buddyManager.init();
- if (configuration.isUseReplQueue())
- {
- log.warn("Replication queue not supported when using buddy
replication. Disabling repliction queue.");
- configuration.setUseReplQueue(false);
- repl_queue = null;
- }
- }
+ // reconfigure log category so that the instance name is reflected as well.
+ configureLogCategory();
break;
default:
throw new IllegalArgumentException("cache mode " +
configuration.getCacheMode() + " is invalid");
}
+ // these 2 components need to be started manually since they can only be started after ALL other components have started.
+ if (rpcManager != null) rpcManager.start();
+ if (buddyManager != null) buddyManager.init();
+
//now attempt to preload the cache from the loader - Manik
if (cacheLoaderManager != null)
{
@@ -779,25 +517,8 @@
// The rest of these should have already been taken care of in stop,
// but we do it here as well in case stop failed.
+ rpcManager.stop();
- if (channel != null)
- {
- if (channel.isOpen())
- {
- try
- {
- channel.close();
- channel.disconnect();
- }
- catch (Exception toLog)
- {
- log.error("Problem closing channel; setting it to null",
toLog);
- }
- }
- channel = null;
- configuration.getRuntimeConfig().setChannel(null);
- }
- disp = null;
transactionManager = null;
componentRegistry.reset();
@@ -843,35 +564,6 @@
componentRegistry.stop();
- if (channel != null)
- {
- log.info("stop(): closing the channel");
- killChannel();
- channel = null;
- configuration.getRuntimeConfig().setChannel(null);
- }
-
- if (disp != null)
- {
- log.info("stop(): stopping the dispatcher");
- disp.stop();
- disp = null;
- }
- if (members != null)
- {
- synchronized (members)
- {
- members.clear();
- }
- }
-
- coordinator = false;
-
- if (repl_queue != null)
- {
- repl_queue.stop();
- }
-
if (notifier != null)
{
notifier.notifyCacheStopped(spi, spi.getInvocationContext());
@@ -908,37 +600,6 @@
}
/**
- * Returns the address of the coordinator or null if there is no
- * coordinator.
- * Waits until the membership view is updated.
- */
- public Address getCoordinator()
- {
- if (channel == null)
- {
- return null;
- }
-
- synchronized (members)
- {
- while (members.isEmpty())
- {
- log.debug("getCoordinator(): waiting on viewAccepted()");
- try
- {
- members.wait();
- }
- catch (InterruptedException e)
- {
- log.error("getCoordinator(): Interrupted while waiting for members to
be set", e);
- break;
- }
- }
- return members.size() > 0 ? members.get(0) : null;
- }
- }
-
- /**
* Evicts the node at <code>subtree</code> along with all descendant
nodes.
*
* @param subtree Fqn indicating the uppermost node in the
@@ -979,55 +640,6 @@
}
- private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
- {
- Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
- NodeLock lock = node.getLock();
- Object owner = lock.getWriterOwner();
-
- if (isLockOwnerDead(owner, deadMembers))
- {
- deadOwners.add((GlobalTransaction) owner);
- }
-
- for (Object readOwner : lock.getReaderOwners())
- {
- if (isLockOwnerDead(readOwner, deadMembers))
- {
- deadOwners.add((GlobalTransaction) readOwner);
- }
- }
-
- for (GlobalTransaction deadOwner : deadOwners)
- {
- boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
- boolean broken = LockUtil.breakTransactionLock(lock, deadOwner, localTx, this);
-
- if (broken && trace)
- {
- log.trace("Broke lock for node " + node.getFqn() +
- " held by " + deadOwner);
- }
- }
-
- // Recursively unlock children
- for (Object child : node.getChildrenDirect())
- {
- removeLocksForDeadMembers((NodeSPI) child, deadMembers);
- }
- }
-
- private boolean isLockOwnerDead(Object owner, List deadMembers)
- {
- boolean result = false;
- if (owner != null && owner instanceof GlobalTransaction)
- {
- Object addr = ((GlobalTransaction) owner).getAddress();
- result = deadMembers.contains(addr);
- }
- return result;
- }
-
// ----------- End Marshalling and State Transfer -----------------------
/**
@@ -1342,155 +954,6 @@
return count;
}
- /* ---------------------- Remote method calls -------------------- */
-
- /**
- * @param mbrs
- * @param method_call
- * @param synchronous
- * @param exclude_self
- * @param timeout
- * @return
- * @throws Exception
- * @deprecated Note this is due to be moved to an interceptor.
- */
- @Deprecated
- public List callRemoteMethods(List<Address> mbrs, MethodCall method_call,
- boolean synchronous, boolean exclude_self, long
timeout)
- throws Exception
- {
- return callRemoteMethods(mbrs, method_call, synchronous ? GroupRequest.GET_ALL :
GroupRequest.GET_NONE, exclude_self, timeout);
- }
-
- @Deprecated
- public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int
mode, boolean exclude_self, long timeout)
- throws Exception
- {
- return callRemoteMethods(mbrs, method_call, mode, exclude_self, timeout, null);
- }
-
- /**
- * Overloaded to allow a finer grained control over JGroups mode
- *
- * @param mbrs
- * @param method_call
- * @param mode
- * @param exclude_self
- * @param timeout
- * @return
- * @throws Exception
- * @deprecated Note this is due to be moved to an interceptor.
- */
- @Deprecated
- public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int
mode, boolean exclude_self, long timeout, RspFilter rspFilter)
- throws Exception
- {
- int modeToUse = mode;
- int preferredMode;
- if ((preferredMode =
spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
- modeToUse = preferredMode;
-
- RspList rsps;
- List retval;
- Vector<Address> validMembers;
-
- if (disp == null)
- {
- return null;
- }
-
- if (mbrs != null)
- validMembers = new Vector<Address>(mbrs);
- else
- {
- synchronized (members)
- {
- validMembers = new Vector<Address>(this.members);
- }
- }
-
- if (exclude_self && !validMembers.isEmpty())
- {
- Object local_addr = getLocalAddress();
- if (local_addr != null)
- {
- validMembers.remove(local_addr);
- }
- }
- if (validMembers.isEmpty())
- {
- if (trace)
- {
- log.trace("destination list is empty, discarding call");
- }
- return null;
- }
-
- if (trace)
- {
- log.trace("callRemoteMethods(): valid members are " + validMembers +
" methods: " + method_call);
- }
-
- if (channel.flushSupported())
- {
- if (!flushBlockGate.await(configuration.getStateRetrievalTimeout()))
- throw new TimeoutException("State retrieval timed out waiting for flush
unblock.");
- }
- rsps = rspFilter == null
- ? disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout,
buddyManager != null && buddyManager.isEnabled())
- : disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout,
buddyManager != null && buddyManager.isEnabled(), false, rspFilter);
-
- // a null response is 99% likely to be due to a marshalling problem - we throw a
NSE, this needs to be changed when
- // JGroups supports
http://jira.jboss.com/jira/browse/JGRP-193
- if (rsps == null)
- {
- // return null;
- throw new NotSerializableException("RpcDispatcher returned a null. This is
most often caused by args for " + method_call.getName() + " not being
serializable.");
- }
- if (mode == GroupRequest.GET_NONE)
- {
- return Collections.EMPTY_LIST;// async case
- }
-
- if (trace)
- {
- log.trace("(" + getLocalAddress() + "): responses for method
" + method_call.getName() + ":\n" + rsps);
- }
-
- retval = new ArrayList(rsps.size());
-
- for (Rsp rsp : rsps.values())
- {
- if (rsp.wasSuspected() || !rsp.wasReceived())
- {
- CacheException ex;
- if (rsp.wasSuspected())
- {
- ex = new SuspectException("Suspected member: " +
rsp.getSender());
- }
- else
- {
- ex = new TimeoutException("Replication timeout for " +
rsp.getSender());
- }
- retval.add(new ReplicationException("rsp=" + rsp, ex));
- }
- else
- {
- Object value = rsp.getValue();
- if (value instanceof Exception && !(value instanceof
ReplicationException))
- {
- // if we have any application-level exceptions make sure we throw them!!
- if (trace) log.trace("Recieved exception'" + value +
"' from " + rsp.getSender());
- throw (Exception) value;
- }
- retval.add(value);
- }
- }
- return retval;
- }
-
- /* -------------------- End Remote method calls ------------------ */
-
/* --------------------- Callbacks -------------------------- */
/* ----- These are VERSIONED callbacks to facilitate JBCACHE-843. Also see
docs/design/DataVersion.txt --- */
@@ -2281,7 +1744,7 @@
if (backupNodeFqn == null && searchSubtrees)
{
- backupNodeFqn =
BuddyManager.getBackupFqn(BuddyManager.getGroupNameFromAddress(getLocalAddress()), fqn);
+ backupNodeFqn =
BuddyManager.getBackupFqn(BuddyManager.getGroupNameFromAddress(rpcManager.getLocalAddress()),
fqn);
}
List<NodeData> list = getNodeData(new LinkedList<NodeData>(),
(NodeSPI) actualNode);
@@ -2483,114 +1946,31 @@
StringBuilder category = new StringBuilder(getClass().getName());
if (configuration != null)
{
- String clusterName = configuration.getClusterName();
- if (clusterName != null)
+ if (rpcManager != null)
{
- category.append('.');
- category.append(clusterName);
- if (channel != null && channel.getLocalAddress() != null)
+ String clusterName = configuration.getClusterName();
+ if (clusterName != null)
{
category.append('.');
- category.append(channel.getLocalAddress().toString().replace('.',
'_'));
+ category.append(clusterName);
+ if (rpcManager.getLocalAddress() != null)
+ {
+ category.append('.');
+
category.append(rpcManager.getLocalAddress().toString().replace('.',
'_'));
+ }
}
}
+ else
+ {
+ // we're in LOCAL mode
+ category.append("_LOCAL");
+ }
}
// replace .s with _s otherwise Log4J will strip them out
log = LogFactory.getLog(category.toString());
trace = log.isTraceEnabled();
}
- /**
- * Kills the JGroups channel; an unclean channel disconnect
- */
- public void killChannel()
- {
- if (channel != null)
- {
- channel.disconnect();
- channel.close();
- }
- }
-
- /*----------------------- MembershipListener ------------------------*/
-
- protected class MembershipListenerAdaptor implements ExtendedMembershipListener
- {
-
- public void viewAccepted(View new_view)
- {
- Vector<Address> new_mbrs = new_view.getMembers();
- if (log.isInfoEnabled()) log.info("viewAccepted(): " + new_view);
- synchronized (members)
- {
- boolean needNotification = false;
- if (new_mbrs != null)
- {
- // Determine what members have been removed
- // and roll back any tx and break any locks
- Vector<Address> removed = new Vector<Address>(members);
- removed.removeAll(new_mbrs);
- removeLocksForDeadMembers(root, removed);
-
- members.removeAllElements();
- members.addAll(new_mbrs);
-
- needNotification = true;
- }
-
- // Now that we have a view, figure out if we are the coordinator
- coordinator = (members.size() != 0 &&
members.get(0).equals(getLocalAddress()));
-
- // now notify listeners - *after* updating the coordinator. - JBCACHE-662
- if (needNotification && notifier != null)
- {
- InvocationContext ctx = spi.getInvocationContext();
- notifier.notifyViewChange(new_view, ctx);
- }
-
- // Wake up any threads that are waiting to know who the members
- // are so they can figure out who the coordinator is
- members.notifyAll();
- }
- }
-
- /**
- * Called when a member is suspected.
- */
- public void suspect(Address suspected_mbr)
- {
- }
-
- /**
- * Indicates that a channel has received a BLOCK event from FLUSH protocol.
- */
- public void block()
- {
- flushBlockGate.close();
- if (log.isDebugEnabled()) log.debug("Block received at " +
getLocalAddress());
-
- remoteDelegate.block();
-
- if (log.isDebugEnabled()) log.debug("Block processed at " +
getLocalAddress());
- }
-
- /**
- * Indicates that a channel has received a UNBLOCK event from FLUSH protocol.
- */
- public void unblock()
- {
- if (log.isDebugEnabled()) log.debug("UnBlock received at " +
getLocalAddress());
-
- remoteDelegate.unblock();
-
- if (log.isDebugEnabled()) log.debug("UnBlock processed at " +
getLocalAddress());
- flushBlockGate.open();
- }
-
- }
-
- /*------------------- End of MembershipListener ----------------------*/
-
/* ------------------------------ Private methods --------------------------- */
/**
@@ -2712,7 +2092,7 @@
GlobalTransaction gtx = transactionTable.get(tx);
if (gtx == null && createIfNotExists)
{
- Address addr = getLocalAddress();
+ Address addr = rpcManager.getLocalAddress();
gtx = GlobalTransaction.create(addr);
transactionTable.put(tx, gtx);
TransactionEntry ent = configuration.isNodeLockingOptimistic() ? new
OptimisticTransactionEntry() : new TransactionEntry();
@@ -2862,89 +2242,6 @@
return toReturn;
}
- private void initialiseChannelAndRpcDispatcher() throws CacheException
- {
- channel = configuration.getRuntimeConfig().getChannel();
- if (channel == null)
- {
- // Try to create a multiplexer channel
- channel = getMultiplexerChannel();
-
- if (channel != null)
- {
- ReflectionUtil.setValue(configuration, "accessible", true);
- configuration.setUsingMultiplexer(true);
- if (log.isDebugEnabled())
- {
- log.debug("Created Multiplexer Channel for cache cluster " +
configuration.getClusterName() +
- " using stack " + configuration.getMultiplexerStack());
- }
- }
- else
- {
-
- try
- {
- if (configuration.getClusterConfig() == null)
- {
- log.debug("setting cluster properties to default value");
- channel = new JChannel(configuration.getDefaultClusterConfig());
- }
- else
- {
- if (trace)
- {
- log.trace("Cache cluster properties: " +
configuration.getClusterConfig());
- }
- channel = new JChannel(configuration.getClusterConfig());
- }
- }
- catch (ChannelException el)
- {
- el.printStackTrace();
- }
- }
-
- configuration.getRuntimeConfig().setChannel(channel);
- }
-
- channel.setOpt(Channel.AUTO_RECONNECT, true);
- channel.setOpt(Channel.AUTO_GETSTATE, true);
- channel.setOpt(Channel.BLOCK, true);
-
- // always use the InactiveRegionAwareRpcDispatcher - exceptions due to regions not
being active should not propagate to remote
- // nodes as errors. - Manik
- disp = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new
MembershipListenerAdaptor(), remoteDelegate);
-
- disp.setRequestMarshaller(marshaller);
- disp.setResponseMarshaller(marshaller);
-
- if (trace) log.trace("Started with RpcDispatcher " + disp);
- }
-
- private JChannel getMultiplexerChannel() throws CacheException
- {
- String stackName = configuration.getMultiplexerStack();
-
- RuntimeConfig rtc = configuration.getRuntimeConfig();
- ChannelFactory channelFactory = rtc.getMuxChannelFactory();
- JChannel muxchannel = null;
-
- if (channelFactory != null)
- {
- try
- {
- muxchannel = (JChannel) channelFactory.createMultiplexerChannel(stackName,
configuration.getClusterName());
- }
- catch (Exception e)
- {
- throw new CacheException("Failed to create multiplexed channel using
stack " + stackName, e);
- }
- }
-
- return muxchannel;
- }
-
// ================== methods to implement Cache and CacheSPI interfaces
============================
public List<Interceptor> getInterceptorChain()
Modified: core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheSPI.java 2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/CacheSPI.java 2008-01-17 22:34:36 UTC (rev 5161)
@@ -18,7 +18,6 @@
import org.jboss.cache.statetransfer.StateTransferManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionTable;
-import org.jgroups.Address;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -285,12 +284,6 @@
*/
Set<Fqn> getInternalFqns();
- @Deprecated
- void fetchPartialState(List<Address> members, Fqn subtreeRoot) throws
Exception;
-
- @Deprecated
- void fetchPartialState(List<Address> members, Fqn subtreeRoot, Fqn
integrationPoint) throws Exception;
-
int getNumberOfLocksHeld();
/**
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2008-01-17 22:34:36 UTC (rev 5161)
@@ -6,24 +6,120 @@
import java.util.List;
+/**
+ * Provides a mechanism for communicating with other caches in the cluster. For now this is based on JGroups as an underlying
+ * transport, and in future more transport options may become available.
+ * <p/>
+ * Implementations have a simple lifecycle:
+ * <ul>
+ * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
+ * <li>disconnect() - disconnects the channel</li>
+ * <li>stop() - stops the dispatcher and releases resources</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 2.1.0
+ */
public interface RPCManager
{
/**
- * The same as {@link
#callRemoteMethods(java.util.List,org.jboss.cache.marshall.MethodCall,int,boolean,long)}
except that it adds a JGroups
- * {@link org.jgroups.blocks.RspFilter} to the list of parameters, which is used to
filter results.
+ * Disconnects and closes the underlying JGroups channel.
*/
- public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception;
+ void disconnect();
- public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
int mode, boolean excludeSelf, long timeout) throws Exception;
+ /**
+ * Stops the RPCDispatcher and frees resources. Closes and disconnects the underlying
JGroups channel if this is
+ * still open/connected.
+ */
+ void stop();
- public boolean isCoordinator();
+ /**
+ * Starts the RPCManager by connecting the underlying JGroups channel (if configured
for replication). Connecting
+ * the channel may also involve state transfer (if configured) so the interceptor
chain should be started and
+ * available before this method is called.
+ */
+ void start();
- public Address getCoordinator();
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null,
the call is broadcast to the entire cluster.
+ * @param methodCall the method call to invoke
+ * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
+ * @param excludeSelf if true, the message is not looped back to the originator.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws
Exception;
- public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
boolean synchronous, boolean excludeSelf, int timeout) throws Exception;
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the
call is broadcast to the entire cluster.
+ * @param methodCall the method call to invoke
+ * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
+ * @param excludeSelf if true, the message is not looped back to the originator.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout) throws Exception;
/**
- * @return Returns the replication queue (if one is used), null otherwise.
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the
call is broadcast to the entire cluster.
+ * @param methodCall the method call to invoke
+ * @param synchronous if true, sets group request mode to {@link
org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link
org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param excludeSelf if true, the message is not looped back to the originator.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
*/
- public ReplicationQueue getReplicationQueue();
+ List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, boolean synchronous, boolean excludeSelf, int timeout) throws Exception;
+
+ /**
+ * @return true if the current Channel is the coordinator of the cluster.
+ */
+ boolean isCoordinator();
+
+ /**
+ * @return the Address of the current coordinator.
+ */
+ Address getCoordinator();
+
+ /**
+ * Retrieves the local JGroups channel's address
+ *
+ * @return an Address
+ */
+ Address getLocalAddress();
+
+ /**
+ * Returns a defensively copied list of members in the current cluster view.
+ *
+ * @return a copy of the member list; per the description above, not the live internal list
+ */
+ List<Address> getMembers();
+
+ /**
+ * Retrieves partial state from remote instances.
+ *
+ * @param sources sources to consider for a state transfer
+ * @param sourceTarget Fqn on source to retrieve state for
+ * @param integrationTarget integration point on local cache to apply state
+ * @throws Exception in the event of problems
+ */
+ void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn
integrationTarget) throws Exception;
+
+ /**
+ * Retrieves partial state from remote instances.
+ *
+ * @param sources sources to consider for a state transfer
+ * @param subtree Fqn subtree to retrieve. Will be integrated at the same point.
+ * @throws Exception in the event of problems
+ */
+ void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
+
+
}
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-01-17 15:22:36 UTC
(rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-01-17 22:34:36 UTC
(rev 5161)
@@ -6,12 +6,52 @@
*/
package org.jboss.cache;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.RuntimeConfig;
+import org.jboss.cache.factories.annotations.ComponentName;
import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Stop;
+import org.jboss.cache.invocation.RemoteCacheInvocationDelegate;
+import org.jboss.cache.loader.CacheLoaderManager;
+import org.jboss.cache.lock.LockUtil;
+import org.jboss.cache.lock.NodeLock;
+import org.jboss.cache.lock.TimeoutException;
+import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
+import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.notifications.Notifier;
+import org.jboss.cache.remoting.jgroups.CacheMessageListener;
+import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.cache.util.ThreadGate;
+import org.jboss.cache.util.reflect.ReflectionUtil;
import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelFactory;
+import org.jgroups.ExtendedMembershipListener;
+import org.jgroups.JChannel;
+import org.jgroups.StateTransferException;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RspFilter;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import javax.transaction.TransactionManager;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
+import java.util.Vector;
/**
* Manager that handles all RPC calls between JBoss Cache instances
@@ -20,61 +60,603 @@
*/
public class RPCManagerImpl implements RPCManager
{
- private CacheImpl c;
+ private Channel channel;
+ private Log log = LogFactory.getLog(RPCManagerImpl.class);
+ private List<Address> members = new LinkedList<Address>();
+ /**
+ * True if this Cache is the coordinator.
+ */
+ private volatile boolean coordinator = false;
+ /**
+ * Thread gate used to block Dispatcher during JGroups FLUSH protocol
+ */
+ private final ThreadGate flushBlockGate = new ThreadGate();
+ /**
+ * JGroups RpcDispatcher in use.
+ */
+ private RpcDispatcher disp = null;
/**
- * Empty ctor for mock object creation/unit testing
+ * JGroups message listener.
*/
- public RPCManagerImpl()
+ private CacheMessageListener messageListener;
+ private Configuration configuration;
+ private Notifier notifier;
+ private CacheSPI spi;
+ private boolean trace = log.isTraceEnabled();
+ private BuddyManager buddyManager;
+ private RemoteCacheInvocationDelegate remoteDelegate;
+ private CacheLoaderManager cacheLoaderManager;
+ private Marshaller marshaller;
+ private TransactionManager txManager;
+ private TransactionTable txTable;
+
+
+ /**
+ * Receives the collaborators this RPCManager works with and stores each one in the field of
+ * the same name. The method carries the {@link Inject} annotation; presumably it is invoked by
+ * the component registry rather than by application code - TODO confirm.
+ */
+ @Inject
+ private void setupDependencies(CacheMessageListener messageListener, Configuration
configuration,
+ Notifier notifier, CacheSPI spi, BuddyManager
buddyManager, Marshaller marshaller,
+
@ComponentName("remoteDelegate")RemoteCacheInvocationDelegate remoteDelegate,
+ CacheLoaderManager cacheLoaderManager, TransactionTable
txTable, TransactionManager txManager)
 {
+ this.messageListener = messageListener;
+ this.configuration = configuration;
+ this.notifier = notifier;
+ this.spi = spi;
+ this.buddyManager = buddyManager;
+ this.remoteDelegate = remoteDelegate;
+ this.cacheLoaderManager = cacheLoaderManager;
+ this.marshaller = marshaller;
+ this.txManager = txManager;
+ this.txTable = txTable;
 }
- @Inject
- private void setupDependencies(CacheImpl c)
+ // ------------ START: Lifecycle methods ------------
+
+ // This is called manually, rather than by using @Start, since it needs to be called
+ // AFTER all other components are started.
+
+ /**
+ * Starts this RPCManager according to the configured cache mode.
+ * <p/>
+ * In LOCAL mode nothing is created. In the replication and invalidation modes the channel and
+ * RpcDispatcher are initialised and the channel is connected. When
+ * shouldFetchStateOnStartup() returns true, the connect call is the variant that also takes the
+ * state retrieval timeout, and if more than one member is present this method then waits on the
+ * message listener for state to arrive.
+ *
+ * @throws CacheException if connecting the channel or fetching state fails
+ */
+ public void start()
 {
- this.c = c;
+ switch (configuration.getCacheMode())
+ {
+ case LOCAL:
+ log.debug("cache mode is local, will not create the channel");
+ break;
+ case REPL_SYNC:
+ case REPL_ASYNC:
+ case INVALIDATION_ASYNC:
+ case INVALIDATION_SYNC:
+ if (log.isDebugEnabled()) log.debug("Cache mode is " +
configuration.getCacheMode());
+
+ initialiseChannelAndRpcDispatcher();
+
+ if (shouldFetchStateOnStartup())
+ {
+ try
+ {
+ long start = System.currentTimeMillis();
+ // connect and state transfer
+ channel.connect(configuration.getClusterName(), null, null,
configuration.getStateRetrievalTimeout());
+ // if I am not the first and only member, then wait for state to arrive
+ if (getMembers().size() > 1) messageListener.waitForState();
+
+ if (log.isDebugEnabled())
+ log.debug("connected, state was retrieved successfully (in
" + (System.currentTimeMillis() - start) + " milliseconds)");
+ }
+ catch (StateTransferException ste)
+ {
+ // make sure we disconnect from the channel before we throw this exception!
+ // JBCACHE-761
+ disconnect();
+ throw new CacheException("Unable to fetch state on startup",
ste);
+ }
+ catch (ChannelException e)
+ {
+ // NOTE(review): unlike the StateTransferException branch, the channel is not
+ // disconnected here or in the generic branch below - confirm this is intended.
+ throw new CacheException("Unable to connect to JGroups
channel", e);
+ }
+ catch (Exception ex)
+ {
+ throw new CacheException("Unable to fetch state on startup",
ex);
+ }
+ }
+ else
+ {
+ //otherwise just connect
+ try
+ {
+ channel.connect(configuration.getClusterName());
+ }
+ catch (ChannelException e)
+ {
+ throw new CacheException("Unable to connect to JGroups
channel", e);
+ }
+ }
+ if (log.isInfoEnabled()) log.info("Cache local address is " +
getLocalAddress());
+ }
 }
- public RPCManagerImpl(CacheSPI c)
+   /**
+    * Disconnects and then closes the channel. Does nothing when there is no channel or the
+    * channel is no longer open.
+    */
+   public void disconnect()
 {
- this.c = (CacheImpl) c;
+      // guard: only an existing, still-open channel needs any work
+      if (channel == null || !channel.isOpen()) return;
+
+      log.info("Disconnecting and closing the Channel");
+      channel.disconnect();
+      channel.close();
 }
- // for now, we delegate RPC calls to deprecated methods in CacheImpl.
+   /**
+    * Shuts this RPCManager down: disconnects the channel (failures are logged, not rethrown),
+    * drops the channel reference here and in the runtime configuration, stops the dispatcher,
+    * empties the member list and resets the coordinator flag.
+    */
+   @Stop
+   public void stop()
+   {
+      try
+      {
+         disconnect();
+      }
+      catch (Exception closeFailure)
+      {
+         log.error("Problem closing channel; setting it to null", closeFailure);
+      }
- @SuppressWarnings("deprecation")
- public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception
+
+      // forget the channel both locally and in the runtime configuration
+      channel = null;
+      configuration.getRuntimeConfig().setChannel(null);
+
+      if (disp != null)
+      {
+         log.info("Stopping the RpcDispatcher");
+         disp.stop();
+         disp = null;
+      }
+
+      if (members != null)
+      {
+         synchronized (members)
+         {
+            members.clear();
+         }
+      }
+
+      coordinator = false;
+   }
+
+   /**
+    * Decides whether a state transfer has to be initiated while starting up.
+    *
+    * @return true when the cache is not inactive on startup, no buddy manager is present, and
+    *         either in-memory state or persistent (cache loader) state is configured to be fetched
+    */
+   private boolean shouldFetchStateOnStartup()
 {
- return c.callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout,
responseFilter);
+      boolean persistentStateWanted = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState();
+
+      if (configuration.isInactiveOnStartup() || buddyManager != null)
+      {
+         return false;
+      }
+      return configuration.isFetchInMemoryState() || persistentStateWanted;
 }
- @SuppressWarnings("deprecation")
- public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
int mode, boolean excludeSelf, long timeout) throws Exception
+   /**
+    * Obtains the channel and builds the RpcDispatcher on top of it.
+    * <p/>
+    * The channel is looked up in this order: one already present in the runtime configuration,
+    * then a multiplexer channel, then a new JChannel built from the cluster configuration (or the
+    * default cluster configuration when none is set). A newly created channel is stored back in
+    * the runtime configuration.
+    *
+    * @throws CacheException if a channel cannot be created; previously the failure was only
+    *                        printed and the method went on to dereference a null channel
+    */
+   private void initialiseChannelAndRpcDispatcher() throws CacheException
 {
- return c.callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout);
+      channel = configuration.getRuntimeConfig().getChannel();
+      if (channel == null)
+      {
+         // Try to create a multiplexer channel
+         channel = getMultiplexerChannel();
+
+         if (channel != null)
+         {
+            ReflectionUtil.setValue(configuration, "accessible", true);
+            configuration.setUsingMultiplexer(true);
+            if (log.isDebugEnabled())
+               log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName()
+                     + " using stack " + configuration.getMultiplexerStack());
+         }
+         else
+         {
+            try
+            {
+               if (configuration.getClusterConfig() == null)
+               {
+                  log.debug("setting cluster properties to default value");
+                  channel = new JChannel(configuration.getDefaultClusterConfig());
+               }
+               else
+               {
+                  if (trace)
+                  {
+                     log.trace("Cache cluster properties: " + configuration.getClusterConfig());
+                  }
+                  channel = new JChannel(configuration.getClusterConfig());
+               }
+            }
+            catch (ChannelException e)
+            {
+               // fail fast with the cause attached instead of continuing with a null channel
+               throw new CacheException("Unable to create JGroups channel", e);
+            }
+         }
+
+         configuration.getRuntimeConfig().setChannel(channel);
+      }
+
+      channel.setOpt(Channel.AUTO_RECONNECT, true);
+      channel.setOpt(Channel.AUTO_GETSTATE, true);
+      channel.setOpt(Channel.BLOCK, true);
+
+      // always use the InactiveRegionAwareRpcDispatcher - exceptions due to regions not being active
+      // should not propagate to remote nodes as errors. - Manik
+      disp = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(), remoteDelegate);
+
+      disp.setRequestMarshaller(marshaller);
+      disp.setResponseMarshaller(marshaller);
 }
+   /**
+    * Asks the multiplexer channel factory held by the runtime configuration for a channel on the
+    * configured multiplexer stack.
+    *
+    * @return the multiplexer channel, or null when no channel factory is available
+    * @throws CacheException if the factory fails to create the channel
+    */
+   private JChannel getMultiplexerChannel() throws CacheException
+   {
+      String stackName = configuration.getMultiplexerStack();
+      ChannelFactory muxFactory = configuration.getRuntimeConfig().getMuxChannelFactory();
+
+      // without a factory there is nothing to create
+      if (muxFactory == null) return null;
+
+      try
+      {
+         return (JChannel) muxFactory.createMultiplexerChannel(stackName, configuration.getClusterName());
+      }
+      catch (Exception e)
+      {
+         throw new CacheException("Failed to create multiplexed channel using stack " + stackName, e);
+      }
+   }
+
+
+   /**
+    * Breaks the locks held on the given node by transactions whose originating address is in
+    * deadMembers, then does the same for every direct child, recursively.
+    *
+    * @param node        node whose lock owners are inspected
+    * @param deadMembers addresses that are no longer part of the view
+    */
+   private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
+   {
+      NodeLock nodeLock = node.getLock();
+      Set<GlobalTransaction> orphanedOwners = new HashSet<GlobalTransaction>();
+
+      // the single writer first, then every reader
+      Object writer = nodeLock.getWriterOwner();
+      if (isLockOwnerDead(writer, deadMembers)) orphanedOwners.add((GlobalTransaction) writer);
+
+      for (Object reader : nodeLock.getReaderOwners())
+      {
+         if (isLockOwnerDead(reader, deadMembers)) orphanedOwners.add((GlobalTransaction) reader);
+      }
+
+      for (GlobalTransaction gtx : orphanedOwners)
+      {
+         boolean ownedLocally = gtx.getAddress().equals(getLocalAddress());
+         boolean released = LockUtil.breakTransactionLock(nodeLock, gtx, ownedLocally, txTable, txManager);
+
+         if (released && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + gtx);
+      }
+
+      // Recursively unlock children
+      for (Object child : node.getChildrenDirect())
+      {
+         removeLocksForDeadMembers((NodeSPI) child, deadMembers);
+      }
+   }
+
+   /**
+    * Tells whether a lock owner is a GlobalTransaction whose address is listed in deadMembers.
+    *
+    * @param owner       lock owner to test; may be null or of any type
+    * @param deadMembers addresses considered dead
+    * @return true only for a GlobalTransaction owner whose address is contained in deadMembers
+    */
+   private boolean isLockOwnerDead(Object owner, List deadMembers)
+   {
+      // instanceof is false for null, so this also covers the null owner
+      if (!(owner instanceof GlobalTransaction)) return false;
+
+      Object originAddress = ((GlobalTransaction) owner).getAddress();
+      return deadMembers.contains(originAddress);
+   }
+
+ // ------------ END: Lifecycle methods ------------
+
+ // ------------ START: RPC call methods ------------
+
+   /**
+    * Convenience overload that delegates to the variant taking a response filter, passing no filter.
+    */
+   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode,
+                                         boolean excludeSelf, long timeout) throws Exception
+   {
+      final RspFilter noFilter = null;
+      return callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout, noFilter);
+   }
+
+   /**
+    * Convenience overload that maps the synchronous flag onto a group request mode
+    * (GET_ALL when true, GET_NONE when false) and delegates to the mode-based variant.
+    */
+   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, boolean synchronous,
+                                         boolean excludeSelf, int timeout) throws Exception
+   {
+      int requestMode;
+      if (synchronous)
+      {
+         requestMode = GroupRequest.GET_ALL;
+      }
+      else
+      {
+         requestMode = GroupRequest.GET_NONE;
+      }
+      return callRemoteMethods(recipients, methodCall, requestMode, excludeSelf, timeout);
+   }
+
+   /**
+    * Invokes a method call on remote members through the RpcDispatcher and collects the responses.
+    * <p/>
+    * Returns null when there is no dispatcher or when the destination list ends up empty. Returns
+    * an empty list when the mode argument is GET_NONE. Otherwise each response is added to the
+    * result; suspected or missing responses are added as ReplicationExceptions, while any other
+    * exception received as a response value is rethrown.
+    *
+    * @throws NotSerializableException if the dispatcher returns null
+    * @throws TimeoutException         if waiting for a FLUSH to finish times out
+    */
+   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode,
+                                         boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception
+   {
+      // a group request mode set as an option override on the invocation context wins over the argument
+      int overrideMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode();
+      int modeToUse = overrideMode > -1 ? overrideMode : mode;
+
+      if (disp == null)
+      {
+         return null;
+      }
+
+      // work on a private copy of either the explicit recipients or the current member list
+      Vector<Address> targets;
+      if (recipients == null)
+      {
+         synchronized (members)
+         {
+            targets = new Vector<Address>(members);
+         }
+      }
+      else
+      {
+         targets = new Vector<Address>(recipients);
+      }
+
+      if (excludeSelf && !targets.isEmpty())
+      {
+         Address self = getLocalAddress();
+         if (self != null) targets.remove(self);
+      }
+
+      if (targets.isEmpty())
+      {
+         if (trace) log.trace("destination list is empty, discarding call");
+         return null;
+      }
+
+      if (trace) log.trace("callRemoteMethods(): valid members are " + targets + " methods: " + methodCall);
+
+      // the gate is closed by block() and reopened by unblock(); wait here while it is closed
+      if (channel.flushSupported() && !flushBlockGate.await(configuration.getStateRetrievalTimeout()))
+      {
+         throw new TimeoutException("State retrieval timed out waiting for flush unblock.");
+      }
+
+      boolean buddyReplEnabled = buddyManager != null && buddyManager.isEnabled();
+      RspList responses;
+      if (responseFilter == null)
+      {
+         responses = disp.callRemoteMethods(targets, methodCall, modeToUse, timeout, buddyReplEnabled);
+      }
+      else
+      {
+         responses = disp.callRemoteMethods(targets, methodCall, modeToUse, timeout, buddyReplEnabled, false, responseFilter);
+      }
+
+      // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be
+      // changed when JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
+      if (responses == null)
+      {
+         throw new NotSerializableException("RpcDispatcher returned a null. This is most often caused by args for " + methodCall.getName() + " not being serializable.");
+      }
+
+      // NOTE(review): this tests the mode argument, not modeToUse, so an override to a synchronous
+      // mode on an asynchronous call still discards the responses - confirm this is intended.
+      if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
+
+      if (trace) log.trace("(" + getLocalAddress() + "): responses for method " + methodCall.getName() + ":\n" + responses);
+
+      List<Object> results = new ArrayList<Object>(responses.size());
+
+      for (Rsp rsp : responses.values())
+      {
+         if (rsp.wasSuspected())
+         {
+            CacheException cause = new SuspectException("Suspected member: " + rsp.getSender());
+            results.add(new ReplicationException("rsp=" + rsp, cause));
+         }
+         else if (!rsp.wasReceived())
+         {
+            CacheException cause = new TimeoutException("Replication timeout for " + rsp.getSender());
+            results.add(new ReplicationException("rsp=" + rsp, cause));
+         }
+         else
+         {
+            Object value = rsp.getValue();
+            boolean applicationFailure = value instanceof Exception && !(value instanceof ReplicationException);
+            if (applicationFailure)
+            {
+               // if we have any application-level exceptions make sure we throw them!!
+               if (trace) log.trace("Recieved exception'" + value + "' from " + rsp.getSender());
+               throw (Exception) value;
+            }
+            results.add(value);
+         }
+      }
+      return results;
+   }
+
+ // ------------ END: RPC call methods ------------
+
+ // ------------ START: Partial state transfer methods ------------
+
+   /**
+    * Fetches partial state identified by the source Fqn and integration Fqn joined with the
+    * partial-state delimiter.
+    */
+   public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
+   {
+      // state id layout: <source Fqn><delimiter><integration Fqn>
+      fetchPartialState(sources, sourceTarget + StateTransferManager.PARTIAL_STATE_DELIMITER + integrationTarget);
+   }
+
+   /**
+    * Fetches partial state for a subtree, using the subtree's string form as the state id.
+    *
+    * @throws IllegalArgumentException if subtree is null
+    */
+   public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
+   {
+      if (subtree != null)
+      {
+         fetchPartialState(sources, subtree.toString());
+         return;
+      }
+      throw new IllegalArgumentException("Cannot fetch partial state. Null subtree.");
+   }
+
+   /**
+    * Tries each source in turn (skipping the local address) until one delivers the requested
+    * partial state. Missing arguments or an empty target list are logged and the method returns
+    * without throwing.
+    *
+    * @param sources candidate members to fetch state from
+    * @param stateId identifier of the partial state to request
+    * @throws Exception if requesting state from the channel fails
+    */
+   private void fetchPartialState(List<Address> sources, String stateId) throws Exception
+   {
+      if (sources == null || sources.isEmpty() || stateId == null)
+      {
+         // should this really be throwing an exception? Are there valid use cases where partial state
+         // may not be available? - Manik
+         // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
+         if (log.isWarnEnabled())
+            log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
+         return;
+      }
+
+      List<Address> targets = new LinkedList<Address>(sources);
+
+      //skip *this* node as a target
+      targets.remove(getLocalAddress());
+
+      if (targets.isEmpty())
+      {
+         // Definitely no exception here -- this happens every time the 1st node in the
+         // cluster activates a region!! -- Brian
+         if (log.isDebugEnabled()) log.debug("Cannot fetch partial state. There are no target members specified");
+         return;
+      }
+
+      if (log.isDebugEnabled())
+         log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
+      boolean successfulTransfer = false;
+      for (Address target : targets)
+      {
+         if (log.isDebugEnabled())
+            log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
+         messageListener.setStateSet(false);
+         successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
+         if (successfulTransfer)
+         {
+            try
+            {
+               messageListener.waitForState();
+            }
+            catch (Exception transferFailed)
+            {
+               // keep trying the remaining targets, but do not lose the reason this one failed
+               if (log.isDebugEnabled())
+                  log.debug("Node " + getLocalAddress() + " failed waiting for partial state " + stateId + " from member " + target, transferFailed);
+               successfulTransfer = false;
+            }
+         }
+         if (log.isDebugEnabled())
+            log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
+         if (successfulTransfer) break;
+      }
+
+      if (!successfulTransfer)
+      {
+         if (log.isDebugEnabled())
+            log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
+      }
+   }
+
+ // ------------ END: Partial state transfer methods ------------
+
+ // ------------ START: Informational methods ------------
+
+   /**
+    * @return the channel's local address, or null when there is no channel
+    */
+   public Address getLocalAddress()
+   {
+      if (channel == null)
+      {
+         return null;
+      }
+      return channel.getLocalAddress();
+   }
+
+   /**
+    * @return a new list containing the current members, copied while holding the member list lock
+    */
+   public List<Address> getMembers()
+   {
+      List<Address> snapshot;
+      synchronized (members)
+      {
+         snapshot = new ArrayList<Address>(members);
+      }
+      return snapshot;
+   }
+
 public boolean isCoordinator()
 {
- return c.isCoordinator();
+ // the flag is recomputed in viewAccepted() each time a view is installed, and reset by stop()
+ return coordinator;
 }
public Address getCoordinator()
{
- return c.getCoordinator();
+ if (channel == null)
+ {
+ return null;
+ }
+
+ synchronized (members)
+ {
+ while (members.isEmpty())
+ {
+ log.debug("getCoordinator(): waiting on viewAccepted()");
+ try
+ {
+ members.wait();
+ }
+ catch (InterruptedException e)
+ {
+ log.error("getCoordinator(): Interrupted while waiting for members to
be set", e);
+ break;
+ }
+ }
+ return members.size() > 0 ? members.get(0) : null;
+ }
}
- @SuppressWarnings("deprecation")
- public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
boolean synchronous, boolean excludeSelf, int timeout) throws Exception
+ // ------------ END: Informational methods ------------
+
+ /*----------------------- MembershipListener ------------------------*/
+
+ /**
+ * Membership listener handed to the RpcDispatcher. It keeps the enclosing manager's member
+ * list and coordinator flag up to date on view changes, and closes/opens the flush gate on
+ * block/unblock events.
+ */
+ protected class MembershipListenerAdaptor implements ExtendedMembershipListener
 {
- return c.callRemoteMethods(recipients, methodCall, synchronous, excludeSelf,
timeout);
+
+ /**
+ * Installs a new view: breaks locks held by members that left, replaces the member list,
+ * recomputes the coordinator flag, notifies listeners and wakes threads waiting on the
+ * member list.
+ */
+ public void viewAccepted(View new_view)
+ {
+ Vector<Address> new_mbrs = new_view.getMembers();
+ if (log.isInfoEnabled()) log.info("viewAccepted(): " + new_view);
+ // NOTE(review): lock breaking and listener notification both run while the members
+ // monitor is held - confirm neither can block for long.
+ synchronized (members)
+ {
+ boolean needNotification = false;
+ if (new_mbrs != null)
+ {
+ // Determine what members have been removed
+ // and roll back any tx and break any locks
+ Vector<Address> removed = new Vector<Address>(members);
+ removed.removeAll(new_mbrs);
+ removeLocksForDeadMembers(spi.getRoot(), removed);
+
+ members.clear();
+ members.addAll(new_mbrs);
+
+ needNotification = true;
+ }
+
+ // Now that we have a view, figure out if we are the coordinator
+ // (the coordinator is the first member of the list)
+ coordinator = (members.size() != 0 &&
members.get(0).equals(getLocalAddress()));
+
+ // now notify listeners - *after* updating the coordinator. - JBCACHE-662
+ if (needNotification && notifier != null)
+ {
+ InvocationContext ctx = spi.getInvocationContext();
+ notifier.notifyViewChange(new_view, ctx);
+ }
+
+ // Wake up any threads that are waiting to know who the members
+ // are so they can figure out who the coordinator is
+ members.notifyAll();
+ }
+ }
+
+ /**
+ * Called when a member is suspected.
+ */
+ public void suspect(Address suspected_mbr)
+ {
+ // no action is taken here
+ }
+
+ /**
+ * Indicates that a channel has received a BLOCK event from FLUSH protocol.
+ */
+ public void block()
+ {
+ // close the gate first so that callRemoteMethods() waits until unblock() reopens it
+ flushBlockGate.close();
+ if (log.isDebugEnabled()) log.debug("Block received at " +
getLocalAddress());
+
+ remoteDelegate.block();
+
+ if (log.isDebugEnabled()) log.debug("Block processed at " +
getLocalAddress());
+ }
+
+ /**
+ * Indicates that a channel has received a UNBLOCK event from FLUSH protocol.
+ */
+ public void unblock()
+ {
+ if (log.isDebugEnabled()) log.debug("UnBlock received at " +
getLocalAddress());
+
+ remoteDelegate.unblock();
+
+ if (log.isDebugEnabled()) log.debug("UnBlock processed at " +
getLocalAddress());
+ // reopen the gate only after the delegate has been unblocked
+ flushBlockGate.open();
+ }
+
 }
- /**
- * @return Returns the replication queue (if one is used), null otherwise.
- */
- public ReplicationQueue getReplicationQueue()
- {
- return c.getReplicationQueue();
- }
-}
+ /*------------------- End of MembershipListener ----------------------*/
+}
\ No newline at end of file
Modified: core/trunk/src/main/java/org/jboss/cache/RegionManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManager.java 2008-01-17 15:22:36 UTC
(rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManager.java 2008-01-17 22:34:36 UTC
(rev 5161)
@@ -55,11 +55,13 @@
protected final Set<Fqn> activationChangeNodes = Collections.synchronizedSet(new
HashSet<Fqn>());
protected Configuration configuration;
+ protected RPCManager rpcManager;
@Inject
- void injectDependencies(CacheSPI cache, Configuration configuration)
+ void injectDependencies(CacheSPI cache, Configuration configuration, RPCManager
rpcManager)
{
this.cache = cache;
+ this.rpcManager = rpcManager;
this.configuration = configuration;
}
@@ -412,7 +414,7 @@
}
List<Address> members = cache.getMembers();
- cache.fetchPartialState(members, subtreeRoot.getFqn());
+ rpcManager.fetchPartialState(members, subtreeRoot.getFqn());
}
else if (!BuddyManager.isBackupFqn(fqn))
{
@@ -440,7 +442,7 @@
subtreeRoot = cache.getRoot().getChild(buddyRoot);
cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
}
- cache.fetchPartialState(sources, fqn, subtreeRoot.getFqn());
+ rpcManager.fetchPartialState(sources, fqn, subtreeRoot.getFqn());
}
}
else
Modified: core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java 2008-01-17 15:22:36 UTC
(rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java 2008-01-17 22:34:36 UTC
(rev 5161)
@@ -9,6 +9,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.factories.annotations.Stop;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
@@ -31,8 +35,6 @@
private static Log log = LogFactory.getLog(ReplicationQueue.class);
- private CacheImpl cache = null;
-
/**
* We flush every 5 seconds. Inactive if -1 or 0
*/
@@ -57,71 +59,47 @@
* The timer task, only calls flush() when executed by Timer
*/
private MyTask task = null;
+ private RPCManager rpcManager;
+ private Configuration configuration;
+ private boolean enabled;
- public ReplicationQueue()
- {
- }
- /**
- * Constructs a new ReplicationQueue.
- */
- public ReplicationQueue(CacheImpl cache, long interval, long max_elements)
+ public boolean isEnabled()
{
- this.cache = cache;
- this.interval = interval;
- this.max_elements = max_elements;
+ return enabled;
}
- /**
- * Returns the flush interval in milliseconds.
- */
- public long getInterval()
+ @Inject
+ private void injectDependencies(RPCManager rpcManager, Configuration configuration)
{
- return interval;
+ this.rpcManager = rpcManager;
+ this.configuration = configuration;
+ enabled = configuration.isUseReplQueue() &&
(configuration.getBuddyReplicationConfig() == null ||
!configuration.getBuddyReplicationConfig().isEnabled());
}
/**
- * Sets the flush interval in milliseconds.
- */
- public void setInterval(long interval)
- {
- this.interval = interval;
- stop();
- start();
- }
-
- /**
- * Returns the maximum number of elements to hold.
- * If the maximum number is reached, flushes in the calling thread.
- */
- public long getMax_elements()
- {
- return max_elements;
- }
-
- /**
- * Sets the maximum number of elements to hold.
- */
- public void setMax_elements(long max_elements)
- {
- this.max_elements = max_elements;
- }
-
- /**
* Starts the asynchronous flush queue.
*/
+ @Start
public synchronized void start()
{
- if (interval > 0)
+ this.interval = configuration.getReplQueueInterval();
+ this.max_elements = configuration.getReplQueueMaxElements();
+ // check again
+ enabled = configuration.isUseReplQueue() &&
(configuration.getBuddyReplicationConfig() == null ||
!configuration.getBuddyReplicationConfig().isEnabled());
+ if (enabled)
{
- if (task == null)
- task = new MyTask();
- if (timer == null)
+ if (interval > 0)
{
- timer = new Timer(true);
- timer.schedule(task,
- 500, // delay before initial flush
- interval); // interval between flushes
+ if (task == null)
+ task = new MyTask();
+ if (timer == null)
+ {
+ timer = new Timer(true);
+ timer.schedule(task,
+ 500, // delay before initial flush
+ interval); // interval between flushes
+ }
}
}
}
@@ -129,6 +107,7 @@
/**
* Stops the asynchronous flush queue.
*/
+ @Stop
public synchronized void stop()
{
if (task != null)
@@ -178,7 +157,7 @@
try
{
// send to all live nodes in the cluster
- cache.getRPCManager().callRemoteMethods(null,
MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true,
5000);
+ rpcManager.callRemoteMethods(null,
MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true,
5000);
}
catch (Throwable t)
{
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-01-17
15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-01-17
22:34:36 UTC (rev 5161)
@@ -240,7 +240,6 @@
}
}
- // For now, this is initialised MANUALLY from CacheImpl.internalStart()
public void init() throws CacheException
{
log.debug("Starting BuddyManager");
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2008-01-17
15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2008-01-17
22:34:36 UTC (rev 5161)
@@ -107,6 +107,7 @@
s.add(LockTableFactory.class);
s.add(RuntimeConfigAwareFactory.class);
s.add(TransactionManagerFactory.class);
+ s.add(ReplicationQueueFactory.class);
return s;
}
Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-01-17
15:22:36 UTC (rev 5160)
+++
core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-01-17
22:34:36 UTC (rev 5161)
@@ -19,7 +19,9 @@
* @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
* @since 2.1.0
*/
-@DefaultFactoryFor(classes = {StateTransferManager.class, TransactionTable.class,
RegionManager.class, Notifier.class, CacheMessageListener.class, CacheLoaderManager.class,
RemoteCacheInvocationDelegate.class, Marshaller.class, InvocationContextContainer.class})
+@DefaultFactoryFor(classes = {StateTransferManager.class, TransactionTable.class,
RegionManager.class, Notifier.class,
+ CacheMessageListener.class, CacheLoaderManager.class,
RemoteCacheInvocationDelegate.class, Marshaller.class,
+ InvocationContextContainer.class})
public class EmptyConstructorFactory extends ComponentFactory
{
@Override
Modified: core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-01-17
15:22:36 UTC (rev 5160)
+++
core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-01-17
22:34:36 UTC (rev 5161)
@@ -106,7 +106,10 @@
call_interceptor = createInterceptor(CallInterceptor.class);
+ // load the icInterceptor first
+ first = setFirstInterceptor(invocationCtxInterceptor);
+
if (isUsingBuddyReplication()) dataGravitatorInterceptor =
createInterceptor(DataGravitatorInterceptor.class);
lock_interceptor = createInterceptor(PessimisticLockInterceptor.class);
@@ -145,144 +148,45 @@
}
}
- // load the icInterceptor first
- if (first == null) first = invocationCtxInterceptor;
-
// load the cache management interceptor next
if (configuration.getExposeManagementStatistics())
{
- if (first == null)
- {
- first = cacheMgmtInterceptor;
- }
- else
- {
- addInterceptor(first, cacheMgmtInterceptor);
- }
+ addInterceptor(first, cacheMgmtInterceptor);
}
// load the tx interceptor
- if (first == null)
- {
- first = txInterceptor;
- }
- else
- {
- addInterceptor(first, txInterceptor);
- }
+ addInterceptor(first, txInterceptor);
- if (first == null)
- first = notificationInterceptor;
- else
- addInterceptor(first, notificationInterceptor);
+ addInterceptor(first, notificationInterceptor);
- if (repl_interceptor != null)
- {
- if (first == null)
- {
- first = repl_interceptor;
- }
- else
- {
- addInterceptor(first, repl_interceptor);
- }
- }
+ if (repl_interceptor != null) addInterceptor(first, repl_interceptor);
- if (first == null)
- {
- first = lock_interceptor;
- }
- else
- {
- addInterceptor(first, lock_interceptor);
- }
+ addInterceptor(first, lock_interceptor);
- if (unlock_interceptor != null)
- {
- if (first == null)
- {
- first = unlock_interceptor;
- }
- else
- {
- addInterceptor(first, unlock_interceptor);
- }
- }
+ if (unlock_interceptor != null) addInterceptor(first, unlock_interceptor);
if (activation_interceptor != null)
{
- if (first == null)
- {
- first = activation_interceptor;
- }
- else
- {
- addInterceptor(first, activation_interceptor);
- }
- if (first == null)
- {
- first = passivation_interceptor;
- }
- else
- {
- addInterceptor(first, passivation_interceptor);
- }
+ addInterceptor(first, activation_interceptor);
+ addInterceptor(first, passivation_interceptor);
}
if (cache_loader_interceptor != null)
{
- if (first == null)
- {
- first = cache_loader_interceptor;
- }
- else
- {
- addInterceptor(first, cache_loader_interceptor);
- }
- if (first == null)
- {
- first = cache_store_interceptor;
- }
- else
- {
- addInterceptor(first, cache_store_interceptor);
- }
+ addInterceptor(first, cache_loader_interceptor);
+ addInterceptor(first, cache_store_interceptor);
}
- if (dataGravitatorInterceptor != null)
- {
- if (first == null)
- {
- first = dataGravitatorInterceptor;
- }
- else
- {
- addInterceptor(first, dataGravitatorInterceptor);
- }
- }
+ if (dataGravitatorInterceptor != null) addInterceptor(first,
dataGravitatorInterceptor);
if (configuration.getEvictionConfig() != null &&
configuration.getEvictionConfig().isValidConfig())
{
eviction_interceptor = createInterceptor(EvictionInterceptor.class);
- if (first == null)
- {
- first = eviction_interceptor;
- }
- else
- {
- addInterceptor(first, eviction_interceptor);
- }
+ addInterceptor(first, eviction_interceptor);
}
- if (first == null)
- {
- first = call_interceptor;
- }
- else
- {
- addInterceptor(first, call_interceptor);
- }
+ addInterceptor(first, call_interceptor);
return setLastInterceptorPointer(first, call_interceptor);
}
@@ -306,6 +210,10 @@
Interceptor invocationCtxInterceptor =
createInterceptor(InvocationContextInterceptor.class);
Interceptor notificationInterceptor =
createInterceptor(NotificationInterceptor.class);
+ // load the icInterceptor first
+ first = setFirstInterceptor(invocationCtxInterceptor);
+
+
if (isUsingCacheLoaders())
{
if (configuration.getCacheLoaderConfig().isPassivation())
@@ -353,193 +261,81 @@
evictionInterceptor = createInterceptor(EvictionInterceptor.class);
}
- if (first == null) first = invocationCtxInterceptor;
-
if (configuration.getExposeManagementStatistics())
{
cacheMgmtInterceptor = createInterceptor(CacheMgmtInterceptor.class);
- if (first == null)
- {
- first = cacheMgmtInterceptor;
- }
- else
- {
- addInterceptor(first, cacheMgmtInterceptor);
- }
+ addInterceptor(first, cacheMgmtInterceptor);
}
if (txInterceptor != null)
{
- if (first == null)
- {
- first = txInterceptor;
- }
- else
- {
- addInterceptor(first, txInterceptor);
- }
+ addInterceptor(first, txInterceptor);
}
- if (first == null)
- first = notificationInterceptor;
- else
- addInterceptor(first, notificationInterceptor);
+ addInterceptor(first, notificationInterceptor);
- if (first == null)
- {
- first = replicationInterceptor;
- }
- else
- {
- addInterceptor(first, replicationInterceptor);
- }
+ addInterceptor(first, replicationInterceptor);
if (passivationInterceptor != null &&
!configuration.getCacheLoaderConfig().isFetchPersistentState())
{
- if (first == null)
- {
- first = passivationInterceptor;
- }
- else
- {
- addInterceptor(first, passivationInterceptor);
- }
+ addInterceptor(first, passivationInterceptor);
}
// add the cache store interceptor here
if (cacheStoreInterceptor != null &&
!configuration.getCacheLoaderConfig().isFetchPersistentState())
{
- if (first == null)
- {
- first = cacheStoreInterceptor;
- }
- else
- {
- addInterceptor(first, cacheStoreInterceptor);
- }
+ addInterceptor(first, cacheStoreInterceptor);
}
// cache loader interceptor is only invoked if we are ready to write to the actual
tree cache
if (activationInterceptor != null)
{
- if (first == null)
- {
- first = activationInterceptor;
- }
- else
- {
- addInterceptor(first, activationInterceptor);
- }
+ addInterceptor(first, activationInterceptor);
if (configuration.getCacheLoaderConfig().isFetchPersistentState())
{
- if (first == null)
- {
- first = passivationInterceptor;
- }
- else
- {
- addInterceptor(first, passivationInterceptor);
- }
+ addInterceptor(first, passivationInterceptor);
}
}
if (cacheLoaderInterceptor != null)
{
- if (first == null)
- {
- first = cacheLoaderInterceptor;
- }
- else
- {
- addInterceptor(first, cacheLoaderInterceptor);
- }
+ addInterceptor(first, cacheLoaderInterceptor);
if (configuration.getCacheLoaderConfig().isFetchPersistentState())
{
- if (first == null)
- {
- first = cacheStoreInterceptor;
- }
- else
- {
- addInterceptor(first, cacheStoreInterceptor);
- }
+ addInterceptor(first, cacheStoreInterceptor);
}
}
if (dataGravitatorInterceptor != null)
{
- if (first == null)
- {
- first = dataGravitatorInterceptor;
- }
- else
- {
- addInterceptor(first, dataGravitatorInterceptor);
- }
+ addInterceptor(first, dataGravitatorInterceptor);
}
- if (first == null)
- {
- first = lockInterceptor;
- }
- else
- {
- addInterceptor(first, lockInterceptor);
- }
+ addInterceptor(first, lockInterceptor);
- if (first == null)
- {
- first = validationInterceptor;
- }
- else
- {
- addInterceptor(first, validationInterceptor);
- }
+ addInterceptor(first, validationInterceptor);
+ addInterceptor(first, createIfNotExistsInterceptor);
- if (first == null)
- {
- first = createIfNotExistsInterceptor;
- }
- else
- {
- addInterceptor(first, createIfNotExistsInterceptor);
- }
-
// eviction interceptor to come before the optimistic node interceptor
- if (first == null)
- {
- first = evictionInterceptor;
- }
- else
- {
- addInterceptor(first, evictionInterceptor);
- }
+ addInterceptor(first, evictionInterceptor);
- if (first == null)
- {
- first = nodeInterceptor;
- }
- else
- {
- addInterceptor(first, nodeInterceptor);
- }
+ addInterceptor(first, nodeInterceptor);
+ addInterceptor(first, invokerInterceptor);
- if (first == null)
- {
- first = invokerInterceptor;
- }
- else
- {
- addInterceptor(first, invokerInterceptor);
- }
-
return setLastInterceptorPointer(first, invokerInterceptor);
}
+ public Interceptor setFirstInterceptor(Interceptor i)
+ {
+ componentRegistry.registerComponent(Interceptor.class.getName(), i,
Interceptor.class);
+ return i;
+ }
+
+
public static List<Interceptor> asList(Interceptor interceptor)
{
if (interceptor == null)
Added: core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java 2008-01-17
22:34:36 UTC (rev 5161)
@@ -0,0 +1,29 @@
+package org.jboss.cache.factories;
+
+import org.jboss.cache.ReplicationQueue;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.DefaultFactoryFor;
+
+/**
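+ * ReplicationQueue factory; constructs a queue only for REPL_ASYNC or INVALIDATION_ASYNC cache modes with useReplQueue enabled, otherwise returns null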
+ *
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.1.0
+ */
+@DefaultFactoryFor(classes = ReplicationQueue.class)
+public class ReplicationQueueFactory extends EmptyConstructorFactory
+{
+ @Override
+ public <T> T construct(String componentName, Class<T> componentType)
+ {
+ if ((configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC ||
configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_ASYNC)
+ && configuration.isUseReplQueue())
+ {
+ return super.construct(componentName, componentType);
+ }
+ else
+ {
+ return null;
+ }
+ }
+}
\ No newline at end of file
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-01-17
15:22:36 UTC (rev 5160)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-01-17
22:34:36 UTC (rev 5161)
@@ -6,6 +6,7 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.RPCManager;
+import org.jboss.cache.ReplicationQueue;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.config.Option;
@@ -32,11 +33,13 @@
private RPCManager rpcManager;
private boolean usingBuddyReplication;
protected boolean defaultSynchronous;
+ private ReplicationQueue replicationQueue;
@Inject
- private void injectComponents(RPCManager rpcManager, BuddyManager buddyManager)
+ private void injectComponents(RPCManager rpcManager, BuddyManager buddyManager,
ReplicationQueue replicationQueue)
{
this.rpcManager = rpcManager;
+ this.replicationQueue = replicationQueue;
this.buddyManager = buddyManager;
usingBuddyReplication = buddyManager != null && buddyManager.isEnabled();
}
@@ -95,7 +98,7 @@
else if (te.isForceSyncReplication()) sync = true;
}
}
- if (!sync && rpcManager.getReplicationQueue() != null &&
!usingBuddyReplication)
+ if (!sync && replicationQueue != null && !usingBuddyReplication)
{
putCallOnAsyncReplicationQueue(call);
}
@@ -130,7 +133,7 @@
protected void putCallOnAsyncReplicationQueue(MethodCall call)
{
if (log.isDebugEnabled()) log.debug("Putting call " + call + " on
the replication queue.");
-
rpcManager.getReplicationQueue().add(MethodCallFactory.create(MethodDeclarations.replicateMethod_id,
call));
+
replicationQueue.add(MethodCallFactory.create(MethodDeclarations.replicateMethod_id,
call));
}
//todo info expt for this is InvocationContext, move method there
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java 2008-01-17
15:22:36 UTC (rev 5160)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java 2008-01-17
22:34:36 UTC (rev 5161)
@@ -37,7 +37,7 @@
boolean resumeSuspended = false;
if (trace)
- log.trace("Invoked on cache instance [" + cache.getLocalAddress() +
"] and InvocationContext [" + ctx + "]");
+ log.trace("Invoked with InvocationContext [" + ctx + "]");
if (MethodDeclarations.isBlockUnblockMethod(call.getMethodId())) return
nextInterceptor(ctx);
Modified:
core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2008-01-17
15:22:36 UTC (rev 5160)
+++
core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2008-01-17
22:34:36 UTC (rev 5161)
@@ -202,16 +202,6 @@
return cache.getInternalFqns();
}
- public void fetchPartialState(List<Address> members, Fqn subtreeRoot) throws
Exception
- {
- cache.fetchPartialState(members, subtreeRoot);
- }
-
- public void fetchPartialState(List<Address> members, Fqn subtreeRoot, Fqn
integrationPoint) throws Exception
- {
- cache.fetchPartialState(members, subtreeRoot, integrationPoint);
- }
-
public int getNumberOfLocksHeld()
{
return cache.getNumberOfLocksHeld();
@@ -307,12 +297,12 @@
public Address getLocalAddress()
{
- return cache.getLocalAddress();
+ return rpcManager.getLocalAddress();
}
public List<Address> getMembers()
{
- return cache.getMembers();
+ return rpcManager.getMembers();
}
public String getVersion()
Modified: core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java 2008-01-17 15:22:36 UTC
(rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java 2008-01-17 22:34:36 UTC
(rev 5161)
@@ -2,9 +2,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheImpl;
import org.jboss.cache.Node;
-import org.jboss.cache.NodeSPI;
import org.jboss.cache.statetransfer.StateTransferManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionTable;
@@ -12,7 +10,6 @@
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import java.util.Iterator;
public abstract class LockUtil
{
@@ -26,11 +23,8 @@
public static boolean breakTransactionLock(NodeLock lock,
GlobalTransaction gtx,
boolean localTx,
- CacheImpl cache)
+ TransactionTable tx_table,
TransactionManager tm)
{
- TransactionTable tx_table = cache.getTransactionTable();
- TransactionManager tm = cache.getTransactionManager();
-
boolean broken = false;
int tryCount = 0;
int lastStatus = TransactionLockStatus.STATUS_BROKEN;
@@ -55,89 +49,6 @@
}
/**
- * Forcibly acquire a read lock on the given node for the given owner,
- * breaking any existing locks that prevent the read lock. If the
- * existing lock is held by a GlobalTransaction, breaking the lock may
- * result in a rollback of the transaction.
- *
- * @param node the node
- * @param newOwner the new owner (usually a Thread or GlobalTransaction)
- * @param lockChildren <code>true</code> if this method should be
recursively
- * applied to <code>node</code>'s children.
- */
- public static void forceAcquireLock(NodeSPI<?, ?> node,
- Object newOwner,
- CacheImpl cache,
- boolean lockChildren)
- {
-
- NodeLock lock = node.getLock();
- boolean acquired = lock.isOwner(newOwner);
-
- if (!acquired && log.isDebugEnabled())
- {
- log.debug("Force acquiring lock on node " + node.getFqn());
- }
-
- TransactionTable tx_table = cache.getTransactionTable();
- TransactionManager tm = cache.getTransactionManager();
- Object localAddress = cache.getLocalAddress();
- boolean serializable = cache.getConfiguration().getIsolationLevel() ==
IsolationLevel.SERIALIZABLE;
-
- while (!acquired)
- {
- Object curOwner = null;
- boolean attempted = false;
-
- // Keep breaking write locks until we acquire a read lock
- // or there are no more write locks
- while (!acquired && ((curOwner = lock.getWriterOwner()) != null))
- {
- acquired = acquireLockFromOwner(node, lock, curOwner, newOwner, tx_table, tm,
localAddress);
- attempted = true;
- }
-
- // If no more write locks, but we haven't acquired, see if we
- // need to break read locks as well.
- if (!acquired && serializable)
- {
- Iterator it = lock.getReaderOwners().iterator();
- if (it.hasNext())
- {
- curOwner = it.next();
- acquired = acquireLockFromOwner(node, lock, curOwner, newOwner, tx_table,
tm, localAddress);
- attempted = true;
- // Don't keep iterating due to the risk of
- // ConcurrentModificationException if readers are removed
- // Just go back through our outer loop to get the nextInterceptor one
- }
- }
-
- if (!acquired && !attempted)
- {
- // We only try to acquire above if someone else has the lock.
- // Seems no one is holding a lock and it's there for the taking.
- try
- {
- acquired = lock.acquire(newOwner, 1, NodeLock.LockType.READ);
- }
- catch (Exception ignored)
- {
- }
- }
- }
-
- // Recursively unlock children
- if (lockChildren)
- {
- for (NodeSPI n : node.getChildrenDirect())
- {
- forceAcquireLock(n, newOwner, cache, true);
- }
- }
- }
-
- /**
* Attempts to acquire a read lock on <code>node</code> for
* <code>newOwner</code>, if necessary breaking locks held by
* <code>curOwner</code>.
@@ -158,7 +69,7 @@
if (log.isTraceEnabled())
{
log.trace("Attempting to acquire lock for node " + node.getFqn() +
- " from owner " + curOwner);
+ " from owner " + curOwner);
}
boolean acquired = false;
@@ -192,7 +103,7 @@
if (broken && log.isTraceEnabled())
{
log.trace("Broke lock for node " + node.getFqn() +
- " held by owner " + curOwner);
+ " held by owner " + curOwner);
}
try
@@ -266,7 +177,7 @@
if (log.isTraceEnabled())
{
log.trace("Attempting to break transaction lock held "
+
- " by " + gtx + " by rolling back local
tx");
+ " by " + gtx + " by rolling back local
tx");
}
// This thread has to join the tx
tm.resume(tx);
@@ -315,8 +226,8 @@
if (log.isTraceEnabled())
{
log.trace("Attempting to break transaction lock held "
+
- "by " + gtx + " by marking local tx as
" +
- "rollback-only");
+ "by " + gtx + " by marking local tx as
" +
+ "rollback-only");
}
tx.setRollbackOnly();
break;
@@ -349,7 +260,7 @@
// Race condition; gtx was cleared from tx_table.
// Just double check if gtx still holds a lock
if (gtx == lock.getWriterOwner()
- || lock.getReaderOwners().contains(gtx))
+ || lock.getReaderOwners().contains(gtx))
{
// TODO should we throw an exception??
lock.release(gtx);
Modified: core/trunk/src/test/java/org/jboss/cache/api/CacheSPITest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/CacheSPITest.java 2008-01-17 15:22:36 UTC
(rev 5160)
+++ core/trunk/src/test/java/org/jboss/cache/api/CacheSPITest.java 2008-01-17 22:34:36 UTC
(rev 5161)
@@ -89,8 +89,8 @@
Configuration conf1 =
UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC);
Configuration conf2 =
UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC);
- cache1 = (CacheSPI<Object, Object>) new
DefaultCacheFactory().createCache(conf1, false);
- cache2 = (CacheSPI<Object, Object>) new
DefaultCacheFactory().createCache(conf2, false);
+ cache1 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object,
Object>().createCache(conf1, false);
+ cache2 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object,
Object>().createCache(conf2, false);
cache1.start();
assertTrue("Cache1 is coordinator",
cache1.getRPCManager().isCoordinator());
Modified: core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java 2008-01-17 15:22:36 UTC
(rev 5160)
+++ core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java 2008-01-17 22:34:36 UTC
(rev 5161)
@@ -28,7 +28,7 @@
* @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
* @since 2.0.0
*/
-@Test(groups = {"functional"})
+@Test(groups = "functional")
public class NodeAPITest
{
private Node<Object, Object> rootNode;
Modified:
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-01-17
15:22:36 UTC (rev 5160)
+++
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-01-17
22:34:36 UTC (rev 5161)
@@ -9,7 +9,6 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.RPCManager;
-import org.jboss.cache.RPCManagerImpl;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
@@ -166,11 +165,11 @@
public void testAsyncForce() throws Exception
{
- RPCManager rpcManager = EasyMock.createMock(RPCManager.class);
+ RPCManager rpcManager = EasyMock.createNiceMock(RPCManager.class);
RPCManager originalRpcManager =
cache1.getConfiguration().getRuntimeConfig().getRPCManager();
-
+ List<Address> memberList = originalRpcManager.getMembers();
+ expect(rpcManager.getMembers()).andReturn(memberList).anyTimes();
// inject a mock RPC manager so that we can test whether calls made are sync or
async.
- //cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(),
rpcManager, RPCManager.class);
// invalidations will not trigger any rpc call sfor PFER
@@ -178,7 +177,6 @@
{
// specify what we expect called on the mock Rpc Manager. For params we
don't care about, just use ANYTHING.
// setting the mock object to expect the "sync" param to be false.
- expect(rpcManager.getReplicationQueue()).andReturn(null);
expect(rpcManager.callRemoteMethods(anyAddresses(), (MethodCall) anyObject(),
eq(false), anyBoolean(), anyInt())).andReturn(null);
}
@@ -233,52 +231,58 @@
assertEquals("parent fqn tx should have completed", value,
cache2.get(parentFqn, key));
}
- public void testExceptionSuppression()
+ public void testExceptionSuppression() throws Exception
{
- RPCManager barfingRpcManager = new RPCManagerImpl()
+ RPCManager barfingRpcManager = EasyMock.createNiceMock(RPCManager.class);
+ RPCManager originalRpcManager =
cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+ try
{
- @Override
- public List callRemoteMethods(List<Address> recipients, MethodCall method,
boolean synchronous, boolean excludeSelf, int timeout)
- {
- throw new RuntimeException("Barf");
- }
- };
+ List<Address> memberList = originalRpcManager.getMembers();
+ expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
+
expect(barfingRpcManager.getLocalAddress()).andReturn(originalRpcManager.getLocalAddress()).anyTimes();
+ expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (MethodCall)
anyObject(), anyBoolean(), anyBoolean(), anyInt())).andThrow(new
RuntimeException("Barf!")).anyTimes();
+ replay(barfingRpcManager);
-
TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(),
barfingRpcManager, RPCManager.class);
- cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
+
TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(),
barfingRpcManager, RPCManager.class);
+ cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
- try
- {
- cache1.put(fqn, key, value);
- if (!optimistic) fail("Should have barfed");
- }
- catch (RuntimeException re)
- {
- }
-
- if (optimistic && !isUsingInvalidation())
- {
- // proves that the put did, in fact, barf. Doesn't work for invalidations
since the inability to invalidate will not cause a rollback.
- assertNull(cache1.get(fqn, key));
- }
- else
- {
- // clean up any indeterminate state left over
try
{
- cache1.removeNode(fqn);
- // as above, the inability to invalidate will not cause an exception
- if (!isUsingInvalidation()) fail("Should have barfed");
+ cache1.put(fqn, key, value);
+ if (!optimistic) fail("Should have barfed");
}
catch (RuntimeException re)
{
}
- }
- assertNull("Should have cleaned up", cache1.get(fqn, key));
+ if (optimistic && !isUsingInvalidation())
+ {
+ // proves that the put did, in fact, barf. Doesn't work for
invalidations since the inability to invalidate will not cause a rollback.
+ assertNull(cache1.get(fqn, key));
+ }
+ else
+ {
+ // clean up any indeterminate state left over
+ try
+ {
+ cache1.removeNode(fqn);
+ // as above, the inability to invalidate will not cause an exception
+ if (!isUsingInvalidation()) fail("Should have barfed");
+ }
+ catch (RuntimeException re)
+ {
+ }
+ }
- // should not barf
- cache1.putForExternalRead(fqn, key, value);
+ assertNull("Should have cleaned up", cache1.get(fqn, key));
+
+ // should not barf
+ cache1.putForExternalRead(fqn, key, value);
+ }
+ finally
+ {
+
TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(),
originalRpcManager, RPCManager.class);
+ }
}
public void testBasicPropagation() throws Exception
Modified: core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2008-01-17 15:22:36 UTC
(rev 5160)
+++ core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2008-01-17 22:34:36 UTC
(rev 5161)
@@ -283,7 +283,7 @@
*/
public static boolean isCacheViewComplete(CacheImpl cache, int memberCount)
{
- List members = cache.getMembers();
+ List members = cache.getRPCManager().getMembers();
if (members == null || memberCount > members.size())
{
return false;
@@ -292,7 +292,7 @@
{
// This is an exceptional condition
StringBuffer sb = new StringBuffer("Cache at address ");
- sb.append(cache.getLocalAddress());
+ sb.append(cache.getRPCManager().getLocalAddress());
sb.append(" had ");
sb.append(members.size());
sb.append(" members; expecting ");