[jboss-cvs] JBossCache/src/org/jboss/cache ...
Vladimir Blagojevic
vladimir.blagojevic at jboss.com
Tue Nov 14 14:56:09 EST 2006
User: vblagojevic
Date: 06/11/14 14:56:09
Modified: src/org/jboss/cache RegionImpl.java RegionManager.java
TreeCache.java
Log:
[JBCACHE-591] partial state transfer
Revision Changes Path
1.8 +3 -1 JBossCache/src/org/jboss/cache/RegionImpl.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: RegionImpl.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/RegionImpl.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -b -r1.7 -r1.8
--- RegionImpl.java 14 Nov 2006 14:17:11 -0000 1.7
+++ RegionImpl.java 14 Nov 2006 19:56:09 -0000 1.8
@@ -72,11 +72,13 @@
public void activate()
{
+ regionManager.activateRegion(fqn.toString());
active = true;
}
public void deactivate()
{
+ regionManager.inactivateRegion(fqn.toString());
active = false;
}
1.5 +300 -7 JBossCache/src/org/jboss/cache/RegionManager.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: RegionManager.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/RegionManager.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -b -r1.4 -r1.5
--- RegionManager.java 14 Nov 2006 14:17:11 -0000 1.4
+++ RegionManager.java 14 Nov 2006 19:56:09 -0000 1.5
@@ -1,16 +1,23 @@
package org.jboss.cache;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.buddyreplication.BuddyManager;
-import org.jboss.cache.eviction.RegionNameConflictException;
-
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.marshall.RegionNameConflictException;
+import org.jboss.cache.marshall.VersionAwareMarshaller;
+import org.jgroups.Address;
+
/**
* Encapsulates the concept of a {@link Region}, and manages instances of such regions.
*
@@ -26,6 +33,16 @@
boolean defaultInactive;
private Log log = LogFactory.getLog(RegionManager.class);
+ protected final Set activationChangeNodes = Collections.synchronizedSet(new HashSet());
+
+ TreeCache cache;
+
+ /**
+ * Creates a region manager without assigning the <code>cache</code> field.
+ */
+ public RegionManager()
+ {
+ }
+
+ /**
+ * Creates a region manager that works against the given cache.
+ *
+ * @param cache the cache stored in this manager's <code>cache</code> field
+ */
+ public RegionManager(TreeCache cache)
+ {
+ this.cache = cache;
+ }
public boolean isDefaultInactive()
{
@@ -207,6 +224,282 @@
}
/**
+ * Causes the cache to transfer state for the subtree rooted at
+ * <code>subtreeFqn</code> and to begin accepting replication messages
+ * for that subtree.
+ * <p/>
+ * <strong>NOTE:</strong> This method will cause the creation of a node
+ * in the local tree at <code>subtreeFqn</code> whether or not that
+ * node exists anywhere else in the cluster. If the node does not exist
+ * elsewhere, the local node will be empty. The creation of this node will
+ * not be replicated.
+ *
+ * @param subtreeFqn Fqn string indicating the uppermost node in the
+ * portion of the tree that should be activated.
+ * @throws RegionNotEmptyException if the node <code>subtreeFqn</code>
+ * exists and has either data or children
+ */
+ public void activateRegion(String subtreeFqn) throws CacheException
+ {
+ Fqn fqn = Fqn.fromString(subtreeFqn);
+
+ // Check whether the node already exists and has data
+ DataNode subtreeRoot = cache.findNode(fqn);
+ if (!(cache.isNodeEmpty(subtreeRoot)))
+ {
+ throw new RegionNotEmptyException("Node " + subtreeRoot.getFqn() + " already exists and is not empty");
+ }
+
+ // subtreeRoot may still be null here (it is only created further down),
+ // so report the requested fqn instead of dereferencing the node
+ if(isActivatingDeactivating(fqn))
+ {
+ throw new CacheException("Region " + fqn + " is already being activated/deactivated");
+ }
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("activating " + fqn);
+ }
+
+ try
+ {
+
+ // Add this fqn to the set of those we are activating
+ // so calls to _getState for the fqn can return quickly
+ activationChangeNodes.add(fqn);
+
+ Region region = getRegion(fqn, true);
+
+ // If a classloader is registered for the node's region, use it
+ ClassLoader cl = region.getClassLoader();
+
+ BuddyManager buddyManager = cache.getBuddyManager();
+ // Request partial state from the cluster and integrate it
+ if (buddyManager == null)
+ {
+ // Get the state from any node that has it and put it
+ // in the main tree
+ if (subtreeRoot == null)
+ {
+ // We'll update this node with the state we receive
+ subtreeRoot = cache.createSubtreeRootNode(fqn);
+ }
+
+ Address [] groupMembers = null;
+ Vector<Address> members = cache.getMembers();
+ synchronized (members)
+ {
+ groupMembers = members.toArray(new Address[members.size()]);
+ }
+ if (groupMembers.length < 2)
+ {
+ if (log.isDebugEnabled())
+ log.debug("No nodes able to give state");
+ }
+ else
+ {
+ cache.fetchPartialState(groupMembers, subtreeRoot.getFqn());
+ }
+ }
+ else
+ {
+ // Get the state from each DataOwner and integrate in their
+ // respective buddy backup tree
+ List buddies = buddyManager.getBuddyAddresses();
+ for (Iterator it = buddies.iterator(); it.hasNext();)
+ {
+ Address buddy = (Address) it.next();
+ Object sources[] = new Object[]{buddy};
+ Fqn base = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN, BuddyManager.getGroupNameFromAddress(buddy));
+ Fqn buddyRoot = new Fqn(base, fqn);
+ subtreeRoot = cache.findNode(buddyRoot);
+ if (subtreeRoot == null)
+ {
+ // We'll update this node with the state we receive
+ subtreeRoot = cache.createSubtreeRootNode(buddyRoot);
+ }
+ cache.fetchPartialState(sources, subtreeRoot.getFqn());
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ log.error("failed to activate " + subtreeFqn, t);
+
+ // inactivateRegion() rejects any fqn that is still recorded in
+ // activationChangeNodes, so clear our own marker before rolling back;
+ // the remove() in the finally block below is then a harmless no-op
+ activationChangeNodes.remove(fqn);
+
+ // "Re-deactivate" the region
+ try
+ {
+ inactivateRegion(subtreeFqn);
+ }
+ catch (Exception e)
+ {
+ log.error("failed inactivating " + subtreeFqn, e);
+ // just swallow this one and throw the first one
+ }
+
+ // Throw the exception on, wrapping if necessary
+ if (t instanceof RegionNotEmptyException)
+ {
+ throw(RegionNotEmptyException) t;
+ }
+ else if (t instanceof CacheException)
+ {
+ throw(CacheException) t;
+ }
+ else
+ {
+ throw new CacheException(t.getClass().getName() + " " +
+ t.getLocalizedMessage(), t);
+ }
+ }
+ finally
+ {
+ activationChangeNodes.remove(fqn);
+ }
+ }
+
+ /**
+ * Causes the cache to stop accepting replication events for the subtree
+ * rooted at <code>subtreeFqn</code> and evict all nodes in that subtree.
+ *
+ * @param subtreeFqn Fqn string indicating the uppermost node in the
+ * portion of the tree that should be inactivated.
+ * @throws RegionNameConflictException if <code>subtreeFqn</code> indicates
+ * a node that is part of another
+ * subtree that is being specially
+ * managed (either by activate/inactiveRegion()
+ * or by registerClassLoader())
+ * @throws CacheException if there is a problem evicting nodes, if the
+ * region is already being activated/deactivated,
+ * or if the thread is interrupted while acquiring locks
+ * @throws IllegalStateException if {@link Configuration#isUseRegionBasedMarshalling()} is <code>false</code>
+ */
+ public void inactivateRegion(String subtreeFqn) throws CacheException
+ {
+ Fqn fqn = Fqn.fromString(subtreeFqn);
+ DataNode parent = null;
+ DataNode subtreeRoot = null;
+ boolean parentLocked = false;
+ boolean subtreeLocked = false;
+
+ // subtreeRoot has not been looked up yet at this point, so the
+ // message must be built from the requested fqn
+ if(isActivatingDeactivating(fqn))
+ {
+ throw new CacheException("Region " + fqn + " is already being activated/deactivated");
+ }
+
+ try
+ {
+ // Record that this fqn is in status change, so can't provide state
+ activationChangeNodes.add(fqn);
+
+ VersionAwareMarshaller marshaller = cache.getMarshaller();
+ boolean inactive = marshaller.isInactive(subtreeFqn);
+ if (!inactive)
+ {
+ deactivate(subtreeFqn);
+ }
+
+ // Create a list with the Fqn in the main tree and any buddy backup trees
+ BuddyManager buddyManager = cache.getBuddyManager();
+ ArrayList list = new ArrayList();
+ list.add(fqn);
+
+ if (buddyManager != null)
+ {
+ Set buddies = cache.getChildrenNames(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN);
+ if (buddies != null)
+ {
+ for (Iterator it = buddies.iterator(); it.hasNext();)
+ {
+ Fqn base = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN, it.next());
+ list.add(new Fqn(base, fqn));
+ }
+ }
+ }
+
+ long stateFetchTimeout = cache.getConfiguration().getLockAcquisitionTimeout() + 5000;
+ // Remove the subtree from the main tree and any buddy backup trees
+ for (Iterator it = list.iterator(); it.hasNext();)
+ {
+ Fqn subtree = (Fqn) it.next();
+ subtreeRoot = cache.findNode(subtree);
+ if (subtreeRoot != null)
+ {
+ // Acquire locks
+ Object owner = cache.getOwnerForLock();
+ subtreeRoot.acquireAll(owner, stateFetchTimeout, DataNode.LockType.WRITE);
+ subtreeLocked = true;
+
+ // Lock the parent, as we're about to write to it
+ parent = (DataNode) subtreeRoot.getParent();
+ if (parent != null)
+ {
+ parent.acquire(owner, stateFetchTimeout, DataNode.LockType.WRITE);
+ parentLocked = true;
+ }
+
+ // Remove the subtree
+ cache._evictSubtree(subtree);
+
+ // Release locks
+ if (parent != null)
+ {
+ log.debug("forcing release of locks in parent");
+ parent.releaseAllForce();
+ }
+
+ parentLocked = false;
+
+ log.debug("forcing release of all locks in subtree");
+ subtreeRoot.releaseAllForce();
+ subtreeLocked = false;
+ }
+ }
+ }
+ catch (InterruptedException ie)
+ {
+ // Restore the interrupt status so callers further up can still see it
+ Thread.currentThread().interrupt();
+ throw new CacheException("Interrupted while acquiring lock", ie);
+ }
+ finally
+ {
+ // If we didn't succeed, undo the marshalling change
+ // NO. Since we inactivated, we may have missed changes
+ //if (!success && !inactive)
+ // marshaller_.activate(subtreeFqn);
+
+ // If necessary, release locks
+ if (parentLocked)
+ {
+ log.debug("forcing release of locks in parent");
+ try
+ {
+ parent.releaseAllForce();
+ }
+ catch (Throwable t)
+ {
+ log.error("failed releasing locks", t);
+ }
+ }
+ if (subtreeLocked)
+ {
+ log.debug("forcing release of all locks in subtree");
+ try
+ {
+ subtreeRoot.releaseAllForce();
+ }
+ catch (Throwable t)
+ {
+ log.error("failed releasing locks", t);
+ }
+ }
+
+ activationChangeNodes.remove(fqn);
+ }
+ }
+
+ /**
+ * Reports whether <code>fqn</code> is currently recorded in
+ * <code>activationChangeNodes</code>, the set that activateRegion() and
+ * inactivateRegion() add to while they run and clear when they finish.
+ *
+ * @param fqn the Fqn to look up
+ * @return <code>true</code> if the set contains <code>fqn</code>
+ */
+ public boolean isActivatingDeactivating(Fqn fqn)
+ {
+ boolean statusChangeInProgress = activationChangeNodes.contains(fqn);
+ return statusChangeInProgress;
+ }
+
+ /**
* Overloaded form of {@link #activate(Fqn)}
*
* @param fqn
@@ -321,7 +614,7 @@
* @author Ben Wang 02-2004
* @author Daniel Huang (dhuang at jboss.org)
* @author Brian Stansberry
- * @version $Id: RegionManager.java,v 1.4 2006/11/14 14:17:11 msurtani Exp $
+ * @version $Id: RegionManager.java,v 1.5 2006/11/14 19:56:09 vblagojevic Exp $
*/
/*public class ERegionManager
{
1.268 +205 -294 JBossCache/src/org/jboss/cache/TreeCache.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TreeCache.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/TreeCache.java,v
retrieving revision 1.267
retrieving revision 1.268
diff -u -b -r1.267 -r1.268
--- TreeCache.java 14 Nov 2006 14:17:11 -0000 1.267
+++ TreeCache.java 14 Nov 2006 19:56:09 -0000 1.268
@@ -39,6 +39,7 @@
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.optimistic.DataVersion;
import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.util.stream.MarshalledValueInputStream;
import org.jboss.util.stream.MarshalledValueOutputStream;
import org.jgroups.Address;
import org.jgroups.Channel;
@@ -71,6 +72,7 @@
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -92,7 +94,7 @@
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
* @author Brian Stansberry
* @author Daniel Huang (dhuang at jboss.org)
- * @version $Id: TreeCache.java,v 1.267 2006/11/14 14:17:11 msurtani Exp $
+ * @version $Id: TreeCache.java,v 1.268 2006/11/14 19:56:09 vblagojevic Exp $
* <p/>
* @see <a href="http://labs.jboss.com/portal/jbosscache/docs">JBossCache doc</a>
*/
@@ -109,7 +111,7 @@
*/
protected DataNode root = NodeFactory.getInstance().createRootDataNode(NodeFactory.NODE_TYPE_TREENODE, this);
- private RegionManager regionManager = new RegionManager();
+ private RegionManager regionManager = null;
final static Object NULL = new Object();
@@ -258,12 +260,6 @@
private ThreadLocal<InvocationContext> invocationContextContainer = new ThreadLocal<InvocationContext>();
- /**
- * Set of Fqns of nodes that are currently being processed by
- * activateReqion or inactivateRegion. Requests for these fqns
- * will be ignored by _getState().
- */
- protected final Set activationChangeNodes = Collections.synchronizedSet(new HashSet());
public boolean started;
public Configuration getConfiguration()
@@ -292,6 +288,7 @@
{
notifier = new Notifier(this);
this.configuration = configuration;
+ regionManager = new RegionManager(this);
}
/**
@@ -300,6 +297,7 @@
public TreeCache() throws Exception
{
notifier = new Notifier(this);
+ regionManager = new RegionManager(this);
}
/**
@@ -309,6 +307,7 @@
{
notifier = new Notifier(this);
this.channel = channel;
+ regionManager = new RegionManager(this);
}
/**
@@ -555,6 +554,54 @@
}
}
+ /**
+ * Fetches partial state using a state id of the form
+ * <code>sourceTarget + PARTIAL_STATE_DELIMETER + integrationTarget</code>.
+ *
+ * @param sources members to ask for the state
+ * @param sourceTarget first half of the encoded state id
+ * @param integrationTarget second half of the encoded state id
+ * @throws Exception propagated from the String-based overload
+ */
+ public void fetchPartialState(Object sources [], Fqn sourceTarget, Fqn integrationTarget) throws Exception
+ {
+ // The concatenation yields a String, so this resolves to the String-based overload
+ fetchPartialState(sources, sourceTarget + StateTransferManager.PARTIAL_STATE_DELIMETER + integrationTarget);
+ }
+
+ /**
+ * Fetches partial state for the given subtree; the subtree's string form
+ * is used as the state id.
+ *
+ * @param sources members to ask for the state
+ * @param subtree Fqn of the subtree; must be neither <code>null</code> nor the root
+ * @throws IllegalArgumentException if <code>subtree</code> is <code>null</code> or the root
+ * @throws Exception propagated from the String-based overload
+ */
+ public void fetchPartialState(Object sources [], Fqn subtree) throws Exception
+ {
+ // "||" rather than "&&": with "&&" a null subtree was dereferenced by
+ // isRoot() and a non-null subtree could never be rejected
+ if(subtree == null || subtree.isRoot())
+ {
+ throw new IllegalArgumentException("Cannot fetch partial state. Invalid subtree " + subtree);
+ }
+ fetchPartialState(sources,subtree.toString());
+ }
+
+ /**
+ * Asks each candidate member in turn for the partial state identified by
+ * <code>stateId</code>, stopping at the first call to
+ * <code>channel.getState()</code> that returns <code>true</code>. The local
+ * address is dropped from the candidates first. If no member delivers the
+ * state this is only logged at debug level; no exception is raised.
+ *
+ * @param sources candidate members; must be non-null and non-empty
+ * @param stateId id passed to <code>channel.getState()</code>; must be non-null
+ * @throws IllegalArgumentException if an argument is missing or no candidate
+ * other than the local address remains
+ * @throws Exception propagated from <code>channel.getState()</code>
+ */
+ private void fetchPartialState(Object sources [], String stateId) throws Exception
+ {
+ if (sources == null || sources.length<1 || stateId == null)
+ {
+ // Arrays.toString() prints the elements (and copes with null); plain
+ // concatenation of the array would only print its identity string
+ throw new IllegalArgumentException("Cannot fetch partial state, targets are " + Arrays.toString(sources) + " and stateId is "
+ + stateId);
+ }
+
+ ArrayList targets = new ArrayList(Arrays.asList(sources));
+ //skip *this* node as a target
+ targets.remove(getLocalAddress());
+
+ if (targets.isEmpty())
+ {
+ throw new IllegalArgumentException("Cannot fetch partial state. There are no target members specified");
+ }
+
+ log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
+ boolean successfulTransfer = false;
+ for (Iterator iter = targets.iterator(); iter.hasNext()&&!successfulTransfer;)
+ {
+ Address target = (Address) iter.next();
+ log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
+ successfulTransfer = channel.getState(target, stateId, stateFetchTimeout);
+ log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target +(successfulTransfer?" successful": " failed"));
+ }
+
+ if(!successfulTransfer)
+ {
+ log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
+ }
+ }
+
/**
* Lifecycle method. This is like initialize.
*
@@ -652,6 +699,7 @@
}
channel.setOpt(Channel.AUTO_RECONNECT, true);
channel.setOpt(Channel.AUTO_GETSTATE, true);
+ channel.setOpt(Channel.BLOCK, true);
/* Used for JMX jconsole for JDK5.0
ArrayList servers=MBeanServerFactory.findMBeanServer(null);
@@ -964,133 +1012,6 @@
}
/**
- * Causes the cache to transfer state for the subtree rooted at
- * <code>subtreeFqn</code> and to begin accepting replication messages
- * for that subtree.
- * <p/>
- * <strong>NOTE:</strong> This method will cause the creation of a node
- * in the local tree at <code>subtreeFqn</code> whether or not that
- * node exists anywhere else in the cluster. If the node does not exist
- * elsewhere, the local node will be empty. The creation of this node will
- * not be replicated.
- *
- * @param subtreeFqn Fqn string indicating the uppermost node in the
- * portion of the tree that should be activated.
- * @throws RegionNotEmptyException if the node <code>subtreeFqn</code>
- * exists and has either data or children
- */
- public void activateRegion(String subtreeFqn) throws RegionNameConflictException, CacheException
- {
- Fqn fqn = Fqn.fromString(subtreeFqn);
-
- // Check whether the node already exists and has data
- DataNode subtreeRoot = findNode(fqn);
- if (!(isNodeEmpty(subtreeRoot)))
- {
- throw new RegionNotEmptyException("Node " + subtreeRoot.getFqn() + " already exists and is not empty");
- }
-
- if (log.isDebugEnabled())
- {
- log.debug("activating " + fqn);
- }
-
- try
- {
-
- // Add this fqn to the set of those we are activating
- // so calls to _getState for the fqn can return quickly
- activationChangeNodes.add(fqn);
-
- Region region = regionManager.getRegion(fqn, true);
-
- // If a classloader is registered for the node's region, use it
- ClassLoader cl = region.getClassLoader();
-
- // Request partial state from the cluster and integrate it
- if (buddyManager == null)
- {
- // Get the state from any node that has it and put it
- // in the main tree
- if (subtreeRoot == null)
- {
- // We'll update this node with the state we receive
- subtreeRoot = createSubtreeRootNode(fqn);
- }
-
- Object[] mbrArray = getMembers().toArray();
- getStateTransferManager().loadState(subtreeRoot.getFqn(), subtreeRoot, mbrArray, cl);
- }
- else
- {
- // Get the state from each DataOwner and integrate in their
- // respective buddy backup tree
- List buddies = buddyManager.getBuddyAddresses();
- for (Iterator it = buddies.iterator(); it.hasNext();)
- {
- Address buddy = (Address) it.next();
- Object[] sources = {buddy};
- Fqn base = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN, BuddyManager.getGroupNameFromAddress(buddy));
- Fqn buddyRoot = new Fqn(base, fqn);
- subtreeRoot = findNode(buddyRoot);
- if (subtreeRoot == null)
- {
- // We'll update this node with the state we receive
- subtreeRoot = createSubtreeRootNode(buddyRoot);
- }
- getStateTransferManager().loadState(fqn, subtreeRoot, sources, cl);
- }
- }
-
- region.activate();
-
- }
- catch (Throwable t)
- {
- log.error("failed to activate " + subtreeFqn, t);
-
- // "Re-deactivate" the region
- try
- {
- inactivateRegion(subtreeFqn);
- }
- catch (Exception e)
- {
- log.error("failed inactivating " + subtreeFqn, e);
- // just swallow this one and throw the first one
- }
-
- // Throw the exception on, wrapping if necessary
- if (t instanceof RegionNameConflictException)
- {
- throw(RegionNameConflictException) t;
- }
- else if (t instanceof RegionNotEmptyException)
- {
- throw(RegionNotEmptyException) t;
- }
- else if (t instanceof CacheException)
- {
- throw(CacheException) t;
- }
- else
- {
- throw new CacheException(t.getClass().getName() + " " +
- t.getLocalizedMessage(), t);
- }
- }
- finally
- {
- activationChangeNodes.remove(fqn);
- }
- }
-
- public boolean isActivatingDeactivating(Fqn fqn)
- {
- return activationChangeNodes.contains(fqn);
- }
-
- /**
* Returns whether the given node is empty; i.e. has no key/value pairs
* in its data map and has no children.
*
@@ -1098,7 +1019,7 @@
* @return <code>true</code> if <code>node</code> is <code>null</code> or
* empty; <code>false</code> otherwise.
*/
- private boolean isNodeEmpty(DataNode node)
+ protected boolean isNodeEmpty(DataNode node)
{
boolean empty = true;
if (node != null)
@@ -1179,138 +1100,6 @@
}
/**
- * Causes the cache to stop accepting replication events for the subtree
- * rooted at <code>subtreeFqn</code> and evict all nodes in that subtree.
- *
- * @param subtreeFqn Fqn string indicating the uppermost node in the
- * portion of the tree that should be activated.
- * @throws RegionNameConflictException if <code>subtreeFqn</code> indicates
- * a node that is part of another
- * subtree that is being specially
- * managed (either by activate/inactiveRegion()
- * or by registerClassLoader())
- * @throws CacheException if there is a problem evicting nodes
- * @throws IllegalStateException if {@link Configuration#isUseRegionBasedMarshalling()} is <code>false</code>
- */
- public void inactivateRegion(String subtreeFqn) throws RegionNameConflictException, CacheException
- {
- if (!configuration.isUseRegionBasedMarshalling())
- {
- throw new IllegalStateException("TreeCache.deactivate(). useRegionBasedMarshalling flag is not set!");
- }
-
- Fqn fqn = Fqn.fromString(subtreeFqn);
- DataNode parent = null;
- DataNode subtreeRoot = null;
- boolean parentLocked = false;
- boolean subtreeLocked = false;
- try
- {
- // Record that this fqn is in status change, so can't provide state
- activationChangeNodes.add(fqn);
-
- boolean inactive = marshaller_.isInactive(subtreeFqn);
- if (!inactive)
- {
- regionManager.deactivate(subtreeFqn);
- }
-
- // Create a list with the Fqn in the main tree and any buddy backup trees
- ArrayList list = new ArrayList();
- list.add(fqn);
- if (buddyManager != null)
- {
- Set buddies = getChildrenNames(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN);
- if (buddies != null)
- {
- for (Iterator it = buddies.iterator(); it.hasNext();)
- {
- Fqn base = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN, it.next());
- list.add(new Fqn(base, fqn));
- }
- }
- }
-
- // Remove the subtree from the main tree and any buddy backup trees
- for (Iterator it = list.iterator(); it.hasNext();)
- {
- Fqn subtree = (Fqn) it.next();
- subtreeRoot = findNode(subtree);
- if (subtreeRoot != null)
- {
- // Acquire locks
- Object owner = getOwnerForLock();
- subtreeRoot.acquireAll(owner, stateFetchTimeout, DataNode.LockType.WRITE);
- subtreeLocked = true;
-
- // Lock the parent, as we're about to write to it
- parent = (DataNode) subtreeRoot.getParent();
- if (parent != null)
- {
- parent.acquire(owner, stateFetchTimeout, DataNode.LockType.WRITE);
- parentLocked = true;
- }
-
- // Remove the subtree
- _evictSubtree(subtree);
-
- // Release locks
- if (parent != null)
- {
- log.debug("forcing release of locks in parent");
- parent.releaseAllForce();
- }
-
- parentLocked = false;
-
- log.debug("forcing release of all locks in subtree");
- subtreeRoot.releaseAllForce();
- subtreeLocked = false;
- }
- }
- }
- catch (InterruptedException ie)
- {
- throw new CacheException("Interrupted while acquiring lock", ie);
- }
- finally
- {
- // If we didn't succeed, undo the marshalling change
- // NO. Since we inactivated, we may have missed changes
- //if (!success && !inactive)
- // marshaller_.activate(subtreeFqn);
-
- // If necessary, release locks
- if (parentLocked)
- {
- log.debug("forcing release of locks in parent");
- try
- {
- parent.releaseAllForce();
- }
- catch (Throwable t)
- {
- log.error("failed releasing locks", t);
- }
- }
- if (subtreeLocked)
- {
- log.debug("forcing release of all locks in subtree");
- try
- {
- subtreeRoot.releaseAllForce();
- }
- catch (Throwable t)
- {
- log.error("failed releasing locks", t);
- }
- }
-
- activationChangeNodes.remove(fqn);
- }
- }
-
- /**
* Evicts the node at <code>subtree</code> along with all descendant nodes.
*
* @param subtree Fqn indicating the uppermost node in the
@@ -3578,18 +3367,14 @@
public void setState(byte[] new_state)
{
- try
- {
-
if (new_state == null)
{
my_log.debug("transferred state is null (may be first member in cluster)");
+ return;
}
- else
+ try
{
getStateTransferManager().setState(new_state, Fqn.ROOT, null);
- }
-
isStateSet = true;
}
catch (Throwable t)
@@ -3616,8 +3401,26 @@
public byte[] getState(String state_id)
{
+ String sourceRoot = state_id;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER)>0;
+ if(hasDifferentSourceAndIntegrationRoots)
+ {
+ sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[0];
+ }
+ try
+ {
+ return _getState(Fqn.fromString(sourceRoot), configuration.getInitialStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ // This shouldn't happen as we set "suppressErrors" to true,
+ // but we have to catch the Throwable declared in the method sig
+ my_log.error("Caught " + t.getClass().getName() +
+ " while responding to partial state transfer request;" +
+ " returning null", t);
return null;
}
+ }
public void getState(OutputStream ostream)
{
@@ -3637,23 +3440,36 @@
public void getState(String state_id, OutputStream ostream)
{
-
- }
-
- public void setState(InputStream istream)
+ String sourceRoot = state_id;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER)>0;
+ if(hasDifferentSourceAndIntegrationRoots)
{
+ sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[0];
+ }
try
{
+ _getState(ostream, Fqn.fromString(sourceRoot), configuration.getInitialStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ // This shouldn't happen as we set "suppressErrors" to true,
+ // but we have to catch the Throwable declared in the method sig
+ my_log.error("Caught " + t.getClass().getName() +
+ " while responding to partial state transfer request;" +
+ " returning null", t);
+ }
+ }
+ public void setState(InputStream istream)
+ {
if (istream == null)
{
my_log.debug("stream is null (may be first member in cluster)");
+ return;
}
- else
+ try
{
getStateTransferManager().setState(istream, Fqn.ROOT, null);
- }
-
isStateSet = true;
}
catch (Throwable t)
@@ -3676,19 +3492,114 @@
stateLock.notifyAll();
}
}
-
}
public void setState(String state_id, byte[] state)
{
+ String targetRoot = state_id;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER)>0;
+ if(hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[1];
+ }
+ try
+ {
+ if (state != null)
+ {
+ my_log.debug("Setting received partial state for subroot " +state_id);
+ Fqn subroot = Fqn.fromString(targetRoot);
+ Region region = regionManager.getRegion(subroot,false);
+ ClassLoader cl = null;
+ if(region!= null)
+ {
+ // If a classloader is registered for the node's region, use it
+ cl = region.getClassLoader();
+ }
+ getStateTransferManager().setState(state, subroot, cl);
+ isStateSet = true;
+ }
+ }
+ catch (Throwable t)
+ {
+ my_log.error("failed setting state", t);
+ if (t instanceof Exception)
+ {
+ setStateException = (Exception) t;
+ }
+ else
+ {
+ setStateException = new Exception(t);
+ }
+ }
+ finally
+ {
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
}
public void setState(String state_id, InputStream istream)
{
-
+ // state_id is either a plain Fqn string or "source<DELIMETER>target";
+ // in the latter case the second half names the local integration root
+ String targetRoot = state_id;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMETER)>0;
+ if(hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMETER)[1];
+ }
+ if (istream == null)
+ {
+ my_log.debug("stream is null (may be first member in cluster). State is not set");
+ return;
}
+ try
+ {
+ MarshalledValueInputStream in = new MarshalledValueInputStream(istream);
+ boolean hasState = in.readBoolean();
+ if (!hasState)
+ {
+ in.close();
+ }
+ else
+ {
+ my_log.debug("Setting received partial state for subroot " + state_id);
+ Fqn subroot = Fqn.fromString(targetRoot);
+ Region region = regionManager.getRegion(subroot,false);
+ ClassLoader cl = null;
+ if (region != null)
+ {
+ // If a classloader is registered for the node's region, use it
+ cl = region.getClassLoader();
+ }
+ // Integrate under subroot, as the byte[] overload does; parsing the
+ // whole state_id would produce a wrong Fqn whenever it carries the
+ // source/target delimiter
+ getStateTransferManager().setState(in, subroot, cl);
+ isStateSet = true;
+ }
+ }
+ catch (Throwable t)
+ {
+ my_log.error("failed setting partial state", t);
+ if (t instanceof Exception)
+ {
+ setStateException = (Exception) t;
+ }
+ else
+ {
+ setStateException = new Exception(t);
+ }
+ }
+ finally
+ {
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
}
/*-------------------- End of MessageListener ----------------------*/
@@ -3995,7 +3906,7 @@
{
synchronized (this)
{
- if (regionManager == null) regionManager = new RegionManager();
+ if (regionManager == null) regionManager = new RegionManager(this);
}
}
return regionManager;
More information about the jboss-cvs-commits
mailing list