[jbosscache-commits] JBoss Cache SVN: r4833 - in core/trunk/src/main/java/org/jboss/cache: remoting and 2 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Tue Dec 11 12:26:38 EST 2007
Author: manik.surtani at jboss.com
Date: 2007-12-11 12:26:38 -0500 (Tue, 11 Dec 2007)
New Revision: 4833
Added:
core/trunk/src/main/java/org/jboss/cache/remoting/
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java
Modified:
core/trunk/src/main/java/org/jboss/cache/Cache.java
core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
core/trunk/src/main/java/org/jboss/cache/NodeFactory.java
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/RegionManager.java
core/trunk/src/main/java/org/jboss/cache/util/BeanUtils.java
Log:
Initial check in of injection and aop code
Modified: core/trunk/src/main/java/org/jboss/cache/Cache.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Cache.java 2007-12-11 17:25:09 UTC (rev 4832)
+++ core/trunk/src/main/java/org/jboss/cache/Cache.java 2007-12-11 17:26:38 UTC (rev 4833)
@@ -155,6 +155,18 @@
V put(Fqn<?> fqn, K key, V value);
/**
+ * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #put(Fqn, Object, Object)}
+ *
+ * @param fqn String representation of the Fqn
+ * @param key key with which the specified value is to be associated.
+ * @param value value to be associated with the specified key.
+ * @return previous value associated with specified key, or <code>null</code> if there was no mapping for key.
+ * A <code>null</code> return can also indicate that the Node previously associated <code>null</code> with the specified key, if the implementation supports null values.
+ */
+
+ V put(String fqn, K key, V value);
+
+ /**
* Under special operating behavior, associates the value with the specified key for a node identified by the Fqn passed in.
* <ul>
* <li> Only goes through if the node specified does not exist; no-op otherwise.</i>
@@ -189,6 +201,11 @@
void put(Fqn<?> fqn, Map<K, V> data);
/**
+ * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #put(Fqn, java.util.Map)}
+ */
+ void put(String fqn, Map<K, V> data);
+
+ /**
* Removes the mapping for this key from a Node.
* Returns the value to which the Node previously associated the key, or
* <code>null</code> if the Node contained no mapping for this key.
@@ -200,6 +217,11 @@
V remove(Fqn<?> fqn, K key);
/**
+ * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #remove(Fqn, Object)}
+ */
+ V remove(String fqn, K key);
+
+ /**
* Removes a {@link Node} indicated by absolute {@link Fqn}.
*
* @param fqn {@link Node} to remove
@@ -208,6 +230,25 @@
boolean removeNode(Fqn<?> fqn);
/**
+ * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #removeNode(Fqn)}
+ */
+ boolean removeNode(String fqn);
+
+ /**
+ * A convenience method to retrieve a node directly from the cache. Equivalent to calling cache.getRoot().getChild(fqn).
+ *
+ * @param fqn fqn of the node to retrieve
+ * @return a Node object, or a null if the node does not exist.
+ */
+ Node getNode(Fqn<?> fqn);
+
+ /**
+ * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #getNode(Fqn)}
+ */
+ Node getNode(String fqn);
+
+
+ /**
* Convenience method that allows for direct access to the data in a {@link Node}.
*
* @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be accessed.
@@ -217,6 +258,11 @@
V get(Fqn<?> fqn, K key);
/**
+ * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #get(Fqn, Object)}
+ */
+ V get(String fqn, K key);
+
+ /**
* Eviction call that evicts the specified {@link Node} from memory.
*
* @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be evicted.
@@ -225,6 +271,13 @@
void evict(Fqn<?> fqn, boolean recursive);
/**
+ * Eviction call that evicts the specified {@link Node} from memory. Not recursive.
+ *
+ * @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be evicted.
+ */
+ void evict(Fqn<?> fqn);
+
+ /**
* Retrieves a {@link Region} for a given {@link Fqn}. If the region does not exist,
* and <li>createIfAbsent</li> is true, then one is created.
* <p/>
@@ -364,10 +417,58 @@
void move(Fqn<?> nodeToMove, Fqn<?> newParent) throws NodeNotExistsException;
/**
+ * Convenience method that takes in string representations of Fqns. Otherwise identical to {@link #move(Fqn, Fqn)}
+ */
+ void move(String nodeToMove, String newParent) throws NodeNotExistsException;
+
+ /**
* Returns the version of the cache as a string.
*
* @return the version string of the cache.
* @see Version#printVersion
*/
String getVersion();
+
+ /**
+ * Retrieves a defensively copied data map of the underlying node. A convenience method to retrieving a node and
+ * getting data from the node directly.
+ *
+ * @param fqn
+ * @return map of data, or an empty map
+ * @throws CacheException
+ */
+ Map<K, V> getData(Fqn<?> fqn);
+
+ /**
+ * Convenience method that takes in a String represenation of the Fqn. Otherwise identical to {@link #getKeys(Fqn)}.
+ */
+ Set<K> getKeys(String fqn);
+
+ /**
+ * Returns a set of attribute keys for the Fqn.
+ * Returns null if the node is not found, otherwise a Set.
+ * The set is a copy of the actual keys for this node.
+ * <p/>
+ * A convenience method to retrieving a node and
+ * getting keys from the node directly.
+ *
+ * @param fqn name of the node
+ */
+ Set<K> getKeys(Fqn<?> fqn);
+
+ /**
+ * Convenience method that takes in a String represenation of the Fqn. Otherwise identical to {@link #removeData(Fqn)}.
+ */
+ void clearData(String fqn);
+
+ /**
+ * Removes the keys and properties from a named node.
+ * <p/>
+ * A convenience method to retrieving a node and
+ * getting keys from the node directly.
+ *
+ * @param fqn name of the node
+ */
+ void clearData(Fqn<?> fqn);
+
}
Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-12-11 17:25:09 UTC (rev 4832)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-12-11 17:26:38 UTC (rev 4833)
@@ -12,12 +12,13 @@
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.buddyreplication.BuddyNotInitException;
import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.config.BuddyReplicationConfig;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.config.Option;
import org.jboss.cache.config.RuntimeConfig;
+import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.InterceptorChainFactory;
+import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.interceptors.Interceptor;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.CacheLoaderManager;
@@ -33,41 +34,33 @@
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.marshall.NodeData;
-import org.jboss.cache.marshall.VersionAwareMarshaller;
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
import org.jboss.cache.optimistic.DataVersion;
+import org.jboss.cache.remoting.jgroups.CacheMessageListener;
import org.jboss.cache.statetransfer.StateTransferManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionManagerLookup;
import org.jboss.cache.transaction.TransactionTable;
-import org.jboss.cache.util.ExposedByteArrayOutputStream;
import org.jboss.cache.util.ThreadGate;
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
-import org.jboss.util.stream.MarshalledValueInputStream;
-import org.jboss.util.stream.MarshalledValueOutputStream;
import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RspFilter;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import org.jgroups.util.Util;
import javax.management.MBeanServerFactory;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
import java.io.NotSerializableException;
-import java.io.OutputStream;
import java.lang.reflect.Method;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
/**
* The default implementation class of {@link org.jboss.cache.Cache} and {@link org.jboss.cache.CacheSPI}. This class
@@ -83,7 +76,7 @@
* @author Daniel Huang (dhuang at jboss.org)
* @see org.jboss.cache.Cache
*/
-public class CacheImpl<K, V> implements CacheSPI<K, V>
+public class CacheImpl
{
/**
@@ -99,7 +92,7 @@
/**
* Root node.
*/
- private NodeSPI<K, V> root;
+ private NodeSPI root;
/**
* Cache's region manager.
@@ -129,38 +122,23 @@
/**
* JGroups message listener.
*/
- private MessageListenerAdaptor ml = new MessageListenerAdaptor();
+ private CacheMessageListener messageListener;
/**
* Maintains mapping of transactions (keys) and Modifications/Undo-Operations
*/
- private final TransactionTable tx_table = new TransactionTable();
+ private TransactionTable transactionTable;
/**
- * HashMap<Thread, List<Lock>, maintains locks acquired by threads (used when no TXs are used)
- */
- private Map<Thread, List<NodeLock>> lock_table;
-
- /**
* Set<Fqn> of Fqns of the topmost node of internal regions that should
* not included in standard state transfers.
*/
private Set<Fqn> internalFqns = new ConcurrentHashSet<Fqn>();
/**
- * True if state was initialized during start-up.
- */
- private volatile boolean isStateSet = false;
-
- /**
- * Class name used to handle evictions.
- */
- private String evictionInterceptorClass = "org.jboss.cache.interceptors.EvictionInterceptor";
-
- /**
* Marshaller if register to handle marshalling
*/
- private Marshaller marshaller_ = null;
+ private Marshaller marshaller = null;
/**
* {@link #invokeMethod(org.jboss.cache.marshall.MethodCall,boolean)} will dispatch to this chain of interceptors.
@@ -177,7 +155,7 @@
/**
* Used to get the Transaction associated with the current thread
*/
- private TransactionManager tm = null;
+ private TransactionManager transactionManager = null;
/**
* Cache loader manager.
@@ -218,33 +196,49 @@
}
};
- private Configuration configuration;
+ private final Configuration configuration;
+ private final ComponentRegistry componentRegistry;
+ private NodeFactory nodeFactory;
+ private CacheSPI spi;
/**
* Constructs an uninitialized CacheImpl.
*/
protected CacheImpl() throws Exception
{
- configuration = new Configuration(this);
- notifier = new Notifier(this);
- regionManager = new RegionManager(this);
- cacheStatus = CacheStatus.INSTANTIATED;
+ this(new Configuration());
}
- public StateTransferManager getStateTransferManager()
+ /**
+ * Constructs an uninitialized CacheImpl.
+ */
+ protected CacheImpl(Configuration configuration) throws Exception
{
- if (stateTransferManager == null)
- {
- stateTransferManager = new StateTransferManager(this);
- }
- return stateTransferManager;
+ this.configuration = configuration;
+ this.componentRegistry = new ComponentRegistry(configuration);
+ this.cacheStatus = CacheStatus.INSTANTIATED;
}
- public void setStateTransferManager(StateTransferManager manager)
+ protected ComponentRegistry getComponentRegistry()
{
- this.stateTransferManager = manager;
+ return componentRegistry;
}
+ @Inject
+ private void injectDependencies(Notifier notifier, RegionManager regionManager, TransactionManager transactionManager,
+ TransactionTable transactionTable, StateTransferManager stateTransferManager, NodeFactory nodeFactory,
+ CacheSPI spi, CacheMessageListener messageListener)
+ {
+ this.notifier = notifier;
+ this.regionManager = regionManager;
+ this.transactionManager = transactionManager;
+ this.transactionTable = transactionTable;
+ this.stateTransferManager = stateTransferManager;
+ this.nodeFactory = nodeFactory;
+ this.spi = spi;
+ this.messageListener = messageListener;
+ }
+
public Configuration getConfiguration()
{
return configuration;
@@ -261,7 +255,7 @@
/**
* Returns the root node.
*/
- public NodeSPI<K, V> getRoot()
+ public NodeSPI getRoot()
{
return root;
}
@@ -299,27 +293,15 @@
*/
public TransactionTable getTransactionTable()
{
- return tx_table;
+ return transactionTable;
}
/**
- * Returns the lock table.
- */
- public Map<Thread, List<NodeLock>> getLockTable()
- {
- if (lock_table == null)
- {
- lock_table = new ConcurrentHashMap<Thread, List<NodeLock>>();
- }
- return lock_table;
- }
-
- /**
* Returns the contents of the TransactionTable as a string.
*/
public String dumpTransactionTable()
{
- return tx_table.toString(true);
+ return transactionTable.toString(true);
}
/**
@@ -348,11 +330,6 @@
return cacheLoaderManager.getCacheLoader();
}
- public String getEvictionInterceptorClass()
- {
- return this.evictionInterceptorClass;
- }
-
private void setUseReplQueue(boolean flag)
{
if (flag)
@@ -406,7 +383,7 @@
*/
public TransactionManager getTransactionManager()
{
- return tm;
+ return transactionManager;
}
/**
@@ -430,13 +407,13 @@
}
}
- void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
+ public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
{
String encodedStateId = sourceTarget + StateTransferManager.PARTIAL_STATE_DELIMITER + integrationTarget;
fetchPartialState(sources, encodedStateId);
}
- void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
+ public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
{
if (subtree == null)
{
@@ -455,7 +432,7 @@
if (log.isWarnEnabled())
{
log.warn("Cannot fetch partial state, targets are " + sources +
- " and stateId is " + stateId);
+ " and stateId is " + stateId);
}
return;
}
@@ -478,13 +455,13 @@
for (Address target : targets)
{
log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
- isStateSet = false;
+ messageListener.setStateSet(false);
successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
if (successfulTransfer)
{
try
{
- ml.waitForState();
+ messageListener.waitForState();
}
catch (Exception transferFailed)
{
@@ -538,7 +515,7 @@
* @throws Error
*/
private void handleLifecycleTransitionFailure(Throwable t)
- throws CacheException, RuntimeException, Error
+ throws RuntimeException, Error
{
cacheStatus = CacheStatus.FAILED;
if (t instanceof CacheException)
@@ -562,23 +539,23 @@
configureLogCategory();
// initialise the node factory and set this in the runtime.
- NodeFactory<K, V> nf;
- if ((nf = configuration.getRuntimeConfig().getNodeFactory()) == null)
- {
- nf = new NodeFactory<K, V>(this);
- configuration.getRuntimeConfig().setNodeFactory(nf);
- }
- else
- {
- // don't create a new one each and every time. After stopping and starting the cache, the old NodeFactory may still be valid.
- nf.init();
- }
+// NodeFactory nf;
+// if ((nf = configuration.getRuntimeConfig().getNodeFactory()) == null)
+// {
+// nf = new NodeFactory(this);
+// configuration.getRuntimeConfig().setNodeFactory(nf);
+// }
+// else
+// {
+// // don't create a new one each and every time. After stopping and starting the cache, the old NodeFactory may still be valid.
+// nf.init();
+// }
- if (notifier == null)
- notifier = new Notifier(this);
+// if (notifier == null)
+// notifier = new Notifier(this);
// create a new root temporarily.
- NodeSPI<K, V> tempRoot = nf.createRootDataNode();
+ NodeSPI tempRoot = nodeFactory.createRootDataNode();
// if we don't already have a root or the new (temp) root is of a different class (optimistic vs pessimistic) to
// the current root, then we use the new one. Helps preserve data between cache restarts.
if (root == null || !root.getClass().equals(tempRoot.getClass()))
@@ -589,29 +566,28 @@
initialiseCacheLoaderManager();
}
// first set up the Buddy Manager and an RPCManager
- if (configuration.getCacheMode() != Configuration.CacheMode.LOCAL)
- {
- getConfiguration().getRuntimeConfig().setRPCManager(new RPCManagerImpl(this));
- setBuddyReplicationConfig(configuration.getBuddyReplicationConfig());
- }
+// if (configuration.getCacheMode() != Configuration.CacheMode.LOCAL)
+// {
+ //getConfiguration().getRuntimeConfig().setRPCManager(new RPCManagerImpl(this));
+// setBuddyReplicationConfig(configuration.getBuddyReplicationConfig());
+// }
// build interceptor chain
- try
- {
- interceptor_chain = InterceptorChainFactory.getInstance().buildInterceptorChain(this);
- }
- catch (Exception e)
- {
- throw new CacheException("Unable to build interceptor chain", e);
- }
+// try
+// {
+// interceptor_chain = InterceptorChainFactory.getInstance().buildInterceptorChain(configuration, componentRegistry);
+// }
+// catch (Exception e)
+// {
+// throw new CacheException("Unable to build interceptor chain", e);
+// }
setUseReplQueue(configuration.isUseReplQueue());
setIsolationLevel(configuration.getIsolationLevel());
- getRegionManager();// make sure we create one
createEvictionPolicy();
- getRegionManager().setDefaultInactive(configuration.isInactiveOnStartup());
+ regionManager.setDefaultInactive(configuration.isInactiveOnStartup());
cacheStatus = CacheStatus.CREATED;
}
@@ -619,8 +595,8 @@
private void createTransactionManager()
{
// See if we had a TransactionManager injected into our config
- this.tm = configuration.getRuntimeConfig().getTransactionManager();
- if (tm == null)
+ this.transactionManager = configuration.getRuntimeConfig().getTransactionManager();
+ if (transactionManager == null)
{
// Nope. See if we can look it up from JNDI
if (this.tm_lookup == null && configuration.getTransactionManagerLookupClass() != null)
@@ -640,8 +616,8 @@
{
if (tm_lookup != null)
{
- tm = tm_lookup.getTransactionManager();
- configuration.getRuntimeConfig().setTransactionManager(tm);
+ transactionManager = tm_lookup.getTransactionManager();
+ configuration.getRuntimeConfig().setTransactionManager(transactionManager);
}
else
{
@@ -706,7 +682,7 @@
{
cacheStatus = CacheStatus.STARTING;
- createTransactionManager();
+// createTransactionManager();
// cache loaders should be initialised *before* any state transfers take place to prevent
// exceptions involving cache loaders not being started. - Manik
@@ -716,7 +692,7 @@
cacheLoaderManager.startCacheLoader();
}
// now that we have a TM we can init the interceptor chain
- InterceptorChainFactory.getInstance().initialiseInterceptors(interceptor_chain, this);
+// InterceptorChainFactory.getInstance().initialiseInterceptors(interceptor_chain, this);
switch (configuration.getCacheMode())
{
@@ -738,15 +714,15 @@
long start = System.currentTimeMillis();
channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
//if I am not the only and the first member than wait for a state to arrive
- if (getMembers().size()>1)
+ if (getMembers().size() > 1)
{
- ml.waitForState();
+ messageListener.waitForState();
}
if (log.isDebugEnabled())
{
log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start)
- + " milliseconds)");
+ + " milliseconds)");
}
}
catch (StateTransferException ste)
@@ -784,7 +760,8 @@
}
if (buddyManager != null)
{
- buddyManager.init(this);
+ //buddyManager.init(this);
+ buddyManager.init();
if (configuration.isUseReplQueue())
{
log.warn("Replication queue not supported when using buddy replication. Disabling repliction queue.");
@@ -809,7 +786,7 @@
regionManager.startEvictionThread();
}
- notifier.notifyCacheStarted(this, getInvocationContext());
+ notifier.notifyCacheStarted(spi, getInvocationContext());
addShutdownHook();
@@ -833,7 +810,7 @@
}
};
- Runtime.getRuntime().addShutdownHook(shutdownHook);
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
}
else
{
@@ -857,7 +834,7 @@
catch (CacheException e)
{
log.warn("Needed to call stop() before destroying but stop() " +
- "threw exception. Proceeding to destroy", e);
+ "threw exception. Proceeding to destroy", e);
}
}
else
@@ -906,7 +883,7 @@
configuration.getRuntimeConfig().setChannel(null);
}
disp = null;
- tm = null;
+ transactionManager = null;
}
/**
@@ -931,7 +908,7 @@
if (failed)
{
log.warn("Attempted to stop() from FAILED state, " +
- "but caught exception; try calling destroy()", t);
+ "but caught exception; try calling destroy()", t);
}
handleLifecycleTransitionFailure(t);
}
@@ -990,12 +967,12 @@
if (notifier != null)
{
- notifier.notifyCacheStopped(this, getInvocationContext());
+ notifier.notifyCacheStopped(spi, getInvocationContext());
notifier.removeAllCacheListeners();
}
// unset transaction manager reference
- tm = null;
+ transactionManager = null;
cacheStatus = CacheStatus.STOPPED;
@@ -1018,21 +995,21 @@
*
* @param config
*/
- private void setBuddyReplicationConfig(BuddyReplicationConfig config)
- {
- if (config != null)
- {
- buddyManager = new BuddyManager(config);
- if (!buddyManager.isEnabled())
- {
- buddyManager = null;
- }
- else
- {
- internalFqns.add(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN);
- }
- }
- }
+// private void setBuddyReplicationConfig(BuddyReplicationConfig config)
+// {
+// if (config != null)
+// {
+// buddyManager = new BuddyManager(config);
+// if (!buddyManager.isEnabled())
+// {
+// buddyManager = null;
+// }
+// else
+// {
+// internalFqns.add(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN);
+// }
+// }
+// }
/**
* Retrieves the Buddy Manager configured.
@@ -1062,7 +1039,7 @@
protected void createEvictionPolicy()
{
if (configuration.getEvictionConfig() != null
- && configuration.getEvictionConfig().isValidConfig())
+ && configuration.getEvictionConfig().isValidConfig())
{
regionManager.setEvictionConfig(configuration.getEvictionConfig());
regionManager.setUsingEvictions(true);
@@ -1149,10 +1126,10 @@
Fqn<Object> tmp = new Fqn<Object>(subtree, s);
_remove(null, // no tx
- tmp,
- false, // no undo ops
- false, // no nodeEvent
- true);// is an eviction
+ tmp,
+ false, // no undo ops
+ false, // no nodeEvent
+ true);// is an eviction
}
}
@@ -1161,7 +1138,7 @@
}
- private void removeLocksForDeadMembers(NodeSPI<K, V> node, List deadMembers)
+ private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
{
Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
NodeLock lock = node.getLock();
@@ -1188,14 +1165,14 @@
if (broken && log.isTraceEnabled())
{
log.trace("Broke lock for node " + node.getFqn() +
- " held by " + deadOwner);
+ " held by " + deadOwner);
}
}
// Recursively unlock children
- for (NodeSPI<K, V> child : node.getChildrenDirect())
+ for (Object child : node.getChildrenDirect())
{
- removeLocksForDeadMembers(child, deadMembers);
+ removeLocksForDeadMembers((NodeSPI) child, deadMembers);
}
}
@@ -1229,16 +1206,16 @@
*
* @param fqn name of the DataNode to retreive
*/
- public Node<K, V> get(Fqn<?> fqn) throws CacheException
+ public Node get(Fqn<?> fqn) throws CacheException
{
MethodCall m = MethodCallFactory.create(MethodDeclarations.getNodeMethodLocal, fqn);
- return (Node<K, V>) invokeMethod(m, true);
+ return (Node) invokeMethod(m, true);
}
/**
* Returns the raw data of the node; called externally internally.
*/
- public Node<K, V> _get(Fqn<?> fqn) throws CacheException
+ public Node _get(Fqn<?> fqn) throws CacheException
{
return findNode(fqn);
}
@@ -1254,53 +1231,27 @@
}
/**
- * Returns a set of attribute keys for the Fqn.
- * Returns null if the node is not found, otherwise a Set.
- * The set is a copy of the actual keys for this node.
- *
- * @param fqn name of the node
- */
- public Set getKeys(String fqn) throws CacheException
- {
- return getKeys(Fqn.fromString(fqn));
- }
-
- /**
- * Returns a set of attribute keys for the Fqn.
- * Returns null if the node is not found, otherwise a Set.
- * The set is a copy of the actual keys for this node.
- *
- * @param fqn name of the node
- */
- public Set<K> getKeys(Fqn<?> fqn) throws CacheException
- {
- MethodCall m = MethodCallFactory.create(MethodDeclarations.getKeysMethodLocal, fqn);
- return (Set<K>) invokeMethod(m, true);
- }
-
-
- /**
* Retrieves a defensively copied data map of the underlying node.
*
* @param fqn
* @return map of data, or an empty map
* @throws CacheException
*/
- public Map<K, V> getData(Fqn<?> fqn) throws CacheException
+ public Map getData(Fqn<?> fqn) throws CacheException
{
MethodCall m = MethodCallFactory.create(MethodDeclarations.getDataMapMethodLocal, fqn);
- return (Map<K, V>) invokeMethod(m, true);
+ return (Map) invokeMethod(m, true);
}
public Set _getKeys(Fqn<?> fqn) throws CacheException
{
- NodeSPI<K, V> n = findNode(fqn);
+ NodeSPI n = findNode(fqn);
if (n == null)
{
return null;
}
- Set<K> keys = n.getKeysDirect();
- return new HashSet<K>(keys);
+ Set keys = n.getKeysDirect();
+ return new HashSet(keys);
}
/**
@@ -1310,7 +1261,7 @@
* @param fqn The fully qualified name of the node.
* @param key The key.
*/
- public V get(String fqn, K key) throws CacheException
+ public Object get(String fqn, Object key) throws CacheException
{
return get(Fqn.fromString(fqn), key);
}
@@ -1323,21 +1274,21 @@
* @param fqn The fully qualified name of the node.
* @param key The key.
*/
- public V get(Fqn<?> fqn, K key) throws CacheException
+ public Object get(Fqn<?> fqn, Object key) throws CacheException
{
return get(fqn, key, true);
}
- public V _get(Fqn<?> fqn, K key, boolean sendNodeEvent) throws CacheException
+ public Object _get(Fqn<?> fqn, Object key, boolean sendNodeEvent) throws CacheException
{
InvocationContext ctx = getInvocationContext();
if (log.isTraceEnabled())
{
log.trace(new StringBuffer("_get(").append("\"").append(fqn).append("\", \"").append(key).append("\", \"").
- append(sendNodeEvent).append("\")"));
+ append(sendNodeEvent).append("\")"));
}
if (sendNodeEvent) notifier.notifyNodeVisited(fqn, true, ctx);
- NodeSPI<K, V> n = findNode(fqn);
+ NodeSPI n = findNode(fqn);
if (n == null)
{
log.trace("node not found");
@@ -1348,10 +1299,10 @@
}
- protected V get(Fqn<?> fqn, K key, boolean sendNodeEvent) throws CacheException
+ protected Object get(Fqn<?> fqn, Object key, boolean sendNodeEvent) throws CacheException
{
MethodCall m = MethodCallFactory.create(MethodDeclarations.getKeyValueMethodLocal, fqn, key, sendNodeEvent);
- return (V) invokeMethod(m, true);
+ return invokeMethod(m, true);
}
/**
@@ -1384,15 +1335,15 @@
return n != null;
}
- public NodeSPI<K, V> peek(Fqn<?> fqn, boolean includeDeletedNodes)
+ public NodeSPI peek(Fqn<?> fqn, boolean includeDeletedNodes)
{
return peek(fqn, includeDeletedNodes, false);
}
- public NodeSPI<K, V> peek(Fqn<?> fqn, boolean includeDeletedNodes, boolean includeInvalidNodes)
+ public NodeSPI peek(Fqn<?> fqn, boolean includeDeletedNodes, boolean includeInvalidNodes)
{
if (fqn == null || fqn.size() == 0) return root;
- NodeSPI<K, V> n = root;
+ NodeSPI n = root;
int fqnSize = fqn.size();
for (int i = 0; i < fqnSize; i++)
{
@@ -1463,12 +1414,12 @@
* @param fqn The fully qualified name of the new node
* @param data The new data. May be null if no data should be set in the node.
*/
- public void put(Fqn<?> fqn, Map<K, V> data) throws CacheException
+ public void put(Fqn<?> fqn, Map data) throws CacheException
{
put(fqn, data, false);
}
- public void put(Fqn<?> fqn, Map<K, V> data, boolean erase) throws CacheException
+ public void put(Fqn<?> fqn, Map data, boolean erase) throws CacheException
{
GlobalTransaction tx = getCurrentTransaction();
MethodCall m;
@@ -1493,7 +1444,7 @@
* @param value The value
* @return Object The previous value (if any), if node was present
*/
- public V put(String fqn, K key, V value) throws CacheException
+ public Object put(String fqn, Object key, Object value) throws CacheException
{
return put(Fqn.fromString(fqn), key, value);
}
@@ -1508,11 +1459,11 @@
* @param value The value
* @return Object The previous value (if any), if node was present
*/
- public V put(Fqn<?> fqn, K key, V value) throws CacheException
+ public Object put(Fqn<?> fqn, Object key, Object value) throws CacheException
{
GlobalTransaction tx = getCurrentTransaction();
MethodCall m = MethodCallFactory.create(MethodDeclarations.putKeyValMethodLocal, tx, fqn, key, value, true);
- return (V) invokeMethod(m, true);
+ return invokeMethod(m, true);
}
/**
@@ -1594,7 +1545,7 @@
* @param key The key to be removed
* @return The previous value, or null if none was associated with the given key
*/
- public V remove(String fqn, K key) throws CacheException
+ public Object remove(String fqn, Object key) throws CacheException
{
return remove(Fqn.fromString(fqn), key);
}
@@ -1606,32 +1557,14 @@
* @param key The key to be removed
* @return The previous value, or null if none was associated with the given key
*/
- public V remove(Fqn<?> fqn, K key) throws CacheException
+ public Object remove(Fqn<?> fqn, Object key) throws CacheException
{
GlobalTransaction tx = getCurrentTransaction();
MethodCall m = MethodCallFactory.create(MethodDeclarations.removeKeyMethodLocal, tx, fqn, key, true);
- return (V) invokeMethod(m, true);
+ return invokeMethod(m, true);
}
/**
- * Removes the keys and properties from a node.
- */
- public void removeData(String fqn) throws CacheException
- {
- removeData(Fqn.fromString(fqn));
- }
-
- /**
- * Removes the keys and properties from a named node.
- */
- public void removeData(Fqn fqn) throws CacheException
- {
- GlobalTransaction tx = getCurrentTransaction();
- MethodCall m = MethodCallFactory.create(MethodDeclarations.removeDataMethodLocal, tx, fqn, true);
- invokeMethod(m, true);
- }
-
- /**
* Lock a given node (or the entire subtree starting at this node)
* @param fqn The FQN of the node
* @param owner The owner. This is simply a key into a hashtable, and can be anything, e.g.
@@ -1716,23 +1649,7 @@
}
}
-
/**
- * Returns all children of a given node.
- * Returns null of the parent node was not found, or if there are no
- * children.
- * The set is unmodifiable.
- *
- * @param fqn The fully qualified name of the node
- * @return Set A list of child names (as Strings)
- * @see #getChildrenNames(Fqn)
- */
- public Set getChildrenNames(String fqn) throws CacheException
- {
- return getChildrenNames(Fqn.fromString(fqn));
- }
-
- /**
* Returns all children of a given node. Returns an empty set if there are no children.
* The set is unmodifiable.
*
@@ -1754,7 +1671,7 @@
@SuppressWarnings("unchecked")
public <E> Set<E> _getChildrenNames(Fqn<E> fqn) throws CacheException
{
- NodeSPI<K, V> n = findNode(fqn);
+ NodeSPI n = findNode(fqn);
if (n == null) return null;
Set<E> childNames = new HashSet<E>();
Map childrenMap = n.getChildrenMapDirect();
@@ -1810,9 +1727,9 @@
{
if (root == null)
return sb.toString();
- for (NodeSPI n : root.getChildrenDirect())
+ for (Object n : root.getChildrenDirect())
{
- n.print(sb, indent);
+ ((NodeSPI) n).print(sb, indent);
sb.append("\n");
}
}
@@ -1841,9 +1758,9 @@
StringBuffer sb = new StringBuffer("\n");
int indent = 0;
- for (NodeSPI n : root.getChildrenDirect())
+ for (Object n : root.getChildrenDirect())
{
- n.getLock().printLockInfo(sb, indent);
+ ((NodeSPI) n).getLock().printLockInfo(sb, indent);
sb.append("\n");
}
return sb.toString();
@@ -1857,17 +1774,20 @@
return numLocks(root);
}
- private int numLocks(NodeSPI<K, V> n)
+ private int numLocks(NodeSPI n)
{
int num = 0;
- if (n.getLock().isLocked())
+ if (n != null)
{
- num++;
+ if (n.getLock().isLocked())
+ {
+ num++;
+ }
+ for (Object cn : n.getChildrenDirect(true))
+ {
+ num += numLocks((NodeSPI) cn);
+ }
}
- for (NodeSPI<K, V> cn : n.getChildrenDirect(true))
- {
- num += numLocks(cn);
- }
return num;
}
@@ -1882,12 +1802,15 @@
return numNodes(root) - 1;
}
- private int numNodes(NodeSPI<K, V> n)
+ private int numNodes(NodeSPI n)
{
int count = 1;// for n
- for (NodeSPI<K, V> child : n.getChildrenDirect())
+ if (n != null)
{
- count += numNodes(child);
+ for (Object child : n.getChildrenDirect())
+ {
+ count += numNodes((NodeSPI) child);
+ }
}
return count;
}
@@ -1914,12 +1837,12 @@
return numAttributes(findNode(fqn));
}
- private int numAttributes(NodeSPI<K, V> n)
+ private int numAttributes(NodeSPI n)
{
int count = 0;
- for (NodeSPI<K, V> child : n.getChildrenDirect())
+ for (Object child : n.getChildrenDirect())
{
- count += numAttributes(child);
+ count += numAttributes((NodeSPI) child);
}
count += n.getDataDirect().size();
return count;
@@ -1940,14 +1863,14 @@
@Deprecated
public List callRemoteMethods(List<Address> mbrs, MethodCall method_call,
boolean synchronous, boolean exclude_self, long timeout)
- throws Exception
+ throws Exception
{
return callRemoteMethods(mbrs, method_call, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, exclude_self, timeout);
}
@Deprecated
public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int mode, boolean exclude_self, long timeout)
- throws Exception
+ throws Exception
{
return callRemoteMethods(mbrs, method_call, mode, exclude_self, timeout, null);
}
@@ -1966,7 +1889,7 @@
*/
@Deprecated
public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int mode, boolean exclude_self, long timeout, RspFilter rspFilter)
- throws Exception
+ throws Exception
{
int modeToUse = mode;
int preferredMode;
@@ -1974,7 +1897,6 @@
modeToUse = preferredMode;
RspList rsps = null;
- Rsp rsp;
List retval;
Vector<Address> validMembers;
@@ -2022,8 +1944,8 @@
throw new TimeoutException("State retrieval timed out waiting for flush unblock.");
}
rsps = rspFilter == null
- ? disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled())
- : disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled(), false, rspFilter);
+ ? disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled())
+ : disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled(), false, rspFilter);
// a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
// JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
@@ -2043,9 +1965,9 @@
}
retval = new ArrayList(rsps.size());
- for (int i = 0; i < rsps.size(); i++)
+
+ for (Rsp rsp : rsps.values())
{
- rsp = (Rsp) rsps.elementAt(i);
if (rsp.wasSuspected() || !rsp.wasReceived())
{
CacheException ex;
@@ -2063,7 +1985,8 @@
{
if (rsp.getValue() instanceof Exception)
{
- if (log.isTraceEnabled()) log.trace("Recieved exception'" + rsp.getValue() + "' from " + rsp.getSender());
+ if (log.isTraceEnabled())
+ log.trace("Recieved exception'" + rsp.getValue() + "' from " + rsp.getSender());
throw (Exception) rsp.getValue();
}
retval.add(rsp.getValue());
@@ -2086,7 +2009,7 @@
@Deprecated
public List callRemoteMethods(List<Address> members, Method method, Object[] args,
boolean synchronous, boolean exclude_self, long timeout)
- throws Exception
+ throws Exception
{
return callRemoteMethods(members, MethodCallFactory.create(method, args), synchronous, exclude_self, timeout);
}
@@ -2107,7 +2030,7 @@
public List callRemoteMethods(Vector<Address> members, String method_name,
Class[] types, Object[] args,
boolean synchronous, boolean exclude_self, long timeout)
- throws Exception
+ throws Exception
{
Method method = getClass().getDeclaredMethod(method_name, types);
return callRemoteMethods(members, method, args, synchronous, exclude_self, timeout);
@@ -2119,22 +2042,22 @@
/* ----- These are VERSIONED callbacks to facilitate JBCACHE-843. Also see docs/design/DataVersion.txt --- */
- public void _putForExternalRead(GlobalTransaction gtx, Fqn fqn, K key, V value, DataVersion dv) throws CacheException
+ public void _putForExternalRead(GlobalTransaction gtx, Fqn fqn, Object key, Object value, DataVersion dv) throws CacheException
{
_putForExternalRead(gtx, fqn, key, value);
}
- public void _put(GlobalTransaction tx, Fqn fqn, Map<K, V> data, boolean create_undo_ops, DataVersion dv) throws CacheException
+ public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops, DataVersion dv) throws CacheException
{
_put(tx, fqn, data, create_undo_ops, false, dv);
}
- public void _put(GlobalTransaction tx, Fqn fqn, Map<K, V> data, boolean create_undo_ops, boolean erase_contents, DataVersion dv) throws CacheException
+ public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops, boolean erase_contents, DataVersion dv) throws CacheException
{
_put(tx, fqn, data, create_undo_ops, erase_contents);
}
- public Object _put(GlobalTransaction tx, Fqn fqn, K key, V value, boolean create_undo_ops, DataVersion dv) throws CacheException
+ public Object _put(GlobalTransaction tx, Fqn fqn, Object key, Object value, boolean create_undo_ops, DataVersion dv) throws CacheException
{
return _put(tx, fqn, key, value, create_undo_ops);
}
@@ -2144,7 +2067,7 @@
return _remove(tx, fqn, create_undo_ops, true);
}
- public Object _remove(GlobalTransaction tx, Fqn fqn, K key, boolean create_undo_ops, DataVersion dv) throws CacheException
+ public Object _remove(GlobalTransaction tx, Fqn fqn, Object key, boolean create_undo_ops, DataVersion dv) throws CacheException
{
return _remove(tx, fqn, key, create_undo_ops);
}
@@ -2172,8 +2095,8 @@
* @param create_undo_ops If true, undo operations will be created (default is true).
* Otherwise they will not be created (used by rollback()).
*/
- public void _put(GlobalTransaction tx, String fqn, Map<K, V> data, boolean create_undo_ops)
- throws CacheException
+ public void _put(GlobalTransaction tx, String fqn, Map data, boolean create_undo_ops)
+ throws CacheException
{
_put(tx, Fqn.fromString(fqn), data, create_undo_ops);
}
@@ -2194,8 +2117,8 @@
* @param create_undo_ops If true, undo operations will be created (default is true).
* Otherwise they will not be created (used by rollback()).
*/
- public void _put(GlobalTransaction tx, Fqn fqn, Map<K, V> data, boolean create_undo_ops)
- throws CacheException
+ public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops)
+ throws CacheException
{
_put(tx, fqn, data, create_undo_ops, false);
}
@@ -2216,8 +2139,8 @@
* @param erase_contents Clear the existing hashmap before putting the new data into it
* Otherwise they will not be created (used by rollback()).
*/
- public void _put(GlobalTransaction tx, Fqn fqn, Map<K, V> data, boolean create_undo_ops, boolean erase_contents)
- throws CacheException
+ public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops, boolean erase_contents)
+ throws CacheException
{
if (log.isTraceEnabled())
{
@@ -2225,8 +2148,8 @@
}
InvocationContext ctx = getInvocationContext();
boolean isRollback = checkIsRollingBack(ctx.getTransaction());
- NodeSPI<K, V> n = findNodeCheck(tx, fqn, isRollback);
- Map<K, V> rawData = n.getDataDirect();
+ NodeSPI n = findNodeCheck(tx, fqn, isRollback);
+ Map rawData = n.getDataDirect();
if (!isRollback) notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.PUT_MAP, rawData, ctx);
// create a compensating method call (reverting the effect of
@@ -2234,8 +2157,8 @@
if (tx != null && create_undo_ops)
{
// erase and set to previous hashmap contents
- MethodCall undo_op = MethodCallFactory.create(MethodDeclarations.putDataEraseMethodLocal, tx, fqn, new HashMap<K, V>(rawData), false, true);
- tx_table.addUndoOperation(tx, undo_op);
+ MethodCall undo_op = MethodCallFactory.create(MethodDeclarations.putDataEraseMethodLocal, tx, fqn, new HashMap(rawData), false, true);
+ transactionTable.addUndoOperation(tx, undo_op);
}
if (erase_contents)
@@ -2251,8 +2174,8 @@
*
* @return Previous value (if any)
*/
- public Object _put(GlobalTransaction tx, String fqn, K key, V value, boolean create_undo_ops)
- throws CacheException
+ public Object _put(GlobalTransaction tx, String fqn, Object key, Object value, boolean create_undo_ops)
+ throws CacheException
{
return _put(tx, Fqn.fromString(fqn), key, value, create_undo_ops);
}
@@ -2262,9 +2185,9 @@
try
{
return tx != null && (
- tx.getStatus() == javax.transaction.Status.STATUS_ROLLEDBACK ||
- tx.getStatus() == javax.transaction.Status.STATUS_ROLLING_BACK ||
- tx.getStatus() == javax.transaction.Status.STATUS_MARKED_ROLLBACK);
+ tx.getStatus() == javax.transaction.Status.STATUS_ROLLEDBACK ||
+ tx.getStatus() == javax.transaction.Status.STATUS_ROLLING_BACK ||
+ tx.getStatus() == javax.transaction.Status.STATUS_MARKED_ROLLBACK);
}
catch (Exception e)
{
@@ -2278,13 +2201,13 @@
*
* @return Previous value (if any)
*/
- public Object _put(GlobalTransaction tx, Fqn fqn, K key, V value, boolean create_undo_ops)
- throws CacheException
+ public Object _put(GlobalTransaction tx, Fqn fqn, Object key, Object value, boolean create_undo_ops)
+ throws CacheException
{
if (log.isTraceEnabled())
{
log.trace(new StringBuffer("_put(").append(tx).append(", \"").
- append(fqn).append("\", k=").append(key).append(", v=").append(value).append(")"));
+ append(fqn).append("\", k=").append(key).append(", v=").append(value).append(")"));
}
@@ -2292,12 +2215,12 @@
// if this is a rollback then don't fire notifications.
boolean isRollback = checkIsRollingBack(ctx.getTransaction());
- NodeSPI<K, V> n = findNodeCheck(tx, fqn, isRollback);
- Map<K, V> rawData = n.getDataDirect();
+ NodeSPI n = findNodeCheck(tx, fqn, isRollback);
+ Map rawData = n.getDataDirect();
if (!isRollback)
notifier.notifyNodeModified(fqn, true, NodeModifiedEvent.ModificationType.PUT_DATA, rawData, ctx);
- V old_value = n.putDirect(key, value);
+ Object old_value = n.putDirect(key, value);
// create a compensating method call (reverting the effect of
// this modification) and put it into the TX's undo list.
@@ -2313,13 +2236,13 @@
undo_op = MethodCallFactory.create(MethodDeclarations.putKeyValMethodLocal, tx, fqn, key, old_value, false);
}
// 1. put undo-op in TX' undo-operations list (needed to rollback TX)
- tx_table.addUndoOperation(tx, undo_op);
+ transactionTable.addUndoOperation(tx, undo_op);
}
- Map<K, V> newData = Collections.singletonMap(key, value);
+ Map newData = Collections.singletonMap(key, value);
if (!isRollback)
notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.PUT_DATA, newData, ctx);
-
+
return old_value;
}
@@ -2340,7 +2263,7 @@
}
public boolean _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent)
- throws CacheException
+ throws CacheException
{
return _remove(tx, fqn, create_undo_ops, sendNodeEvent, false);
}
@@ -2354,7 +2277,7 @@
* @param sendNodeEvent
*/
public boolean _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction)
- throws CacheException
+ throws CacheException
{
return _remove(tx, fqn, create_undo_ops, sendNodeEvent, eviction, null);
}
@@ -2374,11 +2297,11 @@
* @throws CacheException
*/
public boolean _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction, DataVersion version)
- throws CacheException
+ throws CacheException
{
- NodeSPI<K, V> n;
- NodeSPI<K, V> parent_node;
+ NodeSPI n;
+ NodeSPI parent_node;
MethodCall undo_op = null;
if (log.isTraceEnabled())
@@ -2458,7 +2381,7 @@
undo_op = MethodCallFactory.create(MethodDeclarations.addChildMethodLocal, tx, parent_node.getFqn(), n.getFqn().getLastElement(), n, false);
// 1. put undo-op in TX' undo-operations list (needed to rollback TX)
- tx_table.addUndoOperation(tx, undo_op);
+ transactionTable.addUndoOperation(tx, undo_op);
}
if (!isRollback)
@@ -2483,8 +2406,8 @@
* @param key
* @return Object
*/
- public V _remove(GlobalTransaction tx, String fqn, K key, boolean create_undo_ops)
- throws CacheException
+ public Object _remove(GlobalTransaction tx, String fqn, Object key, boolean create_undo_ops)
+ throws CacheException
{
return _remove(tx, Fqn.fromString(fqn), key, create_undo_ops);
}
@@ -2496,11 +2419,11 @@
* @param key
* @return Object
*/
- public V _remove(GlobalTransaction tx, Fqn fqn, K key, boolean create_undo_ops)
- throws CacheException
+ public Object _remove(GlobalTransaction tx, Fqn fqn, Object key, boolean create_undo_ops)
+ throws CacheException
{
MethodCall undo_op = null;
- V old_value = null;
+ Object old_value = null;
if (log.isTraceEnabled())
{
@@ -2509,7 +2432,7 @@
// Find the node. This will lock it (if <tt>locking</tt> is true) and
// add the temporarily created parent nodes to the TX's node list if tx != null)
- NodeSPI<K, V> n = findNode(fqn);
+ NodeSPI n = findNode(fqn);
if (n == null)
{
log.warn("node " + fqn + " not found");
@@ -2528,10 +2451,10 @@
{
undo_op = MethodCallFactory.create(MethodDeclarations.putKeyValMethodLocal, tx, fqn, key, old_value, false);
// 1. put undo-op in TX' undo-operations list (needed to rollback TX)
- tx_table.addUndoOperation(tx, undo_op);
+ transactionTable.addUndoOperation(tx, undo_op);
}
- Map<K, V> removedData = Collections.singletonMap(key, old_value);
+ Map removedData = Collections.singletonMap(key, old_value);
if (!isRollback)
notifier.notifyNodeModified(fqn, false, NodeModifiedEvent.ModificationType.REMOVE_DATA, removedData, ctx);
@@ -2542,7 +2465,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, String fqn, boolean create_undo_ops)
- throws CacheException
+ throws CacheException
{
_removeData(tx, Fqn.fromString(fqn), create_undo_ops);
}
@@ -2551,7 +2474,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops)
- throws CacheException
+ throws CacheException
{
_removeData(tx, fqn, create_undo_ops, true);
}
@@ -2560,7 +2483,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent)
- throws CacheException
+ throws CacheException
{
_removeData(tx, fqn, create_undo_ops, sendNodeEvent, false);
}
@@ -2569,7 +2492,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction)
- throws CacheException
+ throws CacheException
{
_removeData(tx, fqn, create_undo_ops, sendNodeEvent, eviction, null);
}
@@ -2578,7 +2501,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction, DataVersion version)
- throws CacheException
+ throws CacheException
{
MethodCall undo_op = null;
@@ -2589,14 +2512,14 @@
// Find the node. This will lock it (if <tt>locking</tt> is true) and
// add the temporarily created parent nodes to the TX's node list if tx != null)
- NodeSPI<K, V> n = findNode(fqn, version);
+ NodeSPI n = findNode(fqn, version);
if (n == null)
{
log.warn("node " + fqn + " not found");
return;
}
- Map<K, V> data = n.getDataDirect();
+ Map data = n.getDataDirect();
InvocationContext ctx = getInvocationContext();
boolean isRollback = checkIsRollingBack(ctx.getTransaction());
// create a compensating method call (reverting the effect of
@@ -2606,7 +2529,7 @@
if (!data.isEmpty())
{
undo_op = MethodCallFactory.create(MethodDeclarations.putDataMethodLocal,
- tx, fqn, new HashMap<K, V>(data), false);
+ tx, fqn, new HashMap(data), false);
}
}
@@ -2650,7 +2573,7 @@
// put undo-op in TX' undo-operations list (needed to rollback TX)
if (tx != null && create_undo_ops)
{
- tx_table.addUndoOperation(tx, undo_op);
+ transactionTable.addUndoOperation(tx, undo_op);
}
}
@@ -2733,7 +2656,7 @@
*/
public void invalidate(Fqn fqn, DataVersion versionToInvalidate)
{
- Node<K, V> node = get(fqn); // force interceptor chain, load if necessary from cache loader.
+ Node node = get(fqn); // force interceptor chain, load if necessary from cache loader.
if (node == null)
{
@@ -2746,7 +2669,7 @@
{
log.trace("Node doesn't exist; creating a tombstone");
// create the node we need.
- Map<K, V> m = Collections.emptyMap();
+ Map m = Collections.emptyMap();
InvocationContext ic = getInvocationContext();
boolean origCacheModeLocal = ic.getOptionOverrides().isCacheModeLocal();
ic.getOptionOverrides().setCacheModeLocal(true);
@@ -2770,7 +2693,7 @@
}
node = nodeSPI;
}
-
+
if (configuration.isNodeLockingOptimistic())
_removeData(null, fqn, false, false, true, versionToInvalidate);
else
@@ -2806,7 +2729,7 @@
* Compensating method to {@link #_remove(GlobalTransaction,Fqn,boolean)}.
*/
public void _addChild(GlobalTransaction gtx, Fqn parent_fqn, Object child_name, Node cn, boolean undoOps)
- throws CacheException
+ throws CacheException
{
NodeSPI childNode = (NodeSPI) cn;
if (log.isTraceEnabled())
@@ -2841,7 +2764,7 @@
if (gtx != null && undoOps)
{
// 1. put undo-op in TX' undo-operations list (needed to rollback TX)
- tx_table.addUndoOperation(gtx, MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, gtx, fqn, false));
+ transactionTable.addUndoOperation(gtx, MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, gtx, fqn, false));
}
if (!isRollback) notifier.notifyNodeCreated(fqn, false, ctx);
@@ -2872,9 +2795,10 @@
catch (Throwable ex)
{
if (method_call.getMethodId() != MethodDeclarations.putForExternalReadMethodLocal_id
- || method_call.getMethodId() != MethodDeclarations.putForExternalReadVersionedMethodLocal_id )
+ || method_call.getMethodId() != MethodDeclarations.putForExternalReadVersionedMethodLocal_id)
{
- if (!MethodDeclarations.isBuddyGroupOrganisationMethod(method_call.getMethodId()) && log.isWarnEnabled()) log.warn("replication failure with method_call " + method_call + " exception", ex);
+ if (!MethodDeclarations.isBuddyGroupOrganisationMethod(method_call.getMethodId()) && log.isWarnEnabled())
+ log.warn("replication failure with method_call " + method_call + " exception", ex);
throw ex;
}
else return null;
@@ -2944,7 +2868,7 @@
* @return a GravitateResult which contains the data for the gravitation
*/
public GravitateResult gravitateData(Fqn fqn, boolean searchSubtrees)
- throws CacheException
+ throws CacheException
{
// we need to get the state for this Fqn and its sub-nodes.
@@ -2959,7 +2883,7 @@
// use a get() call into the cache to make sure cache loading takes place.
// no need to cache the original skipDataGravitation setting here - it will always be false of we got here!!
ctx.getOptionOverrides().setSkipDataGravitation(true);
- Node<K, V> actualNode = get(fqn);
+ Node actualNode = get(fqn);
ctx.getOptionOverrides().setSkipDataGravitation(false);
if (log.isTraceEnabled()) log.trace("In local tree, this is " + actualNode);
@@ -2985,7 +2909,8 @@
ctx.getOptionOverrides().setSkipDataGravitation(true);
actualNode = get(backupNodeFqn);
ctx.getOptionOverrides().setSkipDataGravitation(false);
- if (log.isTraceEnabled()) log.trace("Looking for " + backupNodeFqn + ". Search result: " + actualNode);
+ if (log.isTraceEnabled())
+ log.trace("Looking for " + backupNodeFqn + ". Search result: " + actualNode);
if (actualNode != null) break;
}
}
@@ -3007,7 +2932,7 @@
backupNodeFqn = BuddyManager.getBackupFqn(BuddyManager.getGroupNameFromAddress(getLocalAddress()), fqn);
}
- List<NodeData> list = getNodeData(new LinkedList<NodeData>(), (NodeSPI<K, V>) actualNode);
+ List<NodeData> list = getNodeData(new LinkedList<NodeData>(), (NodeSPI) actualNode);
return GravitateResult.subtreeResult(list, backupNodeFqn);
}
@@ -3017,13 +2942,13 @@
}
}
- private List<NodeData> getNodeData(List<NodeData> list, NodeSPI<K, V> node)
+ private List<NodeData> getNodeData(List<NodeData> list, NodeSPI node)
{
NodeData data = new NodeData(BuddyManager.getActualFqn(node.getFqn()), node.getDataDirect());
list.add(data);
- for (NodeSPI<K, V> childNode : node.getChildrenDirect())
+ for (Object childNode : node.getChildrenDirect())
{
- getNodeData(list, childNode);
+ getNodeData(list, (NodeSPI) childNode);
}
return list;
}
@@ -3062,8 +2987,8 @@
{
if (log.isTraceEnabled())
log.trace("DataGravitationCleanup: Removing primary (" + primary + ") and backup (" + backup + ")");
- primaryDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, null, primary, false);
- backupDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, null, backup, false);
+ primaryDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, primary, false);
+ backupDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, backup, false);
}
else
{
@@ -3117,7 +3042,7 @@
*/
public void _releaseAllLocks(Fqn fqn)
{
- NodeSPI<K, V> n;
+ NodeSPI n;
try
{
@@ -3135,11 +3060,11 @@
}
}
- private void releaseAll(NodeSPI<K, V> n)
+ private void releaseAll(NodeSPI n)
{
- for (NodeSPI<K, V> child : n.getChildrenDirect())
+ for (Object child : n.getChildrenDirect())
{
- releaseAll(child);
+ releaseAll((NodeSPI) child);
}
n.getLock().releaseAll();
}
@@ -3167,7 +3092,7 @@
* Should not be called.
*/
public void _lock(Fqn fqn, NodeLock.LockType lock_type, boolean recursive)
- throws TimeoutException, LockingException
+ throws TimeoutException, LockingException
{
throw new UnsupportedOperationException("method _lock() should not be invoked on CacheImpl");
}
@@ -3211,7 +3136,7 @@
*/
public void addUndoOperation(GlobalTransaction gtx, MethodCall undo_op)
{
- tx_table.addUndoOperation(gtx, undo_op);
+ transactionTable.addUndoOperation(gtx, undo_op);
}
/**
@@ -3230,12 +3155,6 @@
this.cacheLoaderManager = cacheLoaderManager;
}
- public void setConfiguration(Configuration configuration)
- {
- this.configuration = configuration;
- configuration.setCacheImpl(this);
- }
-
/**
* @return an instance of {@link Notifier} which has been configured with this instance of CacheImpl.
*/
@@ -3281,14 +3200,14 @@
public void _move(Fqn nodeToMoveFqn, Fqn newParentFqn)
{
// the actual move algorithm.
- NodeSPI<K, V> newParent = findNode(newParentFqn);
+ NodeSPI newParent = findNode(newParentFqn);
if (newParent == null)
{
throw new NodeNotExistsException("New parent node " + newParentFqn + " does not exist when attempting to move node!!");
}
- NodeSPI<K, V> node = findNode(nodeToMoveFqn);
+ NodeSPI node = findNode(nodeToMoveFqn);
if (node == null)
{
@@ -3320,7 +3239,7 @@
if (ctx.getTransaction() != null)
{
MethodCall undo = MethodCallFactory.create(MethodDeclarations.moveMethodLocal, new Fqn(newParentFqn, nodeToMoveFqn.getLastElement()), oldParent.getFqn());
- tx_table.addUndoOperation(ctx.getGlobalTransaction(), undo);
+ transactionTable.addUndoOperation(ctx.getGlobalTransaction(), undo);
}
}
@@ -3372,339 +3291,6 @@
}
}
- protected class MessageListenerAdaptor implements ExtendedMessageListener
- {
- /**
- * Reference to an exception that was raised during
- * state installation on this node.
- */
- protected volatile Exception setStateException;
- private final Object stateLock = new Object();
-
- protected MessageListenerAdaptor()
- {
- }
-
- public void waitForState() throws Exception
- {
- synchronized (stateLock)
- {
- while (!isStateSet)
- {
- if (setStateException != null)
- {
- throw setStateException;
- }
-
- try
- {
- stateLock.wait();
- }
- catch (InterruptedException iex)
- {
- }
- }
- }
- }
-
- protected void stateReceivedSuccess()
- {
- isStateSet = true;
- setStateException = null;
- }
-
- protected void stateReceivingFailed(Throwable t)
- {
- if (t instanceof CacheException)
- {
- log.debug(t);
- }
- else
- {
- log.error("failed setting state", t);
- }
- if (t instanceof Exception)
- {
- setStateException = (Exception) t;
- }
- else
- {
- setStateException = new Exception(t);
- }
- }
-
- protected void stateProducingFailed(Throwable t)
- {
- if (t instanceof CacheException)
- {
- log.debug(t);
- }
- else
- {
- log.error("Caught " + t.getClass().getName()
- + " while responding to state transfer request", t);
- }
- }
-
- /**
- * Callback, does nothing.
- */
- public void receive(Message msg)
- {
- }
-
- public byte[] getState()
- {
- MarshalledValueOutputStream out = null;
- byte[] result = null;
- ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
- try
- {
- out = new MarshalledValueOutputStream(baos);
-
- getStateTransferManager().getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
- }
- catch (Throwable t)
- {
- stateProducingFailed(t);
- }
- finally
- {
- result = baos.getRawBuffer();
- Util.close(out);
- }
- return result;
- }
-
- public void setState(byte[] new_state)
- {
- if (new_state == null)
- {
- log.debug("transferred state is null (may be first member in cluster)");
- return;
- }
- ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
- MarshalledValueInputStream in = null;
- try
- {
- in = new MarshalledValueInputStream(bais);
- getStateTransferManager().setState(in, Fqn.ROOT);
- stateReceivedSuccess();
- }
- catch (Throwable t)
- {
- stateReceivingFailed(t);
- }
- finally
- {
- Util.close(in);
- synchronized (stateLock)
- {
- // Notify wait that state has been set.
- stateLock.notifyAll();
- }
- }
- }
-
- public byte[] getState(String state_id)
- {
- MarshalledValueOutputStream out = null;
- String sourceRoot = state_id;
- byte[] result = null;
-
- boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
- if (hasDifferentSourceAndIntegrationRoots)
- {
- sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
- }
-
- ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
- try
- {
- out = new MarshalledValueOutputStream(baos);
-
- getStateTransferManager().getState(out, Fqn.fromString(sourceRoot),
- configuration.getStateRetrievalTimeout(), true, true);
- }
- catch (Throwable t)
- {
- stateProducingFailed(t);
- }
- finally
- {
- result = baos.getRawBuffer();
- Util.close(out);
- }
- return result;
- }
-
- public void getState(OutputStream ostream)
- {
- MarshalledValueOutputStream out = null;
- try
- {
- out = new MarshalledValueOutputStream(ostream);
- getStateTransferManager().getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
- }
- catch (Throwable t)
- {
- stateProducingFailed(t);
- }
- finally
- {
- Util.close(out);
- }
- }
-
- public void getState(String state_id, OutputStream ostream)
- {
- String sourceRoot = state_id;
- MarshalledValueOutputStream out = null;
- boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
- if (hasDifferentSourceAndIntegrationRoots)
- {
- sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
- }
- try
- {
- out = new MarshalledValueOutputStream(ostream);
- getStateTransferManager().getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
- }
- catch (Throwable t)
- {
- stateProducingFailed(t);
- }
- finally
- {
- Util.close(out);
- }
- }
-
- public void setState(InputStream istream)
- {
- if (istream == null)
- {
- log.debug("stream is null (may be first member in cluster)");
- return;
- }
- MarshalledValueInputStream in = null;
- try
- {
- in = new MarshalledValueInputStream(istream);
- getStateTransferManager().setState(in, Fqn.ROOT);
- stateReceivedSuccess();
- }
- catch (Throwable t)
- {
- stateReceivingFailed(t);
- }
- finally
- {
- Util.close(in);
- synchronized (stateLock)
- {
- // Notify wait that state has been set.
- stateLock.notifyAll();
- }
- }
- }
-
- public void setState(String state_id, byte[] state)
- {
- if (state == null)
- {
- log.debug("partial transferred state is null");
- return;
- }
-
- MarshalledValueInputStream in = null;
- String targetRoot = state_id;
- boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
- if (hasDifferentSourceAndIntegrationRoots)
- {
- targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
- }
- try
- {
- log.debug("Setting received partial state for subroot " + state_id);
- Fqn subroot = Fqn.fromString(targetRoot);
-// Region region = regionManager.getRegion(subroot, false);
-// ClassLoader cl = null;
-// if (region != null)
-// {
-// // If a classloader is registered for the node's region, use it
-// cl = region.getClassLoader();
-// }
- ByteArrayInputStream bais = new ByteArrayInputStream(state);
- in = new MarshalledValueInputStream(bais);
- //getStateTransferManager().setState(in, subroot, cl);
- getStateTransferManager().setState(in, subroot);
- stateReceivedSuccess();
- }
- catch (Throwable t)
- {
- stateReceivingFailed(t);
- }
- finally
- {
- Util.close(in);
- synchronized (stateLock)
- {
- // Notify wait that state has been set.
- stateLock.notifyAll();
- }
- }
- }
-
- public void setState(String state_id, InputStream istream)
- {
- String targetRoot = state_id;
- MarshalledValueInputStream in = null;
- boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
- if (hasDifferentSourceAndIntegrationRoots)
- {
- targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
- }
- if (istream == null)
- {
- log.debug("stream is null (may be first member in cluster). State is not set");
- return;
- }
-
- try
- {
- log.debug("Setting received partial state for subroot " + state_id);
- in = new MarshalledValueInputStream(istream);
- Fqn subroot = Fqn.fromString(targetRoot);
-// Region region = regionManager.getRegion(subroot, false);
-// ClassLoader cl = null;
-// if (region != null)
-// {
-// // If a classloader is registered for the node's region, use it
-// cl = region.getClassLoader();
-// }
- //getStateTransferManager().setState(in, subroot, cl);
- getStateTransferManager().setState(in, subroot);
- stateReceivedSuccess();
- }
- catch (Throwable t)
- {
- stateReceivingFailed(t);
- }
- finally
- {
- Util.close(in);
- synchronized (stateLock)
- {
- // Notify wait that state has been set.
- stateLock.notifyAll();
- }
- }
- }
- }
-
- /*-------------------- End of MessageListener ----------------------*/
-
/*----------------------- MembershipListener ------------------------*/
protected class MembershipListenerAdaptor implements ExtendedMembershipListener
@@ -3805,13 +3391,13 @@
*/
protected Transaction getLocalTransaction()
{
- if (tm == null)
+ if (transactionManager == null)
{
return null;
}
try
{
- return tm.getTransaction();
+ return transactionManager.getTransaction();
}
catch (Throwable t)
{
@@ -3914,15 +3500,15 @@
// operate on the transaction at one time so no concern about 2 threads trying to call
// this method for the same Transaction instance at the same time
//
- GlobalTransaction gtx = tx_table.get(tx);
+ GlobalTransaction gtx = transactionTable.get(tx);
if (gtx == null && createIfNotExists)
{
Address addr = getLocalAddress();
gtx = GlobalTransaction.create(addr);
- tx_table.put(tx, gtx);
+ transactionTable.put(tx, gtx);
TransactionEntry ent = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry() : new TransactionEntry();
ent.setTransaction(tx);
- tx_table.put(gtx, ent);
+ transactionTable.put(gtx, ent);
if (log.isTraceEnabled())
{
log.trace("created new GTX: " + gtx + ", local TX=" + tx);
@@ -3957,70 +3543,7 @@
*/
protected Object invokeMethod(MethodCall m, boolean originLocal) throws CacheException
{
- // don't create a new one; get it from ThreadLocal just this once, in case a user has added any overrides.
- InvocationContext ctx = getInvocationContext();
-
- // BR methods should NOT block on the cache being started, since the cache depends on these completing to start.
- if (!MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId()) && !cacheStatus.allowInvocations() && !ctx.getOptionOverrides().isSkipCacheStatusCheck())
- {
- // only throw an exception if this is a locally originating call - JBCACHE-1179
- if (originLocal)
- {
- throw new IllegalStateException("Cache not in STARTED state!");
- }
- else
- {
- if (getCacheStatus() == CacheStatus.STARTING)
- {
- try
- {
- blockUntilCacheStarts();
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
-
- // if the cache STILL can't take invocations...
- if (!cacheStatus.allowInvocations()) throw new IllegalStateException("Cache not in STARTED state!");
- }
- else
- {
- log.warn("Received a remote call but the cache is not in STARTED state - ignoring call.");
- return null;
- }
- }
- }
-
-
- MethodCall oldCall = null;
- try
- {
- // check if we had a method call lurking around
- oldCall = ctx.getMethodCall();
- ctx.setMethodCall(m);
- // only set this if originLocal is EXPLICITLY passed in as FALSE. Otherwise leave it as a default.
- if (!originLocal) ctx.setOriginLocal(false);
- return interceptor_chain.invoke(ctx);
- }
- catch (CacheException e)
- {
- throw e;
- }
- catch (RuntimeException e)
- {
- throw e;
- }
- catch (Throwable t)
- {
- throw new CacheException(t);
- }
- finally
- {
- if (!originLocal) ctx.setOriginLocal(true);
- // reset old method call
- ctx.setMethodCall(oldCall);
- }
+ throw new UnsupportedOperationException("DEPRECATED and should not be used!!");
}
/**
@@ -4054,7 +3577,7 @@
* @param fqn Fully qualified name for the corresponding node.
* @return DataNode
*/
- public NodeSPI<K, V> findNode(Fqn fqn)
+ public NodeSPI findNode(Fqn fqn)
{
try
{
@@ -4067,9 +3590,9 @@
}
}
- private NodeSPI<K, V> findNodeCheck(GlobalTransaction tx, Fqn fqn, boolean includeInvalid)
+ private NodeSPI findNodeCheck(GlobalTransaction tx, Fqn fqn, boolean includeInvalid)
{
- NodeSPI<K, V> n = findNode(fqn, null, includeInvalid);
+ NodeSPI n = findNode(fqn, null, includeInvalid);
if (n == null)
{
String errStr = "node " + fqn + " not found (gtx=" + tx + ", caller=" + Thread.currentThread() + ")";
@@ -4131,16 +3654,16 @@
/**
* Finds a node given a fully qualified name and DataVersion. Does not include invalid nodes.
*/
- private NodeSPI<K, V> findNode(Fqn fqn, DataVersion version)
+ private NodeSPI findNode(Fqn fqn, DataVersion version)
{
return findNode(fqn, version, false);
}
- private NodeSPI<K, V> findNode(Fqn fqn, DataVersion version, boolean includeInvalidNodes)
+ private NodeSPI findNode(Fqn fqn, DataVersion version, boolean includeInvalidNodes)
{
if (fqn == null) return null;
- NodeSPI<K, V> toReturn = peek(fqn, false, includeInvalidNodes);
+ NodeSPI toReturn = peek(fqn, false, includeInvalidNodes);
if (toReturn != null && version != null && configuration.isNodeLockingOptimistic())
{
@@ -4159,54 +3682,13 @@
return toReturn;
}
- public synchronized RegionManager getRegionManager()
- {
- if (regionManager == null)
- {
- regionManager = new RegionManager(this);
- }
- return regionManager;
- }
-
-
- public Marshaller getMarshaller()
- {
- if (marshaller_ == null)
- {
- synchronized (this)
- {
- if (marshaller_ == null)
- {
- if (configuration.getMarshallerClass() == null || configuration.getMarshallerClass().equals(VersionAwareMarshaller.class.getName()))
- {
- marshaller_ = new VersionAwareMarshaller(getRegionManager(), configuration);
- }
- else
- {
- try
- {
- marshaller_ = (Marshaller) org.jboss.cache.util.Util.loadClass(configuration.getMarshallerClass()).newInstance();
- }
- catch (Exception e)
- {
- log.error("Unable to load marshaller " + configuration.getMarshallerClass() + ". Falling back to default (" + VersionAwareMarshaller.class.getName() + ")");
- marshaller_ = new VersionAwareMarshaller(getRegionManager(), configuration);
- }
- }
- if (log.isTraceEnabled()) log.trace("Using marshaller " + marshaller_.getClass().getName());
- }
- }
- }
- return marshaller_;
- }
-
private void initialiseCacheLoaderManager() throws CacheException
{
if (cacheLoaderManager == null)
{
cacheLoaderManager = new CacheLoaderManager();
}
- cacheLoaderManager.setConfig(configuration.getCacheLoaderConfig(), this);
+ cacheLoaderManager.setConfig(configuration.getCacheLoaderConfig(), spi);
}
/**
@@ -4258,7 +3740,7 @@
if (log.isDebugEnabled())
{
log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() +
- " using stack " + configuration.getMultiplexerStack());
+ " using stack " + configuration.getMultiplexerStack());
}
}
else
@@ -4295,11 +3777,11 @@
// always use the InactiveRegionAwareRpcDispatcher - exceptions due to regions not being active should not propagate to remote
// nodes as errors. - Manik
- disp = new InactiveRegionAwareRpcDispatcher(channel, ml, new MembershipListenerAdaptor(), this);
+ disp = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(), this);
// disp = new RpcDispatcher(channel, ml, this, this);
- disp.setRequestMarshaller(getMarshaller());
- disp.setResponseMarshaller(getMarshaller());
+ disp.setRequestMarshaller(marshaller);
+ disp.setResponseMarshaller(marshaller);
}
private JChannel getMultiplexerChannel() throws CacheException
@@ -4371,12 +3853,12 @@
{
List<Interceptor> interceptors = getInterceptors();
- i.setCache(this);
+ InterceptorChainFactory factory = InterceptorChainFactory.getInstance();
interceptors.add(position, i);
// now correct the chaining of interceptors...
- Interceptor linkedChain = InterceptorChainFactory.getInstance().correctInterceptorChaining(interceptors);
+ Interceptor linkedChain = factory.correctInterceptorChaining(interceptors, configuration, componentRegistry);
setInterceptorChain(linkedChain);
}
@@ -4385,7 +3867,7 @@
{
List<Interceptor> i = getInterceptors();
i.remove(position);
- setInterceptorChain(InterceptorChainFactory.getInstance().correctInterceptorChaining(i));
+ setInterceptorChain(InterceptorChainFactory.getInstance().correctInterceptorChaining(i, configuration, componentRegistry));
}
public RPCManager getRPCManager()
@@ -4401,7 +3883,7 @@
public void evict(Fqn<?> fqn, boolean recursive)
{
//this method should be called by eviction thread only, so no transaction - expected (sec param is false)
- Node<K, V> node = peek(fqn, false);
+ Node node = peek(fqn, false);
if (node != null && node.isResident())
{
return;
@@ -4410,7 +3892,7 @@
{
if (node != null)
{
- evictChildren((NodeSPI<K, V>) node);
+ evictChildren((NodeSPI) node);
}
}
else
@@ -4419,23 +3901,23 @@
}
}
- private void evictChildren(NodeSPI<K, V> n)
+ private void evictChildren(NodeSPI n)
{
- for (NodeSPI<K, V> child : n.getChildrenDirect())
+ for (Object child : n.getChildrenDirect())
{
- evictChildren(child);
+ evictChildren((NodeSPI) child);
}
evict(n.getFqn());
}
public Region getRegion(Fqn<?> fqn, boolean createIfAbsent)
{
- return getRegionManager().getRegion(fqn, createIfAbsent);
+ return regionManager.getRegion(fqn, createIfAbsent);
}
public boolean removeRegion(Fqn<?> fqn)
{
- return getRegionManager().removeRegion(fqn);
+ return regionManager.removeRegion(fqn);
}
public boolean removeNode(Fqn<?> fqn)
@@ -4443,7 +3925,7 @@
return remove(fqn);
}
- public void putForExternalRead(Fqn<?> fqn, K key, V value)
+ public void putForExternalRead(Fqn<?> fqn, Object key, Object value)
{
// if the node exists then this should be a no-op.
if (!exists(fqn))
@@ -4460,7 +3942,7 @@
}
}
- public void _putForExternalRead(GlobalTransaction gtx, Fqn fqn, K key, V value)
+ public void _putForExternalRead(GlobalTransaction gtx, Fqn fqn, Object key, Object value)
{
_put(gtx, fqn, key, value, true);
}
@@ -4469,10 +3951,4 @@
{
return getCacheStatus() == CacheStatus.STARTED;
}
-
- protected void setMessageListener(MessageListenerAdaptor ml)
- {
- this.ml = ml;
- }
-
}
Modified: core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheSPI.java 2007-12-11 17:25:09 UTC (rev 4832)
+++ core/trunk/src/main/java/org/jboss/cache/CacheSPI.java 2007-12-11 17:26:38 UTC (rev 4833)
@@ -18,11 +18,13 @@
import org.jboss.cache.statetransfer.StateTransferManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionTable;
+import org.jgroups.Address;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* A more detailed interface to {@link Cache}, which is used when writing plugins for or extending JBoss Cache. A reference
@@ -52,12 +54,20 @@
/**
* Retrieves a reference to a running {@link javax.transaction.TransactionManager}, if one is configured.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
*
* @return a TransactionManager
*/
TransactionManager getTransactionManager();
/**
+ * Retrieves the current Interceptor chain.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
+ *
* @return an immutable {@link List} of {@link Interceptor}s configured for this cache, or
* <code>null</code> if {@link Cache#create() create()} has not been invoked
* and the interceptors thus do not exist.
@@ -65,6 +75,22 @@
List<Interceptor> getInterceptorChain();
/**
+ * Retrieves an instance of a {@link Marshaller}, which is capable of
+ * converting Java objects to bytestreams and back in an efficient manner, which is
+ * also interoperable with bytestreams produced/consumed by other versions of JBoss
+ * Cache.
+ * <p/>
+ * The use of this marshaller is the <b>recommended</b> way of creating efficient,
+ * compatible, byte streams from objects.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
+ *
+ * @return an instance of {@link Marshaller}
+ */
+ Marshaller getMarshaller();
+
+ /**
* Adds a custom interceptor to the interceptor chain, at specified position, where the first interceptor in the chain
* is at position 0 and the last one at getInterceptorChain().size() - 1.
*
@@ -82,33 +108,78 @@
void removeInterceptor(int position);
/**
+ * Retrieves the current CacheCacheLoaderManager instance associated with the current Cache instance.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
+ *
* @return Retrieves a reference to the currently configured {@link org.jboss.cache.loader.CacheLoaderManager} if one or more cache loaders are configured, null otherwise.
*/
CacheLoaderManager getCacheLoaderManager();
/**
+ * Retrieves the current BuddyManager instance associated with the current Cache instance.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
+ *
* @return an instance of {@link BuddyManager} if buddy replication is enabled, null otherwise.
*/
BuddyManager getBuddyManager();
/**
+ * Retrieves the current TransactionTable instance associated with the current Cache instance.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
+ *
* @return the current {@link TransactionTable}
*/
TransactionTable getTransactionTable();
/**
* Gets a handle of the RPC manager.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
*
* @return the {@link org.jboss.cache.RPCManager} configured.
*/
RPCManager getRPCManager();
/**
+ * Retrieves the current StateTransferManager instance associated with the current Cache instance.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
+ *
* @return the current {@link org.jboss.cache.statetransfer.StateTransferManager}
*/
StateTransferManager getStateTransferManager();
/**
+ * Retrieves the current RegionManager instance associated with the current Cache instance.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
+ *
+ * @return the {@link org.jboss.cache.RegionManager}
+ */
+ RegionManager getRegionManager();
+
+
+ /**
+ * Retrieves the current Notifier instance associated with the current Cache instance.
+ * <p/>
+ * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
+ * {@link org.jboss.cache.factories.annotations.Inject} annotation.
+ *
+ * @return the notifier attached with this instance of the cache. See {@link Notifier}, a class
+ * that is responsible for emitting notifications to registered CacheListeners.
+ */
+ Notifier getNotifier();
+
+ /**
* @return the name of the cluster. Null if running in local mode.
*/
String getClusterName();
@@ -131,11 +202,6 @@
Map<Thread, List<NodeLock>> getLockTable();
/**
- * @return the {@link org.jboss.cache.RegionManager}
- */
- RegionManager getRegionManager();
-
- /**
* Returns the global transaction for this local transaction.
* Optionally creates a new global transaction if it does not exist.
*
@@ -146,13 +212,21 @@
GlobalTransaction getCurrentTransaction(Transaction tx, boolean createIfNotExists);
/**
- * @return the notifier attached with this instance of the cache. See {@link Notifier}, a class
- * that is responsible for emitting notifications to registered CacheListeners.
+ * Returns the transaction associated with the current thread.
+ * If a local transaction exists, but doesn't yet have a mapping to a
+ * GlobalTransaction, a new GlobalTransaction will be created and mapped to
+ * the local transaction. Note that if a local transaction exists, but is
+ * not ACTIVE or PREPARING, null is returned.
+ *
+ * @return A GlobalTransaction, or null if no (local) transaction was associated with the current thread
*/
- Notifier getNotifier();
+ GlobalTransaction getCurrentTransaction();
/**
- * Returns a node without accessing the interceptor chain. Does not return any nodes marked as invalid.
+ * Returns a node without accessing the interceptor chain. Does not return any nodes marked as invalid. Note that this call works
+ * directly on the cache data structure and will not pass through the interceptor chain. Hence node locking, cache
+ * loading or activation does not take place, and so the results of this call should not be treated as definitive. Concurrent node
+ * removal, passivation, etc. may affect the results of this call.
*
* @param fqn the Fqn to look up.
* @param includeDeletedNodes if you intend to see nodes marked as deleted within the current tx, set this to true
@@ -162,6 +236,10 @@
/**
* Returns a node without accessing the interceptor chain, optionally returning nodes that are marked as invalid ({@link org.jboss.cache.Node#isValid()} == false).
+ * Note that this call works
+ * directly on the cache data structure and will not pass through the interceptor chain. Hence node locking, cache
+ * loading or activation does not take place, and so the results of this call should not be treated as definitive. Concurrent node
+ * removal, passivation, etc. may affect the results of this call.
*
* @param fqn the Fqn to look up.
* @param includeDeletedNodes if you intend to see nodes marked as deleted within the current tx, set this to true
@@ -182,15 +260,55 @@
GravitateResult gravitateData(Fqn<?> fqn, boolean searchBuddyBackupSubtrees);
/**
- * Retrieves an instance of a {@link Marshaller}, which is capable of
- * converting Java objects to bytestreams and back in an efficient manner, which is
- * also interoperable with bytestreams produced/consumed by other versions of JBoss
- * Cache.
- * <p/>
- * The use of this marshaller is the <b>recommended</b> way of creating efficient,
- * compatible, byte streams from objects.
+ * Returns a Set<Fqn> of Fqns of the topmost node of internal regions that
+ * should not included in standard state transfers. Will include
+ * {@link BuddyManager#BUDDY_BACKUP_SUBTREE} if buddy replication is
+ * enabled.
*
- * @return an instance of {@link Marshaller}
+ * @return an unmodifiable Set<Fqn>. Will not return <code>null</code>.
*/
- Marshaller getMarshaller();
+ Set<Fqn> getInternalFqns();
+
+ @Deprecated
+ void fetchPartialState(List<Address> members, Fqn subtreeRoot) throws Exception;
+
+ @Deprecated
+ void fetchPartialState(List<Address> members, Fqn subtreeRoot, Fqn integrationPoint) throws Exception;
+
+ int getNumberOfLocksHeld();
+
+ /**
+ * Helper method that does a peek and ensures that the result of the peek is not null. Note that this call works
+ * directly on the cache data structure and will not pass through the interceptor chain. Hence node locking, cache
+ * loading or activation does not take place, and so the results of this call should not be treated as definitive.
+ *
+ * @param fqn Fqn to peek
+ * @return true if the peek returns a non-null value.
+ */
+ boolean exists(Fqn<?> fqn);
+
+ /**
+ * A convenience method that takes a String representation of an Fqn. Otherwise identical to {@link #exists(Fqn)}.
+ * Note that this call works
+ * directly on the cache data structure and will not pass through the interceptor chain. Hence node locking, cache
+ * loading or activation does not take place, and so the results of this call should not be treated as definitive.
+ */
+ boolean exists(String fqn);
+
+ /**
+ * Returns all children of a given node. Returns an empty set if there are no children.
+ * The set is unmodifiable.
+ *
+ * @param fqn The fully qualified name of the node
+ * @return Set an unmodifiable set of children names, Object.
+ */
+ <E> Set<E> getChildrenNames(Fqn<E> fqn);
+
+ /**
+ * Convenience method that takes a String representation of an Fqn. Otherwise identical to {@link #getChildrenNames(Fqn)}
+ *
+ * @param fqn as a string
+ * @return Set an unmodifiable set of children names, Object.
+ */
+ Set getChildrenNames(String fqn);
}
Modified: core/trunk/src/main/java/org/jboss/cache/NodeFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/NodeFactory.java 2007-12-11 17:25:09 UTC (rev 4832)
+++ core/trunk/src/main/java/org/jboss/cache/NodeFactory.java 2007-12-11 17:26:38 UTC (rev 4833)
@@ -6,6 +6,11 @@
*/
package org.jboss.cache;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.ComponentFactory;
+import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.invocation.NodeInvocationDelegate;
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.optimistic.WorkspaceNode;
import org.jboss.cache.optimistic.WorkspaceNodeImpl;
@@ -17,16 +22,24 @@
*
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
*/
-public class NodeFactory<K, V>
+// TODO: Rethink how nodes are created. Perhaps use the component factory to create instance nodes as well, not just singletons?
+public class NodeFactory<K, V> extends ComponentFactory
{
+ private ComponentRegistry componentRegistry;
+ private CacheSPI<K, V> cache;
+ private boolean optimistic;
+ private Configuration configuration;
+
+ protected <T> T construct(String componentName, Class<T> componentType)
+ {
+ throw new UnsupportedOperationException("Should never be called!");
+ }
+
public enum NodeType
{
UNVERSIONED_NODE, VERSIONED_NODE, WORKSPACE_NODE
}
- private CacheSPI<K, V> cache;
- private boolean optimistic;
-
/**
* Constructs an instance of the factory
*/
@@ -36,12 +49,25 @@
init();
}
+ public NodeFactory()
+ {
+ }
+
+ @Inject
+ private void injectDependencies(CacheSPI<K, V> cache, Configuration configuration, ComponentRegistry componentRegistry)
+ {
+ this.cache = cache;
+ this.configuration = configuration;
+ this.componentRegistry = componentRegistry;
+ init();
+ }
+
/**
* Initialises the node factory with the configuration from the cache.
*/
public void init()
{
- optimistic = cache.getConfiguration().isNodeLockingOptimistic();
+ optimistic = configuration.isNodeLockingOptimistic();
}
@@ -60,10 +86,12 @@
*/
public NodeSPI<K, V> createDataNode(Object childName, Fqn fqn, NodeSPI<K, V> parent, Map<K, V> data, boolean mapSafe)
{
- NodeSPI<K, V> n = optimistic ? new VersionedNode<K, V>(fqn, parent, data, cache) : new UnversionedNode<K, V>(childName, fqn, data, mapSafe, cache);
+ UnversionedNode un = optimistic ? new VersionedNode<K, V>(fqn, parent, data, cache) : new UnversionedNode<K, V>(childName, fqn, data, mapSafe, cache);
// always assume that new nodes do not have data loaded
- n.setDataLoaded(false);
- return n;
+ un.setDataLoaded(false);
+ NodeInvocationDelegate<K, V> nid = new NodeInvocationDelegate(un);
+ componentRegistry.wireDependencies(nid);
+ return nid;
}
public Node<K, V> createNode(Object childName, Node<K, V> parent, Map<K, V> data)
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2007-12-11 17:25:09 UTC (rev 4832)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2007-12-11 17:26:38 UTC (rev 4833)
@@ -6,6 +6,7 @@
*/
package org.jboss.cache;
+import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.marshall.MethodCall;
import org.jgroups.Address;
import org.jgroups.blocks.RspFilter;
@@ -20,7 +21,7 @@
*/
public class RPCManagerImpl implements RPCManager
{
- private CacheImpl<?, ?> c;
+ private CacheImpl c;
/**
* Empty ctor for mock object creation/unit testing
@@ -29,6 +30,12 @@
{
}
+ @Inject
+ private void setupDependencies(CacheImpl c)
+ {
+ this.c = c;
+ }
+
public RPCManagerImpl(CacheSPI c)
{
this.c = (CacheImpl) c;
Modified: core/trunk/src/main/java/org/jboss/cache/RegionManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManager.java 2007-12-11 17:25:09 UTC (rev 4832)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManager.java 2007-12-11 17:26:38 UTC (rev 4833)
@@ -10,7 +10,9 @@
import org.jboss.cache.config.EvictionRegionConfig;
import org.jboss.cache.eviction.EvictionTimerTask;
import org.jboss.cache.eviction.RegionNameConflictException;
+import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.lock.NodeLock;
+import org.jboss.classadapter.spi.ClassAdapterFactory;
import org.jgroups.Address;
import java.util.ArrayList;
@@ -44,7 +46,7 @@
private Map<Fqn, Region> regionsRegistry = new ConcurrentHashMap<Fqn, Region>();
private boolean defaultInactive;
private Log log = LogFactory.getLog(RegionManager.class);
- private CacheImpl<?, ?> cache;
+ private CacheSPI cache;
private boolean usingEvictions;
private EvictionConfig evictionConfig;
private EvictionTimerTask evictionTimerTask = new EvictionTimerTask();
@@ -62,10 +64,8 @@
r.setActive(true);
}
- /**
- * Constructs a new instance attached to a cache.
- */
- public RegionManager(CacheImpl cache)
+ @Inject
+ private void injectDependencies(CacheSPI cache)
{
this.cache = cache;
}
@@ -152,8 +152,8 @@
// this is a very poor way of telling whether a region is a marshalling one or an eviction one. :-(
// mandates that class loaders be registered for marshalling regions.
if (type == ANY
- || (type == MARSHALLING && r.getClassLoader() != null)
- || (type == EVICTION && r.getEvictionPolicyConfig() != null))
+ || (type == MARSHALLING && r.getClassLoader() != null)
+ || (type == EVICTION && r.getEvictionPolicyConfig() != null))
{
return r;
}
@@ -186,8 +186,8 @@
// this is a very poor way of telling whether a region is a marshalling one or an eviction one. :-(
// mandates that class loaders be registered for marshalling regions.
if (type == ANY
- || (type == MARSHALLING && r.getClassLoader() != null)
- || (type == EVICTION && r.getEvictionPolicyConfig() != null))
+ || (type == MARSHALLING && r.getClassLoader() != null)
+ || (type == EVICTION && r.getEvictionPolicyConfig() != null))
{
nextBestThing = r;
}
@@ -198,7 +198,7 @@
// test if the default region has been defined. If not, and if eviction regions
// are in use, throw an exception since it is required.
if ((nextBestThing == null || nextBestThing.getFqn().isRoot() && !regionsRegistry.containsKey(Fqn.ROOT))
- && type == EVICTION)
+ && type == EVICTION)
{
throw new RuntimeException("No default eviction region defined!");
}
@@ -345,7 +345,7 @@
private void activateRegion(Fqn fqn, boolean suppressRegionNotEmptyException)
{
// Check whether the node already exists and has data
- Node subtreeRoot = cache.findNode(fqn);
+ Node subtreeRoot = cache.peek(fqn, false, false);
/*
* Commented out on Nov 16,2006 Manik&Vladimir
@@ -402,7 +402,7 @@
continue;
sources.add(buddy);
Fqn buddyRoot = BuddyManager.getBackupFqn(buddy, fqn);
- subtreeRoot = cache.findNode(buddyRoot);
+ subtreeRoot = cache.peek(buddyRoot, false, false);
if (subtreeRoot == null)
{
// We'll update this node with the state we receive
@@ -412,7 +412,7 @@
subtreeRoot = cache.getRoot().addChild(buddyRoot);
cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
}
- cache.fetchPartialState(sources, fqn, subtreeRoot.getFqn());
+ cache.fetchPartialState(sources, fqn, subtreeRoot.getFqn());
}
}
else
@@ -447,7 +447,7 @@
else
{
throw new CacheException(t.getClass().getName() + " " +
- t.getLocalizedMessage(), t);
+ t.getLocalizedMessage(), t);
}
}
finally
@@ -519,7 +519,7 @@
if (buddyManager != null)
{
- Set buddies = cache.getChildrenNames(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN);
+ Set buddies = cache.peek(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN, false, false).getChildrenNames();
if (buddies != null)
{
for (Iterator it = buddies.iterator(); it.hasNext();)
@@ -535,11 +535,12 @@
for (Iterator<Fqn> it = list.iterator(); it.hasNext();)
{
Fqn subtree = it.next();
- subtreeRoot = cache.findNode(subtree);
+ subtreeRoot = cache.peek(subtree, false, false);
if (subtreeRoot != null)
{
// Acquire locks
- Object owner = cache.getOwnerForLock();
+
+ Object owner = getOwnerForLock();
subtreeLock = subtreeRoot.getLock();
subtreeLock.acquireAll(owner, stateFetchTimeout, NodeLock.LockType.WRITE);
subtreeLocked = true;
@@ -554,7 +555,8 @@
}
// Remove the subtree
- cache._evictSubtree(subtree);
+ cache.evict(subtree, true);
+ //cache._evictSubtree(subtree);
// Release locks
if (parent != null)
@@ -612,6 +614,13 @@
}
}
+ private Object getOwnerForLock()
+ {
+ Object owner = cache.getCurrentTransaction();
+ return owner == null ? Thread.currentThread() : owner;
+ }
+
+
/**
* <p/>
* This is legacy code and should not be called directly. This is a private method for now and will be refactored out.
@@ -733,7 +742,7 @@
for (Region r : regionsRegistry.values())
{
if ((type == EVICTION && r.getEvictionPolicy() != null && evictionTimerTask.isRegionRegisteredForProcessing(r)) ||
- (type == MARSHALLING && r.isActive() && r.getClassLoader() != null))
+ (type == MARSHALLING && r.isActive() && r.getClassLoader() != null))
regions.add(r);
}
}
@@ -823,11 +832,8 @@
return "RegionManager " + dumpRegions();
}
- /**
- * Returns the cache for this region manager
- */
- public CacheImpl getCache()
+ public CacheSPI getCache()
{
- return this.cache;
+ return cache;
}
}
Added: core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/CacheMessageListener.java 2007-12-11 17:26:38 UTC (rev 4833)
@@ -0,0 +1,376 @@
+package org.jboss.cache.remoting.jgroups;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.cache.util.ExposedByteArrayOutputStream;
+import org.jboss.util.stream.MarshalledValueInputStream;
+import org.jboss.util.stream.MarshalledValueOutputStream;
+import org.jgroups.ExtendedMessageListener;
+import org.jgroups.Message;
+import org.jgroups.util.Util;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * JGroups MessageListener
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+public class CacheMessageListener implements ExtendedMessageListener
+{
+ /**
+ * Reference to an exception that was raised during
+ * state installation on this node.
+ */
+ protected volatile Exception setStateException;
+ private final Object stateLock = new Object();
+ private Log log = LogFactory.getLog(CacheMessageListener.class);
+ private StateTransferManager stateTransferManager;
+ private Configuration configuration;
+ /**
+ * True if state was initialized during start-up.
+ */
+ private volatile boolean isStateSet = false;
+
+ @Inject
+ private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
+ {
+ this.stateTransferManager = stateTransferManager;
+ this.configuration = configuration;
+ }
+
+ public boolean isStateSet()
+ {
+ return isStateSet;
+ }
+
+ public void setStateSet(boolean stateSet)
+ {
+ isStateSet = stateSet;
+ }
+
+ public void waitForState() throws Exception
+ {
+ synchronized (stateLock)
+ {
+ while (!isStateSet)
+ {
+ if (setStateException != null)
+ {
+ throw setStateException;
+ }
+
+ try
+ {
+ stateLock.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ }
+ }
+
+ protected void stateReceivedSuccess()
+ {
+ isStateSet = true;
+ setStateException = null;
+ }
+
+ protected void stateReceivingFailed(Throwable t)
+ {
+ if (t instanceof CacheException)
+ {
+ log.debug(t);
+ }
+ else
+ {
+ log.error("failed setting state", t);
+ }
+ if (t instanceof Exception)
+ {
+ setStateException = (Exception) t;
+ }
+ else
+ {
+ setStateException = new Exception(t);
+ }
+ }
+
+ protected void stateProducingFailed(Throwable t)
+ {
+ if (t instanceof CacheException)
+ {
+ log.debug(t);
+ }
+ else
+ {
+ log.error("Caught " + t.getClass().getName()
+ + " while responding to state transfer request", t);
+ }
+ }
+
+ /**
+ * Callback, does nothing.
+ */
+ public void receive(Message msg)
+ {
+ }
+
+ public byte[] getState()
+ {
+ MarshalledValueOutputStream out = null;
+ byte[] result = null;
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ try
+ {
+ out = new MarshalledValueOutputStream(baos);
+
+ stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ result = baos.getRawBuffer();
+ Util.close(out);
+ }
+ return result;
+ }
+
+ public void setState(byte[] new_state)
+ {
+ if (new_state == null)
+ {
+ log.debug("transferred state is null (may be first member in cluster)");
+ return;
+ }
+ ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
+ MarshalledValueInputStream in = null;
+ try
+ {
+ in = new MarshalledValueInputStream(bais);
+ stateTransferManager.setState(in, Fqn.ROOT);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public byte[] getState(String state_id)
+ {
+ MarshalledValueOutputStream out = null;
+ String sourceRoot = state_id;
+ byte[] result = null;
+
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+ }
+
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ try
+ {
+ out = new MarshalledValueOutputStream(baos);
+
+ stateTransferManager.getState(out, Fqn.fromString(sourceRoot),
+ configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ result = baos.getRawBuffer();
+ Util.close(out);
+ }
+ return result;
+ }
+
+ public void getState(OutputStream ostream)
+ {
+ MarshalledValueOutputStream out = null;
+ try
+ {
+ out = new MarshalledValueOutputStream(ostream);
+ stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ Util.close(out);
+ }
+ }
+
+ public void getState(String state_id, OutputStream ostream)
+ {
+ String sourceRoot = state_id;
+ MarshalledValueOutputStream out = null;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ sourceRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+ }
+ try
+ {
+ out = new MarshalledValueOutputStream(ostream);
+ stateTransferManager.getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ Util.close(out);
+ }
+ }
+
+ public void setState(InputStream istream)
+ {
+ if (istream == null)
+ {
+ log.debug("stream is null (may be first member in cluster)");
+ return;
+ }
+ MarshalledValueInputStream in = null;
+ try
+ {
+ in = new MarshalledValueInputStream(istream);
+ stateTransferManager.setState(in, Fqn.ROOT);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public void setState(String state_id, byte[] state)
+ {
+ if (state == null)
+ {
+ log.debug("partial transferred state is null");
+ return;
+ }
+
+ MarshalledValueInputStream in = null;
+ String targetRoot = state_id;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+ }
+ try
+ {
+ log.debug("Setting received partial state for subroot " + state_id);
+ Fqn subroot = Fqn.fromString(targetRoot);
+// Region region = regionManager.getRegion(subroot, false);
+// ClassLoader cl = null;
+// if (region != null)
+// {
+// // If a classloader is registered for the node's region, use it
+// cl = region.getClassLoader();
+// }
+ ByteArrayInputStream bais = new ByteArrayInputStream(state);
+ in = new MarshalledValueInputStream(bais);
+ //getStateTransferManager().setState(in, subroot, cl);
+ stateTransferManager.setState(in, subroot);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public void setState(String state_id, InputStream istream)
+ {
+ String targetRoot = state_id;
+ MarshalledValueInputStream in = null;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(StateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = state_id.split(StateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+ }
+ if (istream == null)
+ {
+ log.debug("stream is null (may be first member in cluster). State is not set");
+ return;
+ }
+
+ try
+ {
+ log.debug("Setting received partial state for subroot " + state_id);
+ in = new MarshalledValueInputStream(istream);
+ Fqn subroot = Fqn.fromString(targetRoot);
+// Region region = regionManager.getRegion(subroot, false);
+// ClassLoader cl = null;
+// if (region != null)
+// {
+// // If a classloader is registered for the node's region, use it
+// cl = region.getClassLoader();
+// }
+ //getStateTransferManager().setState(in, subroot, cl);
+ stateTransferManager.setState(in, subroot);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+}
Modified: core/trunk/src/main/java/org/jboss/cache/util/BeanUtils.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/BeanUtils.java 2007-12-11 17:25:09 UTC (rev 4832)
+++ core/trunk/src/main/java/org/jboss/cache/util/BeanUtils.java 2007-12-11 17:26:38 UTC (rev 4833)
@@ -15,8 +15,10 @@
public class BeanUtils
{
private static Log log = LogFactory.getLog(BeanUtils.class);
+
/**
* Retrieves a setter name based on a field name passed in
+ *
* @param fieldName field name to find setter for
* @return name of setter method
*/
@@ -36,6 +38,7 @@
/**
* Returns a getter for a given class
+ *
* @param componentClass class to find getter for
* @return name of getter method
*/
@@ -47,8 +50,23 @@
}
/**
+ * Returns a setter for a given class
+ *
+ * @param componentClass class to find setter for
+ * @return name of getter method
+ */
+ public static String setterName(Class componentClass)
+ {
+ StringBuilder sb = new StringBuilder("set");
+ sb.append(componentClass.getSimpleName());
+ return sb.toString();
+ }
+
+
+ /**
* Returns a Method object corresponding to a getter that retrieves an instance of componentClass from target.
- * @param target class that the getter should exist on
+ *
+ * @param target class that the getter should exist on
* @param componentClass component to get
* @return Method object, or null of one does not exist
*/
@@ -60,8 +78,30 @@
}
catch (NoSuchMethodException e)
{
- log.trace("Unable to find method " + getterName(componentClass) + " in class " + target);
+ if (log.isTraceEnabled())
+ log.trace("Unable to find method " + getterName(componentClass) + " in class " + target);
return null;
}
}
+
+ /**
+ * Returns a Method object corresponding to a setter that sets an instance of componentClass from target.
+ *
+ * @param target class that the setter should exist on
+ * @param componentClass component to set
+ * @return Method object, or null of one does not exist
+ */
+ public static Method setterMethod(Class target, Class componentClass)
+ {
+ try
+ {
+ return target.getMethod(setterName(componentClass), componentClass);
+ }
+ catch (NoSuchMethodException e)
+ {
+ if (log.isTraceEnabled())
+ log.trace("Unable to find method " + setterName(componentClass) + " in class " + target);
+ return null;
+ }
+ }
}
More information about the jbosscache-commits
mailing list