[jboss-cvs] JBossCache/src/org/jboss/cache ...
Brian Stansberry
brian.stansberry at jboss.com
Thu Jul 20 17:58:21 EDT 2006
User: bstansberry
Date: 06/07/20 17:58:21
Modified: src/org/jboss/cache CacheSPI.java TreeCacheProxyImpl.java
TreeCache.java
Log:
[JBCACHE-465] Extract the state transfer code out of TreeCache
Revision Changes Path
1.5 +8 -1 JBossCache/src/org/jboss/cache/CacheSPI.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: CacheSPI.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/CacheSPI.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -b -r1.4 -r1.5
--- CacheSPI.java 19 Jul 2006 21:34:45 -0000 1.4
+++ CacheSPI.java 20 Jul 2006 21:58:21 -0000 1.5
@@ -8,10 +8,11 @@
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.config.Configuration;
+import org.jboss.cache.eviction.RegionManager;
import org.jboss.cache.interceptors.Interceptor;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.CacheLoaderManager;
-import org.jboss.cache.eviction.RegionManager;
+import org.jboss.cache.statetransfer.StateTransferManager;
import org.jgroups.Address;
import org.jgroups.blocks.MethodCall;
@@ -69,6 +70,12 @@
/**
*
+ * @return the current {@link PojoStateTransferManager}
+ */
+ StateTransferManager getStateTransferManager();
+
+ /**
+ *
* @return the local address of this cache in a cluster. Null if running in local mode.
*/
Object getLocalAddress();
1.5 +6 -0 JBossCache/src/org/jboss/cache/TreeCacheProxyImpl.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TreeCacheProxyImpl.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/TreeCacheProxyImpl.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -b -r1.4 -r1.5
--- TreeCacheProxyImpl.java 19 Jul 2006 21:34:45 -0000 1.4
+++ TreeCacheProxyImpl.java 20 Jul 2006 21:58:21 -0000 1.5
@@ -12,6 +12,7 @@
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.lock.IdentityLock;
import org.jboss.cache.eviction.RegionManager;
+import org.jboss.cache.statetransfer.StateTransferManager;
import org.jgroups.Address;
import org.jgroups.blocks.MethodCall;
@@ -73,6 +74,11 @@
return RPCManager.getInstance(treeCache);
}
+ public StateTransferManager getStateTransferManager()
+ {
+ return treeCache.getStateTransferManager();
+ }
+
public Object getLocalAddress()
{
return treeCache.getLocalAddress();
1.208 +46 -689 JBossCache/src/org/jboss/cache/TreeCache.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TreeCache.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/TreeCache.java,v
retrieving revision 1.207
retrieving revision 1.208
diff -u -b -r1.207 -r1.208
--- TreeCache.java 20 Jul 2006 09:03:51 -0000 1.207
+++ TreeCache.java 20 Jul 2006 21:58:21 -0000 1.208
@@ -23,6 +23,7 @@
import org.jboss.cache.lock.IdentityLock;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.LockStrategyFactory;
+import org.jboss.cache.lock.LockUtil;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.marshall.JBCMethodCall;
@@ -35,9 +36,7 @@
import org.jboss.cache.marshall.TreeCacheMarshaller;
import org.jboss.cache.marshall.VersionAwareMarshaller;
import org.jboss.cache.optimistic.DataVersion;
-import org.jboss.cache.statetransfer.StateTransferFactory;
-import org.jboss.cache.statetransfer.StateTransferGenerator;
-import org.jboss.cache.statetransfer.StateTransferIntegrator;
+import org.jboss.cache.statetransfer.StateTransferManager;
import org.jboss.cache.util.MBeanConfigurator;
import org.jboss.invocation.MarshalledValueOutputStream;
import org.jboss.system.ServiceMBeanSupport;
@@ -74,7 +73,7 @@
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
* @author Brian Stansberry
* @author Daniel Huang (dhuang at jboss.org)
- * @version $Id: TreeCache.java,v 1.207 2006/07/20 09:03:51 msurtani Exp $
+ * @version $Id: TreeCache.java,v 1.208 2006/07/20 21:58:21 bstansberry Exp $
* <p/>
* @see <a href="http://labs.jboss.com/portal/jbosscache/docs">JBossCache doc</a>
*/
@@ -253,6 +252,23 @@
*/
protected BuddyManager buddyManager;
+ /** State transfer manager. Do not access this field directly -- use the getter */
+ private StateTransferManager stateTransferManager;
+
+ public StateTransferManager getStateTransferManager()
+ {
+ if (stateTransferManager == null)
+ {
+ stateTransferManager = new StateTransferManager(this);
+ }
+ return stateTransferManager;
+ }
+
+ public void setStateTransferManager(StateTransferManager manager)
+ {
+ this.stateTransferManager = manager;
+ }
+
private boolean isStandalone = false;
private long stateFetchTimeout;
@@ -262,7 +278,7 @@
* activateReqion or inactivateRegion. Requests for these fqns
* will be ignored by _getState().
*/
- protected final Set activationChangeNodes = new HashSet();
+ protected final Set activationChangeNodes = Collections.synchronizedSet(new HashSet());
boolean started;
public Configuration getConfiguration()
@@ -957,22 +973,6 @@
// ----------- Marshalling and State Transfer -----------------------
/**
- * Returns the state bytes from the message listener.
- */
- public byte[] getStateBytes()
- {
- return this.getMessageListener().getState();
- }
-
- /**
- * Sets the state bytes in the message listener.
- */
- public void setStateBytes(byte[] state)
- {
- this.getMessageListener().setState(state);
- }
-
- /**
* Registers a specific classloader for a region defined by a fully
* qualified name.
* A instance of {@link TreeCacheMarshaller} is used for marshalling.
@@ -995,7 +995,7 @@
}
/**
- * Unregisteres a class loader for a region.
+ * Unregisters a class loader for a region.
*
* @param fqn The fqn region.
* @throws RegionNotFoundException If there is a conflict in fqn specification.
@@ -1052,10 +1052,7 @@
// Add this fqn to the set of those we are activating
// so calls to _getState for the fqn can return quickly
- synchronized (activationChangeNodes)
- {
activationChangeNodes.add(fqn);
- }
// Start accepting messages for the subtree, but
// queue them for later processing. We do this early
@@ -1082,7 +1079,7 @@
}
Object[] mbrArray = getMembers().toArray();
- _loadState(subtreeRoot.getFqn(), subtreeRoot, mbrArray, cl);
+ getStateTransferManager().loadState(subtreeRoot.getFqn(), subtreeRoot, mbrArray, cl);
}
else
{
@@ -1101,7 +1098,7 @@
// We'll update this node with the state we receive
subtreeRoot = createSubtreeRootNode(buddyRoot);
}
- _loadState(fqn, subtreeRoot, sources, cl);
+ getStateTransferManager().loadState(fqn, subtreeRoot, sources, cl);
}
}
@@ -1143,11 +1140,13 @@
}
finally
{
- synchronized (activationChangeNodes)
- {
activationChangeNodes.remove(fqn);
}
}
+
+ public boolean isActivatingDeactivating(Fqn fqn)
+ {
+ return activationChangeNodes.contains(fqn);
}
/**
@@ -1175,123 +1174,6 @@
}
/**
- * Requests state from each of the given source nodes in the cluster
- * until it gets it or no node replies with a timeout exception. If state
- * is returned, integrates it into the given DataNode. If no state is
- * returned but a node replies with a timeout exception, the calls will be
- * repeated with a longer timeout, until 3 attempts have been made.
- *
- * @param subtreeRoot Fqn of the topmost node in the subtree whose
- * state should be transferred.
- * @param integrationRoot the DataNode into which state should be integrated
- * @param sources the cluster nodes to query for state
- * @param cl the classloader to use to unmarshal the state.
- * Can be <code>null</code>.
- * @throws Exception
- */
- public void _loadState(Fqn subtreeRoot, DataNode integrationRoot,
- Object[] sources, ClassLoader cl)
- throws Exception
- {
- // Call each node in the cluster with progressively longer timeouts
- // until we get state or no cluster node returns a TimeoutException
- long[] timeouts = {400, 800, 1600};
- Object ourself = getLocalAddress(); // ignore ourself when we call
- boolean stateSet = false;
- TimeoutException timeoutException = null;
- Object timeoutTarget = null;
-
- boolean trace = log.isTraceEnabled();
-
- for (int i = 0; i < timeouts.length; i++)
- {
- timeoutException = null;
-
- Boolean force = (i == timeouts.length - 1) ? Boolean.TRUE
- : Boolean.FALSE;
-
- MethodCall psmc = MethodCallFactory.create(MethodDeclarations.getPartialStateMethod,
- new Object[]{subtreeRoot,
- new Long(timeouts[i]),
- force,
- Boolean.FALSE});
-
- MethodCall replPsmc = MethodCallFactory.create(MethodDeclarations.replicateMethod,
- new Object[]{psmc});
-
- // Iterate over the group members, seeing if anyone
- // can give us state for this region
- for (int j = 0; j < sources.length; j++)
- {
- Object target = sources[j];
- if (ourself.equals(target))
- continue;
-
- Vector targets = new Vector();
- targets.add(target);
-
- List responses = callRemoteMethods(targets, replPsmc, true,
- true, configuration.getSyncReplTimeout());
- Object rsp = null;
- if (responses != null && responses.size() > 0)
- {
- rsp = responses.get(0);
- if (rsp instanceof byte[])
- {
- _setState((byte[]) rsp, integrationRoot, cl);
- stateSet = true;
-
- if (log.isDebugEnabled())
- {
- log.debug("TreeCache.activateRegion(): " + ourself +
- " got state from " + target);
- }
-
- break;
- }
- else if (rsp instanceof TimeoutException)
- {
- timeoutException = (TimeoutException) rsp;
- timeoutTarget = target;
- if (trace)
- {
- log.trace("TreeCache.activateRegion(): " + ourself +
- " got a TimeoutException from " + target);
- }
- }
- }
-
- if (trace)
- {
- log.trace("TreeCache.activateRegion(): " + ourself +
- " No usable response from node " + target +
- (rsp == null ? "" : (" -- received " + rsp)));
- }
- }
-
- // We've looped through all targets; if we got state or didn't
- // but no one sent a timeout (which means no one had state)
- // we don't want to try again
- if (stateSet || timeoutException == null)
- break;
- }
-
- if (!stateSet)
- {
- // If we got a timeout exception on the final try,
- // this is a failure condition
- if (timeoutException != null)
- {
- throw new CacheException("Failed getting state due to timeout on " +
- timeoutTarget, timeoutException);
- }
-
- if (log.isDebugEnabled())
- log.debug("TreeCache.activateRegion(): No nodes able to give state");
- }
- }
-
- /**
* Creates a subtree in the local tree.
* Returns the DataNode created.
*/
@@ -1363,7 +1245,7 @@
* managed (either by activate/inactiveRegion()
* or by registerClassLoader())
* @throws CacheException if there is a problem evicting nodes
- * @throws IllegalStateException if is <code>false</code>
+ * @throws IllegalStateException if {@link Configuration#isUseRegionBasedMarshalling()} is <code>false</code>
*/
public void inactivateRegion(String subtreeFqn) throws RegionNameConflictException, CacheException
{
@@ -1377,11 +1259,8 @@
boolean subtreeLocked = false;
try
{
-
- synchronized (activationChangeNodes)
- {
+ // Record that this fqn is in status change, so can't provide state
activationChangeNodes.add(fqn);
- }
boolean inactive = marshaller_.isInactive(subtreeFqn);
if (!inactive)
@@ -1478,12 +1357,9 @@
}
}
- synchronized (activationChangeNodes)
- {
activationChangeNodes.remove(fqn);
}
}
- }
/**
* Evicts the node at <code>subtree</code> along with all descendant nodes.
@@ -1648,480 +1524,7 @@
*/
public byte[] _getState(Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
{
-
- if (marshaller_ != null)
- {
- // can't give state for regions currently being activated/inactivated
- synchronized (activationChangeNodes)
- {
- if (activationChangeNodes.contains(fqn))
- {
- if (log.isDebugEnabled())
- log.debug("ignoring _getState() for " + fqn + " as it is being activated/inactivated");
- return null;
- }
- }
-
- // Can't give state for inactive nodes
- if (marshaller_.isInactive(fqn.toString()))
- {
- if (log.isDebugEnabled())
- log.debug("ignoring _getState() for inactive region " + fqn);
- return null;
- }
- }
-
- DataNode rootNode = findNode(fqn);
- if (rootNode == null)
- return null;
-
- boolean fetchTransientState = configuration.isFetchInMemoryState();
- boolean fetchPersistentState = cacheLoaderManager !=null && cacheLoaderManager.isFetchPersistentState();
-
- Object owner = getOwnerForLock();
-
- try
- {
- if (fetchTransientState || fetchPersistentState)
- {
- log.info("locking the " + fqn + " subtree to return the in-memory (transient) state");
- acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
- }
-
- StateTransferGenerator generator =
- StateTransferFactory.getStateTransferGenerator(this);
-
- return generator.generateStateTransfer(rootNode,
- fetchTransientState,
- fetchPersistentState,
- suppressErrors);
- }
- finally
- {
- releaseStateTransferLocks(rootNode, owner, true);
- }
- }
-
- /**
- * Set the portion of the cache rooted in <code>targetRoot</code>
- * to match the given state. Updates the contents of <code>targetRoot</code>
- * to reflect those in <code>new_state</code>.
- * <p/>
- * <strong>NOTE:</strong> This method performs no locking of nodes; it
- * is up to the caller to lock <code>targetRoot</code> before calling
- * this method.
- *
- * @param new_state a serialized byte[][] array where element 0 is the
- * transient state (or null) , and element 1 is the
- * persistent state (or null)
- * @param targetRoot fqn of the node into which the state should be integrated
- * @param cl classloader to use to unmarshal the state, or
- * <code>null</code> if the TCCL should be used
- */
- public void _setState(byte[] new_state, Fqn targetRoot, ClassLoader cl)
- throws Exception
- {
- DataNode target = findNode(targetRoot);
- if (target == null)
- {
- // Create the integration root, but do not replicate
- Option option = new Option();
- option.setCacheModeLocal(true);
- this.put(targetRoot, null, option);
- target = findNode(targetRoot);
- }
-
- _setState(new_state, target, cl);
- }
-
- /**
- * Set the portion of the cache rooted in <code>targetRoot</code>
- * to match the given state. Updates the contents of <code>targetRoot</code>
- * to reflect those in <code>new_state</code>.
- * <p/>
- * <strong>NOTE:</strong> This method performs no locking of nodes; it
- * is up to the caller to lock <code>targetRoot</code> before calling
- * this method.
- *
- * @param new_state a serialized byte[][] array where element 0 is the
- * transient state (or null) , and element 1 is the
- * persistent state (or null)
- * @param targetRoot node into which the state should be integrated
- * @param cl classloader to use to unmarshal the state, or
- * <code>null</code> if the TCCL should be used
- */
- private void _setState(byte[] new_state, DataNode targetRoot, ClassLoader cl)
- throws Exception
- {
- if (new_state == null)
- {
- log.info("new_state is null (may be first member in cluster)");
- return;
- }
-
- log.info("received the state (size=" + new_state.length + " bytes)");
-
- Object owner = getOwnerForLock();
- try
- {
- // Acquire a lock on the root node
- acquireLocksForStateTransfer(targetRoot, owner, stateFetchTimeout,
- true, true);
-
- // 1. Unserialize the states into transient and persistent state
- StateTransferIntegrator integrator =
- StateTransferFactory.getStateTransferIntegrator(new_state,
- targetRoot.getFqn(),
- this);
-
- // 2. If transient state is available, integrate it
- try
- {
- integrator.integrateTransientState(targetRoot, cl);
- notifyAllNodesCreated(targetRoot);
- }
- catch (Throwable t)
- {
- log.error("failed setting transient state", t);
- }
-
- // 3. Store any persistent state
- integrator.integratePersistentState();
- }
- finally
- {
- releaseStateTransferLocks(targetRoot, owner, true);
- }
-
- }
-
- /**
- * Acquires locks on a root node for an owner for state transfer.
- */
- protected void acquireLocksForStateTransfer(DataNode root,
- Object lockOwner,
- long timeout,
- boolean lockChildren,
- boolean force)
- throws Exception
- {
- try
- {
- if (lockChildren)
- root.acquireAll(lockOwner, timeout, DataNode.LOCK_TYPE_READ);
- else
- root.acquire(lockOwner, timeout, DataNode.LOCK_TYPE_READ);
- }
- catch (TimeoutException te)
- {
- log.error("Caught TimeoutException acquiring locks on region " +
- root.getFqn(), te);
- if (force)
- {
- // Until we have FLUSH in place, don't force locks
-// forceAcquireLock(root, lockOwner, lockChildren);
- throw te;
-
- }
- else
- {
- throw te;
- }
- }
- }
-
- /**
- * Releases all state transfer locks acquired.
- *
- * @see #acquireLocksForStateTransfer
- */
- protected void releaseStateTransferLocks(DataNode root,
- Object lockOwner,
- boolean childrenLocked)
- {
- try
- {
- if (childrenLocked)
- root.releaseAll(lockOwner);
- else
- root.release(lockOwner);
- }
- catch (Throwable t)
- {
- log.error("failed releasing locks", t);
- }
- }
-
- /**
- * Forcibly acquire a read lock on the given node for the given owner,
- * breaking any existing locks that prevent the read lock. If the
- * existing lock is held by a GlobalTransaction, breaking the lock may
- * result in a rollback of the transaction.
- *
- * @param node the node
- * @param newOwner the new owner (usually a Thread or GlobalTransaction)
- * @param lockChildren <code>true</code> if this method should be recursively
- * applied to <code>node</code>'s children.
- */
- protected void forceAcquireLock(DataNode node, Object newOwner, boolean lockChildren)
- {
- IdentityLock lock = node.getLock();
- boolean acquired = lock.isOwner(newOwner);
-
- if (!acquired && log.isDebugEnabled())
- log.debug("Force acquiring lock on node " + node.getFqn());
-
- while (!acquired)
- {
- Object curOwner = null;
- boolean attempted = false;
-
- // Keep breaking write locks until we acquire a read lock
- // or there are no more write locks
- while (!acquired && ((curOwner = lock.getWriterOwner()) != null))
- {
- acquired = acquireLockFromOwner(node, lock, curOwner, newOwner);
- attempted = true;
- }
-
- // If no more write locks, but we haven't acquired, see if we
- // need to break read locks as well
- if (!acquired && configuration.getIsolationLevel() == IsolationLevel.SERIALIZABLE)
- {
- Iterator it = lock.getReaderOwners().iterator();
- if (it.hasNext())
- {
- curOwner = it.next();
- acquired = acquireLockFromOwner(node, lock, it.next(), newOwner);
- attempted = true;
- // Don't keep iterating due to the risk of
- // ConcurrentModificationException if readers are removed
- // Just go back through our outer loop to get the next one
- }
- }
-
- if (!acquired && !attempted)
- {
- // We only try to acquire above if someone else has the lock.
- // Seems no one is holding a lock and it's there for the taking.
- try
- {
- acquired = node.acquire(newOwner, 1, DataNode.LOCK_TYPE_READ);
- }
- catch (Exception ignored)
- {
- }
- }
- }
-
- // Recursively unlock children
- if (lockChildren && node.hasChildren())
- {
- Collection children = node.getChildren().values();
- for (Iterator it = children.iterator(); it.hasNext();)
- {
- forceAcquireLock((DataNode) it.next(), newOwner, true);
- }
- }
- }
-
- /**
- * Attempts to acquire a read lock on <code>node</code> for
- * <code>newOwner</code>, if necessary breaking locks held by
- * <code>curOwner</code>.
- *
- * @param node the node
- * @param lock the lock
- * @param curOwner the current owner
- * @param newOwner the new owner
- */
- private boolean acquireLockFromOwner(DataNode node,
- IdentityLock lock,
- Object curOwner,
- Object newOwner)
- {
- if (log.isTraceEnabled())
- log.trace("Attempting to acquire lock for node " + node.getFqn() +
- " from owner " + curOwner);
-
- boolean acquired = false;
- boolean broken = false;
- int tryCount = 0;
- int lastStatus = TransactionLockStatus.STATUS_BROKEN;
-
- while (!broken && !acquired)
- {
- if (curOwner instanceof GlobalTransaction)
- {
- int status = breakTransactionLock((GlobalTransaction) curOwner, lock, lastStatus, tryCount);
- if (status == TransactionLockStatus.STATUS_BROKEN)
- broken = true;
- else if (status != lastStatus)
- tryCount = 0;
- lastStatus = status;
- }
- else if (tryCount > 0)
- {
- lock.release(curOwner);
- broken = true;
- }
-
- if (broken && log.isTraceEnabled())
- log.trace("Broke lock for node " + node.getFqn() +
- " held by owner " + curOwner);
-
- try
- {
- acquired = node.acquire(newOwner, 1, DataNode.LOCK_TYPE_READ);
- }
- catch (Exception ignore)
- {
- }
-
- tryCount++;
- }
-
- return acquired;
- }
-
- /**
- * Attempts to release the lock held by <code>gtx</code> by altering the
- * underlying transaction. Different strategies will be employed
- * depending on the status of the transaction and param
- * <code>tryCount</code>. Transaction may be rolled back or marked
- * rollback-only, or the lock may just be broken, ignoring the tx. Makes an
- * effort to not affect the tx or break the lock if tx appears to be in
- * the process of completion; param <code>tryCount</code> is used to help
- * make decisions about this.
- * <p/>
- * This method doesn't guarantee to have broken the lock unless it returns
- * {@link TransactionLockStatus#STATUS_BROKEN}.
- *
- * @param gtx the gtx holding the lock
- * @param lock the lock
- * @param lastStatus the return value from a previous invocation of this
- * method for the same lock, or Status.STATUS_UNKNOW
- * for the first invocation.
- * @param tryCount number of times this method has been called with
- * the same gtx, lock and lastStatus arguments. Should
- * be reset to 0 anytime lastStatus changes.
- * @return the current status of the Transaction associated with
- * <code>gtx</code>, or {@link TransactionLockStatus#STATUS_BROKEN}
- * if the lock held by gtx was forcibly broken.
- */
- private int breakTransactionLock(GlobalTransaction gtx,
- IdentityLock lock,
- int lastStatus,
- int tryCount)
- {
- int status = Status.STATUS_UNKNOWN;
- Transaction tx = tx_table.getLocalTransaction(gtx);
- if (tx != null)
- {
- try
- {
- status = tx.getStatus();
-
- if (status != lastStatus)
- tryCount = 0;
-
- switch (status)
- {
- case Status.STATUS_ACTIVE:
- case Status.STATUS_MARKED_ROLLBACK:
- case Status.STATUS_PREPARING:
- case Status.STATUS_UNKNOWN:
- if (tryCount == 0)
- {
- if (log.isTraceEnabled())
- log.trace("Attempting to break transaction lock held " +
- " by " + gtx + " by rolling back local tx");
- // This thread has to join the tx
- tm.resume(tx);
- try
- {
- tx.rollback();
- }
- finally
- {
- tm.suspend();
- }
-
- }
- else if (tryCount > 100)
- {
- // Something is wrong; our initial rollback call
- // didn't generate a valid state change; just force it
- lock.release(gtx);
- status = TransactionLockStatus.STATUS_BROKEN;
- }
- break;
-
- case Status.STATUS_COMMITTING:
- case Status.STATUS_ROLLING_BACK:
- // We'll try up to 10 times before just releasing
- if (tryCount < 10)
- break; // let it finish
- // fall through and release
-
- case Status.STATUS_COMMITTED:
- case Status.STATUS_ROLLEDBACK:
- case Status.STATUS_NO_TRANSACTION:
- lock.release(gtx);
- status = TransactionLockStatus.STATUS_BROKEN;
- break;
-
- case Status.STATUS_PREPARED:
- // If the tx was started here, we can still abort the commit,
- // otherwise we are in the middle of a remote commit() call
- // and the status is just about to change
- if (tryCount == 0 && gtx.addr.equals(getLocalAddress()))
- {
- // We can still abort the commit
- if (log.isTraceEnabled())
- log.trace("Attempting to break transaction lock held " +
- "by " + gtx + " by marking local tx as " +
- "rollback-only");
- tx.setRollbackOnly();
- break;
- }
- else if (tryCount < 10)
- {
- // EITHER tx was started elsewhere (in which case we'll
- // wait a bit to allow the commit() call to finish;
- // same as STATUS_COMMITTING above)
- // OR we marked the tx rollbackOnly above and are just
- // waiting a bit for the status to change
- break;
- }
-
- // fall through and release
- default:
- lock.release(gtx);
- status = TransactionLockStatus.STATUS_BROKEN;
- }
- }
- catch (Exception e)
- {
- log.error("Exception breaking locks held by " + gtx, e);
- lock.release(gtx);
- status = TransactionLockStatus.STATUS_BROKEN;
- }
- }
- else
- {
- // Race condition; gtx was cleared from tx_table.
- // Just double check if gtx still holds a lock
- if (gtx == lock.getWriterOwner()
- || lock.getReaderOwners().contains(gtx))
- {
- // TODO should we throw an exception??
- lock.release(gtx);
- status = TransactionLockStatus.STATUS_BROKEN;
- }
- }
-
- return status;
+ return getStateTransferManager().getState(fqn, timeout, force, suppressErrors);
}
private void removeLocksForDeadMembers(DataNode node,
@@ -2148,7 +1551,13 @@
for (iter = deadOwners.iterator(); iter.hasNext();)
{
- breakTransactionLock(node, lock, (GlobalTransaction) iter.next());
+ GlobalTransaction deadOwner = (GlobalTransaction) iter.next();
+ boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
+ boolean broken = LockUtil.breakTransactionLock(lock, deadOwner, localTx, this);
+
+ if (broken && log.isTraceEnabled())
+ log.trace("Broke lock for node " + node.getFqn() +
+ " held by " + deadOwner);
}
// Recursively unlock children
@@ -2162,31 +1571,6 @@
}
}
- private void breakTransactionLock(DataNode node,
- IdentityLock lock,
- GlobalTransaction gtx)
- {
- boolean broken = false;
- int tryCount = 0;
- int lastStatus = TransactionLockStatus.STATUS_BROKEN;
-
- while (!broken && lock.isOwner(gtx))
- {
- int status = breakTransactionLock(gtx, lock, lastStatus, tryCount);
- if (status == TransactionLockStatus.STATUS_BROKEN)
- broken = true;
- else if (status != lastStatus)
- tryCount = 0;
- lastStatus = status;
-
- if (broken && log.isTraceEnabled())
- log.trace("Broke lock for node " + node.getFqn() +
- " held by owner " + gtx);
-
- tryCount++;
- }
- }
-
private boolean isLockOwnerDead(Object owner, Vector deadMembers)
{
boolean result = false;
@@ -4316,7 +3700,7 @@
// but we have to catch the Throwable declared in the method sig
my_log.error("Caught " + t.getClass().getName() +
" while responding to initial state transfer request;" +
- " returning null");
+ " returning null", t);
return null;
}
}
@@ -4329,7 +3713,7 @@
if (new_state == null)
my_log.info("transferred state is null (may be first member in cluster)");
else
- TreeCache.this._setState(new_state, Fqn.ROOT, null);
+ getStateTransferManager().setState(new_state, Fqn.ROOT, null);
isStateSet = true;
}
@@ -4599,7 +3983,7 @@
* @param fqn Fully qualified name for the corresponding node.
* @return DataNode
*/
- private DataNode findNode(Fqn fqn)
+ public DataNode findNode(Fqn fqn)
{
try
{
@@ -4911,27 +4295,6 @@
}
/**
- * Generates NodeAdded notifications for all nodes of the tree. This is
- * called whenever the tree is initially retrieved (state transfer)
- */
- protected void notifyAllNodesCreated(DataNode curr)
- {
- DataNode n;
- Map children;
-
- if (curr == null) return;
- notifyNodeCreated(curr.getFqn());
- if ((children = curr.getChildren()) != null)
- {
- for (Iterator it = children.values().iterator(); it.hasNext();)
- {
- n = (DataNode) it.next();
- notifyAllNodesCreated(n);
- }
- }
- }
-
- /**
* Returns the default JGroup properties.
* Subclasses may wish to override this method.
*/
@@ -5075,10 +4438,4 @@
}
- static interface TransactionLockStatus extends Status
- {
- public static final int STATUS_BROKEN = Integer.MIN_VALUE;
- }
-
-
}
More information about the jboss-cvs-commits
mailing list