JBoss Cache SVN: r6451 - core/trunk/src/main/java/org/jboss/cache/util.
by jbosscache-commits@lists.jboss.org
Author: jason.greene(a)jboss.com
Date: 2008-07-30 19:52:09 -0400 (Wed, 30 Jul 2008)
New Revision: 6451
Modified:
core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java
Log:
Better optimized copy constructor
Modified: core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java 2008-07-30 23:46:45 UTC (rev 6450)
+++ core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java 2008-07-30 23:52:09 UTC (rev 6451)
@@ -110,22 +110,26 @@
init(initialCapacity, loadFactor);
}
+ @SuppressWarnings("unchecked")
public FastCopyHashMap(Map<? extends K, ? extends V> map)
{
- this(map.size());
- putAll(map);
+ if (map instanceof FastCopyHashMap)
+ {
+ FastCopyHashMap<? extends K, ? extends V> fast = (FastCopyHashMap<? extends K, ? extends V>) map;
+ this.table = (Entry<K, V>[])fast.table.clone();
+ this.loadFactor = fast.loadFactor;
+ this.size = fast.size;
+ this.threshold = fast.threshold;
+ }
+ else
+ {
+ this.loadFactor = DEFAULT_LOAD_FACTOR;
+ init(map.size(), this.loadFactor);
+ putAll(map);
+ }
}
@SuppressWarnings("unchecked")
- public FastCopyHashMap(FastCopyHashMap<? extends K, ? extends V> map)
- {
- this.table = (Entry<K, V>[])map.table.clone();
- this.loadFactor = map.loadFactor;
- this.size = map.size;
- this.threshold = map.threshold;
- }
-
- @SuppressWarnings("unchecked")
private void init(int initialCapacity, float loadFactor)
{
int c = 1;
16 years, 4 months
JBoss Cache SVN: r6450 - core/trunk/src/main/java/org/jboss/cache/util.
by jbosscache-commits@lists.jboss.org
Author: jason.greene(a)jboss.com
Date: 2008-07-30 19:46:45 -0400 (Wed, 30 Jul 2008)
New Revision: 6450
Modified:
core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java
Log:
Fix null handling for iterators/views
Add optimized copy constructor
Modified: core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java 2008-07-30 22:58:03 UTC (rev 6449)
+++ core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java 2008-07-30 23:46:45 UTC (rev 6450)
@@ -40,8 +40,14 @@
*/
public class FastCopyHashMap<K, V> extends AbstractMap<K, V> implements Map<K, V>, Cloneable, Serializable
{
+ /**
+ * Marks null keys.
+ */
+ private static final Object NULL = new Object();
- private static final Object NULL = new Object();
+ /**
+ * Serialization ID
+ */
private static final long serialVersionUID = 10929568968762L;
/**
@@ -80,7 +86,7 @@
private final float loadFactor;
/**
- * Counter used to detech changes made outside of an iterator
+ * Counter used to detect changes made outside of an iterator
*/
private transient int modCount;
@@ -104,13 +110,22 @@
init(initialCapacity, loadFactor);
}
- public FastCopyHashMap(Map<K, V> data)
+ public FastCopyHashMap(Map<? extends K, ? extends V> map)
{
- this();
- putAll(data);
+ this(map.size());
+ putAll(map);
}
@SuppressWarnings("unchecked")
+ public FastCopyHashMap(FastCopyHashMap<? extends K, ? extends V> map)
+ {
+ this.table = (Entry<K, V>[])map.table.clone();
+ this.loadFactor = map.loadFactor;
+ this.size = map.size;
+ this.threshold = map.threshold;
+ }
+
+ @SuppressWarnings("unchecked")
private void init(int initialCapacity, float loadFactor)
{
int c = 1;
@@ -139,6 +154,17 @@
return h ^ (h >>> 7) ^ (h >>> 4);
}
+ @SuppressWarnings("unchecked")
+ private static final <K> K maskNull(K key)
+ {
+ return key == null ? (K) NULL : key;
+ }
+
+ private static final <K> K unmaskNull(K key)
+ {
+ return key == NULL ? null : key;
+ }
+
private int nextIndex(int index, int length)
{
index = (index >= length - 1) ? 0 : index + 1;
@@ -167,7 +193,7 @@
public V get(Object key)
{
- if (key == null) key = NULL;
+ key = maskNull(key);
int hash = hash(key);
int length = table.length;
@@ -188,7 +214,7 @@
public boolean containsKey(Object key)
{
- if (key == null) key = NULL;
+ key = maskNull(key);
int hash = hash(key);
int length = table.length;
@@ -216,12 +242,10 @@
return false;
}
- @SuppressWarnings("unchecked")
public V put(K key, V value)
{
- if (key == null) key = (K) NULL;
+ key = maskNull(key);
-
Entry<K, V>[] table = this.table;
int hash = hash(key);
int length = table.length;
@@ -306,7 +330,7 @@
public V remove(Object key)
{
- if (key == null) key = NULL;
+ key = maskNull(key);
Entry<K, V>[] table = this.table;
int length = table.length;
@@ -424,7 +448,6 @@
System.out.println(" Optimal: " + optimal + " (" + (float) optimal * 100 / total + "%)");
System.out.println(" Average Distnce: " + ((float) totalSkew / (total - optimal)));
System.out.println(" Max Distance: " + maxSkew);
-
}
public Set<Map.Entry<K, V>> entrySet()
@@ -471,7 +494,7 @@
@SuppressWarnings("unchecked")
private void putForCreate(K key, V value)
{
- if (key == null) key = (K) NULL;
+ key = maskNull(key);
Entry<K, V>[] table = this.table;
int hash = hash(key);
@@ -493,11 +516,11 @@
s.defaultWriteObject();
s.writeInt(size);
- for (Entry e : table)
+ for (Entry<K,V> e : table)
{
if (e != null)
{
- s.writeObject(e.key);
+ s.writeObject(unmaskNull(e.key));
s.writeObject(e.value);
}
}
@@ -631,7 +654,7 @@
{
public K next()
{
- return nextEntry().key;
+ return unmaskNull(nextEntry().key);
}
}
@@ -664,7 +687,7 @@
public Map.Entry<K, V> next()
{
Entry<K, V> e = nextEntry();
- return new WriteThroughEntry(e.key, e.value);
+ return new WriteThroughEntry(unmaskNull(e.key), e.value);
}
}
16 years, 4 months
JBoss Cache SVN: r6449 - core/trunk/src/main/java/org/jboss/cache/util.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-07-30 18:58:03 -0400 (Wed, 30 Jul 2008)
New Revision: 6449
Modified:
core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java
Log:
Changed default load factor
Modified: core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java 2008-07-30 21:33:43 UTC (rev 6448)
+++ core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java 2008-07-30 22:58:03 UTC (rev 6449)
@@ -57,7 +57,7 @@
/**
* 67%, just like IdentityHashMap
*/
- private static final float DEFAULT_LOAD_FACTOR = 0.5f;
+ private static final float DEFAULT_LOAD_FACTOR = 0.67f;
/**
* The open-addressed table
16 years, 4 months
JBoss Cache SVN: r6448 - core/trunk/src/main/java/org/jboss/cache/util.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-07-30 17:33:43 -0400 (Wed, 30 Jul 2008)
New Revision: 6448
Modified:
core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java
Log:
Changed default load factor
Modified: core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java 2008-07-30 21:20:36 UTC (rev 6447)
+++ core/trunk/src/main/java/org/jboss/cache/util/FastCopyHashMap.java 2008-07-30 21:33:43 UTC (rev 6448)
@@ -57,7 +57,7 @@
/**
* 67%, just like IdentityHashMap
*/
- private static final float DEFAULT_LOAD_FACTOR = 0.67f;
+ private static final float DEFAULT_LOAD_FACTOR = 0.5f;
/**
* The open-addressed table
16 years, 4 months
JBoss Cache SVN: r6447 - benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1/src/org/cachebench/cachewrappers.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-07-30 17:20:36 -0400 (Wed, 30 Jul 2008)
New Revision: 6447
Modified:
benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1/src/org/cachebench/cachewrappers/Coherence331Wrapper.java
Log:
Added explicit locking for coherence
Modified: benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1/src/org/cachebench/cachewrappers/Coherence331Wrapper.java
===================================================================
--- benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1/src/org/cachebench/cachewrappers/Coherence331Wrapper.java 2008-07-30 17:13:20 UTC (rev 6446)
+++ benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1/src/org/cachebench/cachewrappers/Coherence331Wrapper.java 2008-07-30 21:20:36 UTC (rev 6447)
@@ -2,6 +2,7 @@
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
+import com.tangosol.util.TransactionMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cachebench.CacheWrapper;
@@ -18,7 +19,8 @@
public class Coherence331Wrapper implements CacheWrapper
{
- private NamedCache cache;
+ private TransactionMap cache;
+ private NamedCache nc;
boolean localmode;
Map parameters;
private Log log = LogFactory.getLog(Coherence331Wrapper.class);
@@ -35,43 +37,59 @@
localmode = (Boolean.parseBoolean((String) parameters.get("localOnly")));
if (trimmedConfig.indexOf("repl") == 0)
{
- cache = CacheFactory.getCache("repl-CacheBenchmarkFramework");
+ nc = CacheFactory.getCache("repl-CacheBenchmarkFramework");
}
else if (trimmedConfig.indexOf("dist") == 0)
{
- cache = CacheFactory.getCache("dist-CacheBenchmarkFramework");
+ nc = CacheFactory.getCache("dist-CacheBenchmarkFramework");
}
else if (trimmedConfig.indexOf("local") == 0)
{
- cache = CacheFactory.getCache("local-CacheBenchmarkFramework");
+ nc = CacheFactory.getCache("local-CacheBenchmarkFramework");
}
else if (trimmedConfig.indexOf("opt") == 0)
{
- cache = CacheFactory.getCache("opt-CacheBenchmarkFramework");
+ nc = CacheFactory.getCache("opt-CacheBenchmarkFramework");
}
else if (trimmedConfig.indexOf("near") == 0)
{
- cache = CacheFactory.getCache("near-CacheBenchmarkFramework");
+ nc = CacheFactory.getCache("near-CacheBenchmarkFramework");
}
else
throw new RuntimeException("Invalid configuration ('" + trimmedConfig + "'). Configuration name should start with: 'dist', 'repl', 'local', 'opt' or 'near'");
- log.info("Starting Coherence cache " + cache.getCacheName());
+ cache = CacheFactory.getLocalTransaction(nc);
+ log.info("Starting Coherence cache " + nc.getCacheName());
}
public void tearDown() throws Exception
{
- if (cache != null) cache.release();
+ if (cache != null) nc.release();
}
public void put(List<String> path, Object key, Object value) throws Exception
{
- cache.put(pathAsString(path, key), value);
+ cache.lock(key);
+ try
+ {
+ cache.put(pathAsString(path, key), value);
+ }
+ finally
+ {
+ cache.unlock(key);
+ }
}
public Object get(List<String> path, Object key) throws Exception
{
- return cache.get(pathAsString(path, key));
+ try
+ {
+ return cache.get(pathAsString(path, key));
+ }
+ finally
+ {
+ cache.unlock(key);
+ }
}
public void empty() throws Exception
@@ -81,12 +99,12 @@
public int getNumMembers()
{
- return localmode ? 0 : cache.getCacheService().getCluster().getMemberSet().size();
+ return localmode ? 0 : nc.getCacheService().getCluster().getMemberSet().size();
}
public String getInfo()
{
- return cache.getCacheName();
+ return nc.getCacheName();
}
public Object getReplicatedData(List<String> path, String key) throws Exception
16 years, 4 months
JBoss Cache SVN: r6446 - in core/trunk/src: main/java/org/jboss/cache/loader and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-07-30 13:13:20 -0400 (Wed, 30 Jul 2008)
New Revision: 6446
Modified:
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java
Log:
Fixed funky state transfer brittleness
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -51,6 +51,7 @@
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -297,7 +298,10 @@
{
// need to get the root DIRECTLY. cache.getRoot() will pass a call up the interceptor chain and we will
// have a problem with the cache not being started.
- dataContainer.getRoot().addChildDirect(BUDDY_BACKUP_SUBTREE_FQN);
+
+ cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
+ cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache.put(BUDDY_BACKUP_SUBTREE_FQN, (Map) Collections.emptyMap());
}
// allow waiting threads to process.
Modified: core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -85,6 +85,7 @@
if (objectFromStream instanceof NodeDataMarker)
{
// no persistent state sent across; return?
+ if (trace) log.trace("Empty persistent stream?");
return;
}
if (objectFromStream instanceof NodeDataExceptionMarker)
@@ -113,7 +114,11 @@
for (Object aNodeData : nodeData)
{
NodeData nd = (NodeData) aNodeData;
- if (nd.isMarker()) break;
+ if (nd.isMarker())
+ {
+ if (trace) log.trace("Reached delimiter; exiting loop");
+ break;
+ }
Fqn fqn;
if (moveToBuddy)
{
@@ -123,7 +128,7 @@
{
fqn = nd.getFqn();
}
-
+ if (trace) log.trace("Storing state in Fqn " + fqn);
if (nd.getAttributes() != null)
{
this.put(fqn, nd.getAttributes(), true);// creates a node with 0 or more attributes
Modified: core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -451,6 +451,7 @@
{
throw new CacheException("Unable to start cache loaders", e);
}
+ fetchPersistentState |= loader.getConfig().isFetchPersistentState();
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -35,7 +35,8 @@
*/
protected volatile Exception setStateException;
private final Object stateLock = new Object();
- private Log log = LogFactory.getLog(ChannelMessageListener.class);
+ private static final Log log = LogFactory.getLog(ChannelMessageListener.class);
+ private static final boolean trace = log.isTraceEnabled();
private StateTransferManager stateTransferManager;
private Configuration configuration;
/**
@@ -43,6 +44,7 @@
*/
private volatile boolean isStateSet = false;
+
@Inject
private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
{
@@ -183,6 +185,7 @@
public byte[] getState(String state_id)
{
+ if (trace) log.trace("Getting state for state id " + state_id);
MarshalledValueOutputStream out = null;
String sourceRoot = state_id;
byte[] result;
@@ -233,6 +236,7 @@
public void getState(String state_id, OutputStream ostream)
{
+ if (trace) log.trace("Getting state for state id " + state_id);
String sourceRoot = state_id;
MarshalledValueOutputStream out = null;
boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
@@ -286,6 +290,7 @@
public void setState(String state_id, byte[] state)
{
+ if (trace) log.trace("Receiving state for " + state_id);
if (state == null)
{
log.debug("partial transferred state is null");
@@ -333,7 +338,7 @@
public void setState(String stateId, InputStream istream)
{
- if (log.isTraceEnabled()) log.trace("**** Receiving state for " + stateId);
+ if (trace) log.trace("Receiving state for " + stateId);
String targetRoot = stateId;
MarshalledValueInputStream in = null;
boolean hasDifferentSourceAndIntegrationRoots = stateId.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -43,7 +43,7 @@
this.cache = cache;
}
- @Start(priority = 18)
+ @Start(priority = 14)
private void start()
{
this.internalFqns = cache.getInternalFqns();
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -39,7 +39,8 @@
public class DefaultStateTransferIntegrator implements StateTransferIntegrator
{
- protected Log log = LogFactory.getLog(getClass().getName());
+ private static final Log log = LogFactory.getLog(DefaultStateTransferIntegrator.class);
+ private static final boolean trace = log.isTraceEnabled();
private CacheSPI cache;
@@ -47,6 +48,7 @@
private Set<Fqn> internalFqns;
+
@Inject
public void inject(CacheSPI<?, ?> cache, NodeFactory nodefactory)
{
@@ -60,14 +62,18 @@
this.internalFqns = cache.getInternalFqns();
}
- public void integrateState(ObjectInputStream ois, Object target, Fqn targetRoot) throws Exception
+ public void integrateState(ObjectInputStream ois, Object target, Fqn targetRoot, boolean integratePersistentState) throws Exception
{
// pop version from the stream first!
short version = (Short) cache.getMarshaller().objectFromObjectStream(ois);
log.info("Using version " + version);
integrateTransientState(ois, (InternalNode) target);
- integrateAssociatedState(ois);
- integratePersistentState(ois, targetRoot);
+ if (trace) log.trace("Reading marker for nonexistent associated state");
+ cache.getMarshaller().objectFromObjectStream(ois);
+ if (integratePersistentState)
+ {
+ integratePersistentState(ois, targetRoot);
+ }
}
protected void integrateTransientState(ObjectInputStream in, InternalNode target) throws Exception
@@ -75,7 +81,7 @@
boolean transientSet = false;
try
{
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("integrating transient state for " + target);
}
@@ -84,7 +90,7 @@
transientSet = true;
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("transient state successfully integrated");
}
@@ -105,26 +111,13 @@
}
}
- /**
- * Provided for subclasses that deal with associated state.
- *
- * @throws Exception
- */
- protected void integrateAssociatedState(ObjectInputStream in) throws Exception
- {
- // no-op in this base class
- // just read marker
- cache.getMarshaller().objectFromObjectStream(in);
- }
-
protected void integratePersistentState(ObjectInputStream in, Fqn targetFqn) throws Exception
{
-
CacheLoaderManager loaderManager = cache.getCacheLoaderManager();
CacheLoader loader = loaderManager == null ? null : loaderManager.getCacheLoader();
if (loader == null)
{
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("cache loader is null, will not attempt to integrate persistent state");
}
@@ -132,7 +125,7 @@
else
{
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("integrating persistent state using " + loader.getClass().getName());
}
@@ -165,7 +158,7 @@
}
else
{
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("persistent state integrated successfully");
}
@@ -234,7 +227,7 @@
}
// read marker off stack
- cache.getMarshaller().objectFromObjectStream(in);
+// cache.getMarshaller().objectFromObjectStream(in);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -12,7 +12,6 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InternalNode;
-import org.jboss.cache.NodeFactory;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.RegionEmptyException;
import org.jboss.cache.RegionManager;
@@ -34,6 +33,7 @@
public class DefaultStateTransferManager implements StateTransferManager
{
protected final static Log log = LogFactory.getLog(DefaultStateTransferManager.class);
+ protected static final boolean trace = log.isTraceEnabled();
public static final NodeData STREAMING_DELIMITER_NODE = new NodeDataMarker();
@@ -47,25 +47,23 @@
boolean fetchTransientState;
boolean fetchPersistentState;
protected long stateRetrievalTimeout;
- private NodeFactory nodeFactory;
protected StateTransferIntegrator integrator;
protected StateTransferGenerator generator;
@Inject
- public void injectDependencies(CacheSPI cache, Marshaller marshaller, RegionManager regionManager, Configuration configuration, CacheLoaderManager cacheLoaderManager, NodeFactory nodeFactory, StateTransferIntegrator integrator, StateTransferGenerator generator)
+ public void injectDependencies(CacheSPI cache, Marshaller marshaller, RegionManager regionManager, Configuration configuration, CacheLoaderManager cacheLoaderManager, StateTransferIntegrator integrator, StateTransferGenerator generator)
{
this.cache = cache;
this.regionManager = regionManager;
this.marshaller = marshaller;
this.configuration = configuration;
this.cacheLoaderManager = cacheLoaderManager;
- this.nodeFactory = nodeFactory;
this.integrator = integrator;
this.generator = generator;
}
- @Start(priority = 19)
+ @Start(priority = 14)
public void start()
{
fetchTransientState = configuration.isFetchInMemoryState();
@@ -77,19 +75,18 @@
{
// can't give state for regions currently being activated/inactivated
boolean canProvideState = (!regionManager.isInactive(fqn) && cache.peek(fqn, false) != null);
-
+ if (trace) log.trace("Can provide state? " + canProvideState);
if (canProvideState && (fetchPersistentState || fetchTransientState))
{
marshaller.objectToObjectStream(true, out);
long startTime = System.currentTimeMillis();
- InternalNode rootNode = cache.getRoot().getDelegationTarget();
+ InternalNode subtreeRoot = fqn.isRoot() ? cache.getRoot().getDelegationTarget() : cache.getNode(fqn).getDelegationTarget();
// we don't need READ locks for MVCC based state transfer!
-
if (log.isDebugEnabled())
- log.debug("locking the " + fqn + " subtree to return the in-memory (transient) state");
+ log.debug("Generating in-memory (transient) state for subtree " + fqn);
- generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
+ generator.generateState(out, subtreeRoot, fetchTransientState, fetchPersistentState, suppressErrors);
if (log.isDebugEnabled())
log.debug("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
@@ -175,8 +172,9 @@
* to be directly stored into a tree since we bypass interceptor chain.
*
*/
- if (log.isDebugEnabled()) log.debug("starting state integration at node " + targetRoot);
- integrator.integrateState(state, targetRoot.getDelegationTarget(), targetRoot.getFqn());
+ if (log.isDebugEnabled())
+ log.debug("starting state integration at node " + targetRoot + ". Fetch Persistent State = " + fetchPersistentState);
+ integrator.integrateState(state, targetRoot.getDelegationTarget(), targetRoot.getFqn(), fetchPersistentState);
if (log.isDebugEnabled())
log.debug("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -63,7 +63,7 @@
this.internalFqns = cache.getInternalFqns();
}
- public void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn) throws Exception
+ public void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn, boolean integratePersistentState) throws Exception
{
// pop version from the stream first!
short version = (Short) cache.getMarshaller().objectFromObjectStream(ois);
@@ -72,7 +72,7 @@
// read another marker for the dummy associated state
if (trace) log.trace("Reading marker for nonexistent associated state");
cache.getMarshaller().objectFromObjectStream(ois);
- integratePersistentState(ois, targetFqn);
+ if (integratePersistentState) integratePersistentState(ois, targetFqn);
}
protected void integrateTransientState(ObjectInputStream in, NodeSPI target) throws Exception
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -121,7 +121,7 @@
{
log.debug("starting state integration at node " + targetRoot);
}
- integrator.integrateState(state, targetRoot, targetRoot.getFqn());
+ integrator.integrateState(state, targetRoot, targetRoot.getFqn(), fetchPersistentState);
if (log.isDebugEnabled())
{
log.debug("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -16,5 +16,5 @@
public interface StateTransferIntegrator
{
// TODO: target is an Object to support both InternalNodes and NodeSPIs.
- void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn) throws Exception;
+ void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn, boolean integratePersistentState) throws Exception;
}
\ No newline at end of file
Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -11,12 +11,12 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
-import org.jboss.cache.util.TestingUtil;
import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.config.BuddyReplicationConfig;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.marshall.MarshalledValue;
+import org.jboss.cache.util.TestingUtil;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.Test;
@@ -407,7 +407,7 @@
public void testStalePersistentState() throws Exception
{
- CacheSPI c1 = createCache("1", true, false, true, false);
+ CacheSPI c1 = createCache("1", true, false, true, true);
c1.put(A, "K", "V");
assert c1.get(A, "K").equals("V");
@@ -423,10 +423,12 @@
assert l1.exists(A);
assert l1.get(A).get("K").equals("V");
- Cache c2 = createCache("2", true, false, true, false);
+ Cache c2 = createCache("2", true, false, true, true);
c2.put(B, "K", "V");
+ assert c1.getConfiguration().isFetchInMemoryState();
+ assert c1.getConfiguration().getCacheLoaderConfig().isFetchPersistentState();
c1.start();
assert c1.get(B, "K").equals("V");
Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java 2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java 2008-07-30 17:13:20 UTC (rev 6446)
@@ -269,17 +269,12 @@
cache1.put(A_C, "name", BOB);
cache1.put(A_C, "age", FORTY);
- CacheSPI<Object, Object> cache2 = createCache("cache2", false, false, cacheLoaderClass2, asyncLoader, false, true);
+ CacheSPI<Object, Object> cache2 = createCache("cache2", false, false, cacheLoaderClass2, asyncLoader, true, true);
- cache2.start();
-
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
- if (asyncLoader)
- {
- TestingUtil.sleepThread((long) 100);
- }
+ if (asyncLoader) TestingUtil.sleepThread(100);
CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
16 years, 4 months
JBoss Cache SVN: r6445 - core/trunk/src/main/java/org/jboss/cache.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-07-30 12:06:38 -0400 (Wed, 30 Jul 2008)
New Revision: 6445
Modified:
core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
Log:
Removed excessive debug
Modified: core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2008-07-30 15:45:51 UTC (rev 6444)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2008-07-30 16:06:38 UTC (rev 6445)
@@ -70,12 +70,10 @@
protected final void lock(Fqn fqn)
{
regionLocks.getLock(fqn).lock();
- if (trace) log.trace("LOCKED " + fqn, new Throwable());
}
protected final void unlock(Fqn fqn)
{
- if (trace) log.trace("UNLOCKED " + fqn, new Throwable());
regionLocks.getLock(fqn).unlock();
}
16 years, 4 months
JBoss Cache SVN: r6444 - core/trunk/src/main/java/org/jboss/cache.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-07-30 11:45:51 -0400 (Wed, 30 Jul 2008)
New Revision: 6444
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
Log:
Catch exceptions on state retrieval and retry
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-07-30 15:45:12 UTC (rev 6443)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-07-30 15:45:51 UTC (rev 6444)
@@ -518,24 +518,33 @@
boolean successfulTransfer = false;
for (Address target : targets)
{
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
- messageListener.setStateSet(false);
- successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
- if (successfulTransfer)
+ try
{
- try
+ if (log.isDebugEnabled())
+ log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
+ messageListener.setStateSet(false);
+ successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
+ if (successfulTransfer)
{
- messageListener.waitForState();
+ try
+ {
+ messageListener.waitForState();
+ }
+ catch (Exception transferFailed)
+ {
+ successfulTransfer = false;
+ }
}
- catch (Exception transferFailed)
- {
- successfulTransfer = false;
- }
+ if (log.isDebugEnabled())
+ log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
+ if (successfulTransfer) break;
}
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
- if (successfulTransfer) break;
+ catch (IllegalStateException ise)
+ {
+ // thrown by the JGroups channel if state retrieval fails.
+ if (log.isInfoEnabled())
+ log.info("Channel problems fetching state. Continuing on to next provider. ", ise);
+ }
}
if (!successfulTransfer)
@@ -543,6 +552,7 @@
if (log.isDebugEnabled())
log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
}
+
}
// ------------ END: Partial state transfer methods ------------
16 years, 4 months
JBoss Cache SVN: r6443 - core/trunk/src/main/java/org/jboss/cache.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-07-30 11:45:12 -0400 (Wed, 30 Jul 2008)
New Revision: 6443
Modified:
core/trunk/src/main/java/org/jboss/cache/LegacyRegionManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
Log:
Use StripedLock instead of a Set to keep track of concurrent activations and deactivations!!!
cached log trace level
Modified: core/trunk/src/main/java/org/jboss/cache/LegacyRegionManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/LegacyRegionManagerImpl.java 2008-07-30 15:43:05 UTC (rev 6442)
+++ core/trunk/src/main/java/org/jboss/cache/LegacyRegionManagerImpl.java 2008-07-30 15:45:12 UTC (rev 6443)
@@ -32,14 +32,6 @@
@Override
protected void inactivateRegion(Fqn fqn) throws CacheException
{
- if (isActivatingDeactivating(fqn))
- {
-// throw new CacheException("Region " + fqn + " is already being activated/deactivated");
- log.warn("Region " + fqn + " is already being activated/deactivated");
- return;
- }
-
-
NodeSPI parent = null;
NodeSPI subtreeRoot = null;
boolean parentLocked = false;
@@ -48,7 +40,7 @@
try
{
// Record that this fqn is in status change, so can't provide state
- activationChangeNodes.add(fqn);
+ lock(fqn);
if (!isInactive(fqn))
{
@@ -142,7 +134,7 @@
}
}
- activationChangeNodes.remove(fqn);
+ unlock(fqn);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2008-07-30 15:43:05 UTC (rev 6442)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2008-07-30 15:45:12 UTC (rev 6443)
@@ -19,11 +19,12 @@
import org.jboss.cache.invocation.InvocationContext;
import org.jboss.cache.lock.LockManager;
import static org.jboss.cache.lock.LockType.WRITE;
+import org.jboss.cache.util.concurrent.locks.LockContainer;
+import org.jboss.cache.util.concurrent.locks.ReentrantLockContainer;
import org.jgroups.Address;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -45,19 +46,39 @@
private RegionRegistry regionsRegistry;
private boolean defaultInactive;
- protected final Log log = LogFactory.getLog(RegionManagerImpl.class);
+ protected static final Log log = LogFactory.getLog(RegionManagerImpl.class);
+ protected static final boolean trace = log.isTraceEnabled();
CacheSPI<?, ?> cache;
private boolean usingEvictions;
private EvictionConfig evictionConfig;
private final EvictionTimerTask evictionTimerTask = new EvictionTimerTask();
- protected final Set<Fqn> activationChangeNodes = Collections.synchronizedSet(new HashSet<Fqn>());
+ private final LockContainer<Fqn> regionLocks = new ReentrantLockContainer<Fqn>(4);
protected Configuration configuration;
protected RPCManager rpcManager;
protected LockManager lockManager;
protected BuddyFqnTransformer buddyFqnTransformer;
private boolean isUsingBR;
+ // -------- region lock helpers
+
+ protected final boolean isRegionLocked(Fqn fqn)
+ {
+ return regionLocks.isLocked(fqn);
+ }
+
+ protected final void lock(Fqn fqn)
+ {
+ regionLocks.getLock(fqn).lock();
+ if (trace) log.trace("LOCKED " + fqn, new Throwable());
+ }
+
+ protected final void unlock(Fqn fqn)
+ {
+ if (trace) log.trace("UNLOCKED " + fqn, new Throwable());
+ regionLocks.getLock(fqn).unlock();
+ }
+
@Inject
public void injectDependencies(CacheSPI cache, Configuration configuration, RPCManager rpcManager, LockManager lockManager,
BuddyFqnTransformer transformer, RegionRegistry regionsRegistry)
@@ -109,7 +130,7 @@
protected void destroy()
{
regionsRegistry.clear();
- activationChangeNodes.clear();
+ regionLocks.reset();
}
public boolean isUsingEvictions()
@@ -169,7 +190,7 @@
fqn = buddyFqnTransformer.getActualFqn(fqn);
}
- if (log.isTraceEnabled()) log.trace("Contents of RegionsRegistry: " + regionsRegistry);
+ if (trace) log.trace("Contents of RegionsRegistry: " + regionsRegistry);
Fqn fqnToUse = fqn;
if (DEFAULT_REGION.equals(fqnToUse)) fqnToUse = Fqn.ROOT;
// first see if a region for this specific Fqn exists
@@ -210,7 +231,7 @@
if (regionsRegistry.containsKey(nextFqn))
{
Region r = regionsRegistry.get(nextFqn);
- if (log.isTraceEnabled()) log.trace("Trying next region " + nextFqn + " and got " + r);
+ if (trace) log.trace("Trying next region " + nextFqn + " and got " + r);
// this is a very poor way of telling whether a region is a marshalling one or an eviction one. :-(
// mandates that class loaders be registered for marshalling regions.
@@ -271,7 +292,7 @@
{
try
{
- if (log.isTraceEnabled()) log.trace("Activating region " + fqn);
+ if (trace) log.trace("Activating region " + fqn);
Region r = getRegion(fqn, false);
if (r != null)
{
@@ -339,11 +360,6 @@
// Check whether the node already exists and has data
Node subtreeRoot = cache.getNode(fqn); // NOTE this used to be a peek!
- if (isActivatingDeactivating(fqn))
- {
- throw new CacheException("Region " + fqn + " is already being activated/deactivated");
- }
-
if (log.isDebugEnabled())
{
log.debug("activating " + fqn);
@@ -354,7 +370,7 @@
// Add this fqn to the set of those we are activating
// so calls to _getState for the fqn can return quickly
- activationChangeNodes.add(fqn);
+ lock(fqn);
BuddyManager buddyManager = cache.getBuddyManager();
// Request partial state from the cluster and integrate it
@@ -366,8 +382,9 @@
{
// We'll update this node with the state we receive
// need to obtain all necessary locks.
+ Node root = cache.getRoot();
cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
- subtreeRoot = cache.getRoot().addChild(fqn);
+ subtreeRoot = root.addChild(fqn);
cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
}
@@ -440,7 +457,7 @@
}
finally
{
- activationChangeNodes.remove(fqn);
+ unlock(fqn);
}
}
@@ -466,13 +483,6 @@
*/
protected void inactivateRegion(Fqn fqn) throws CacheException
{
- if (isActivatingDeactivating(fqn))
- {
- log.warn("Region " + fqn + " is already being activated/deactivated. Not doing anything.");
- return;
- }
-// throw new CacheException("Region " + fqn + " is already being activated/deactivated");
-
NodeSPI parent = null;
NodeSPI subtreeRoot = null;
boolean parentLocked = false;
@@ -483,7 +493,7 @@
try
{
// Record that this fqn is in status change, so can't provide state
- activationChangeNodes.add(fqn);
+ lock(fqn);
if (!isInactive(fqn))
{
@@ -579,24 +589,10 @@
// If necessary, release locks
if (ctx != null) lockManager.unlock(ctx);
- activationChangeNodes.remove(fqn);
+ unlock(fqn);
}
}
- /**
- * <p/>
- * This is legacy code and should not be called directly. This is a private method for now and will be refactored out.
- * You should be using {@link #activate(Fqn)} and {@link #deactivate(Fqn)}
- * <p/>
- *
- * @param fqn fqn of the region
- * @return true if the region defined by the fqn is in the process of activating/deactivating
- */
- protected boolean isActivatingDeactivating(Fqn fqn)
- {
- return activationChangeNodes.contains(fqn);
- }
-
public boolean hasRegion(Fqn fqn, Region.Type type)
{
Region r = regionsRegistry.get(fqn);
@@ -728,7 +724,7 @@
for (EvictionRegionConfig erc : ercs)
{
Fqn fqn = erc.getRegionFqn();
- if (log.isTraceEnabled()) log.trace("Creating eviction region " + fqn);
+ if (trace) log.trace("Creating eviction region " + fqn);
if (fqn.equals(DEFAULT_REGION))
{
@@ -736,7 +732,7 @@
{
throw new ConfigurationException("A default region for evictions has already been set for this cache");
}
- if (log.isTraceEnabled()) log.trace("Applying settings for " + DEFAULT_REGION + " to Fqn.ROOT");
+ if (trace) log.trace("Applying settings for " + DEFAULT_REGION + " to Fqn.ROOT");
fqn = Fqn.ROOT;
setDefault = true;
}
16 years, 4 months
JBoss Cache SVN: r6442 - in core/trunk/src: main/java/org/jboss/cache/util/concurrent/locks and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-07-30 11:43:05 -0400 (Wed, 30 Jul 2008)
New Revision: 6442
Added:
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java
Modified:
core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLock.java
core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockAssert.java
core/trunk/src/test/java/org/jboss/cache/lock/MVCCLockManagerRecordingTest.java
Log:
Exracted LockContainer to separate class for better reuse
Modified: core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java 2008-07-30 15:42:20 UTC (rev 6441)
+++ core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java 2008-07-30 15:43:05 UTC (rev 6442)
@@ -1,6 +1,5 @@
package org.jboss.cache.lock;
-import net.jcip.annotations.ThreadSafe;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheSPI;
@@ -14,10 +13,12 @@
import org.jboss.cache.invocation.InvocationContext;
import org.jboss.cache.invocation.InvocationContextContainer;
import static org.jboss.cache.lock.LockType.READ;
+import org.jboss.cache.util.concurrent.locks.LockContainer;
import org.jboss.cache.util.concurrent.locks.OwnableReentrantLock;
+import org.jboss.cache.util.concurrent.locks.OwnableReentrantLockContainer;
+import org.jboss.cache.util.concurrent.locks.ReentrantLockContainer;
import javax.transaction.TransactionManager;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -26,7 +27,6 @@
import java.util.Set;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
/**
* This lock manager acquires and releases locks based on the Fqn passed in and not on the node itself. The main benefit
@@ -48,7 +48,7 @@
*/
public class MVCCLockManager extends FqnLockManager
{
- LockContainer lockContainer;
+ LockContainer<Fqn> lockContainer;
DataContainer dataContainer;
private Set<Fqn> internalFqns;
private CacheSPI<?, ?> cache;
@@ -69,7 +69,7 @@
@Start
public void startLockManager()
{
- lockContainer = transactionManager == null ? new ReentrantLockContainer() : new OwnableReentrantLockContainer();
+ lockContainer = transactionManager == null ? new ReentrantLockContainer<Fqn>(configuration.getConcurrencyLevel()) : new OwnableReentrantLockContainer<Fqn>(configuration.getConcurrencyLevel(), invocationContextContainer);
}
@Start
@@ -281,150 +281,4 @@
{
return lockContainer.toString();
}
-
- @ThreadSafe
- public abstract class LockContainer
- {
- private final int lockSegmentMask;
- private final int lockSegmentShift;
-
- /**
- * Creates a new LockContainer which uses a certain number of shared locks across all elements that need to be locked.
- */
- LockContainer()
- {
- int tempLockSegShift = 0;
- int numLocks = 1;
- while (numLocks < configuration.getConcurrencyLevel())
- {
- ++tempLockSegShift;
- numLocks <<= 1;
- }
- lockSegmentShift = 32 - tempLockSegShift;
- lockSegmentMask = numLocks - 1;
-
- initLocks(numLocks);
- }
-
- final int hashToIndex(Fqn fqn)
- {
- return (hash(fqn) >>> lockSegmentShift) & lockSegmentMask;
- }
-
- /**
- * Returns a hash code for non-null Object x.
- * Uses the same hash code spreader as most other java.util hash tables, except that this uses the string representation
- * of the object passed in.
- *
- * @param fqn the object serving as a key
- * @return the hash code
- */
- final int hash(Fqn fqn)
- {
- int h = fqn.hashCode();
- h += ~(h << 9);
- h ^= (h >>> 14);
- h += (h << 4);
- h ^= (h >>> 10);
- return h;
- }
-
- abstract void initLocks(int numLocks);
-
- abstract boolean ownsLock(Fqn fqn, Object owner);
-
- abstract boolean isLocked(Fqn fqn);
-
- abstract Lock getLock(Fqn fqn);
-
- public abstract int getNumLocksHeld();
- }
-
- public class ReentrantLockContainer extends LockContainer
- {
- ReentrantLock[] sharedLocks;
-
- void initLocks(int numLocks)
- {
- sharedLocks = new ReentrantLock[numLocks];
- for (int i = 0; i < numLocks; i++) sharedLocks[i] = new ReentrantLock();
- }
-
- ReentrantLock getLock(Fqn fqn)
- {
- ReentrantLock l = sharedLocks[hashToIndex(fqn)];
- if (trace) log.trace("Found lock " + l + " for fqn " + fqn);
- return l;
- }
-
- public int getNumLocksHeld()
- {
- int i = 0;
- for (ReentrantLock l : sharedLocks) if (l.isLocked()) i++;
- return i;
- }
-
- boolean ownsLock(Fqn fqn, Object owner)
- {
- ReentrantLock lock = getLock(fqn);
- return lock.isHeldByCurrentThread();
- }
-
- boolean isLocked(Fqn fqn)
- {
- ReentrantLock lock = getLock(fqn);
- return lock.isLocked();
- }
-
- public String toString()
- {
- return "ReentrantLockContainer{" +
- "sharedLocks=" + (sharedLocks == null ? null : Arrays.asList(sharedLocks)) +
- '}';
- }
- }
-
- public class OwnableReentrantLockContainer extends LockContainer
- {
- OwnableReentrantLock[] sharedLocks;
-
- void initLocks(int numLocks)
- {
- sharedLocks = new OwnableReentrantLock[numLocks];
- for (int i = 0; i < numLocks; i++) sharedLocks[i] = new OwnableReentrantLock(invocationContextContainer);
- }
-
- OwnableReentrantLock getLock(Fqn fqn)
- {
- OwnableReentrantLock l = sharedLocks[hashToIndex(fqn)];
- if (trace) log.trace("Found lock " + l + " for fqn " + fqn);
- return l;
- }
-
- boolean ownsLock(Fqn fqn, Object owner)
- {
- OwnableReentrantLock lock = getLock(fqn);
- return owner.equals(lock.getOwner());
- }
-
- boolean isLocked(Fqn fqn)
- {
- OwnableReentrantLock lock = getLock(fqn);
- return lock.isLocked();
- }
-
- public int getNumLocksHeld()
- {
- int i = 0;
- for (OwnableReentrantLock l : sharedLocks) if (l.isLocked()) i++;
- return i;
- }
-
- public String toString()
- {
- return "OwnableReentrantLockContainer{" +
- "sharedLocks=" + (sharedLocks == null ? null : Arrays.asList(sharedLocks)) +
- '}';
- }
- }
}
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java 2008-07-30 15:43:05 UTC (rev 6442)
@@ -0,0 +1,89 @@
+package org.jboss.cache.util.concurrent.locks;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.concurrent.locks.Lock;
+
+/**
+ * A container for locks. Used with lock striping.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 3.0
+ */
+@ThreadSafe
+public abstract class LockContainer<E>
+{
+ private int lockSegmentMask;
+ private int lockSegmentShift;
+
+
+ protected int calculateNumberOfSegments(int concurrencyLevel)
+ {
+ int tempLockSegShift = 0;
+ int numLocks = 1;
+ while (numLocks < concurrencyLevel)
+ {
+ ++tempLockSegShift;
+ numLocks <<= 1;
+ }
+ lockSegmentShift = 32 - tempLockSegShift;
+ lockSegmentMask = numLocks - 1;
+ return numLocks;
+ }
+
+ public final int hashToIndex(E object)
+ {
+ return (hash(object) >>> lockSegmentShift) & lockSegmentMask;
+ }
+
+ /**
+ * Returns a hash code for non-null Object x.
+ * Uses the same hash code spreader as most other java.util hash tables, except that this uses the string representation
+ * of the object passed in.
+ *
+ * @param object the object serving as a key
+ * @return the hash code
+ */
+ final int hash(E object)
+ {
+ int h = object.hashCode();
+ h += ~(h << 9);
+ h ^= (h >>> 14);
+ h += (h << 4);
+ h ^= (h >>> 10);
+ return h;
+ }
+
+ protected abstract void initLocks(int numLocks);
+
+ /**
+ * Tests if a give owner owns a lock on a specified object.
+ *
+ * @param object object to check
+ * @param owner owner to test
+ * @return true if owner owns lock, false otherwise
+ */
+ public abstract boolean ownsLock(E object, Object owner);
+
+ /**
+ * @param object object
+ * @return true if an object is locked, false otherwise
+ */
+ public abstract boolean isLocked(E object);
+
+ /**
+ * @param object object
+ * @return the lock for a specific object
+ */
+ public abstract Lock getLock(E object);
+
+ /**
+ * @return number of locks held
+ */
+ public abstract int getNumLocksHeld();
+
+ /**
+ * Clears all locks held and re-initialises stripes.
+ */
+ public abstract void reset();
+}
Modified: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLock.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLock.java 2008-07-30 15:42:20 UTC (rev 6441)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLock.java 2008-07-30 15:43:05 UTC (rev 6442)
@@ -42,6 +42,8 @@
*/
public OwnableReentrantLock(InvocationContextContainer invocationContextContainer)
{
+ if (invocationContextContainer == null)
+ throw new IllegalArgumentException("Invocation context container cannot be null!");
this.invocationContextContainer = invocationContextContainer;
}
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java 2008-07-30 15:43:05 UTC (rev 6442)
@@ -0,0 +1,75 @@
+package org.jboss.cache.util.concurrent.locks;
+
+import net.jcip.annotations.ThreadSafe;
+import org.jboss.cache.invocation.InvocationContextContainer;
+
+import java.util.Arrays;
+
+/**
+ * A LockContainer that holds {@link org.jboss.cache.util.concurrent.locks.OwnableReentrantLock}s.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @see org.jboss.cache.util.concurrent.locks.ReentrantLockContainer
+ * @see org.jboss.cache.util.concurrent.locks.OwnableReentrantLock
+ * @since 3.0
+ */
+@ThreadSafe
+public class OwnableReentrantLockContainer<E> extends LockContainer<E>
+{
+ OwnableReentrantLock[] sharedLocks;
+ InvocationContextContainer icc;
+
+ /**
+ * Creates a new LockContainer which uses a certain number of shared locks across all elements that need to be locked.
+ *
+ * @param concurrencyLevel concurrency level for number of stripes to create. Stripes are created in powers of two, with a minimum of concurrencyLevel created.
+ * @param icc invocation context container to use
+ */
+ public OwnableReentrantLockContainer(int concurrencyLevel, InvocationContextContainer icc)
+ {
+ this.icc = icc;
+ initLocks(calculateNumberOfSegments(concurrencyLevel));
+ }
+
+ protected void initLocks(int numLocks)
+ {
+ sharedLocks = new OwnableReentrantLock[numLocks];
+ for (int i = 0; i < numLocks; i++) sharedLocks[i] = new OwnableReentrantLock(icc);
+ }
+
+ public final OwnableReentrantLock getLock(E object)
+ {
+ return sharedLocks[hashToIndex(object)];
+ }
+
+ public final boolean ownsLock(E object, Object owner)
+ {
+ OwnableReentrantLock lock = getLock(object);
+ return owner.equals(lock.getOwner());
+ }
+
+ public final boolean isLocked(E object)
+ {
+ OwnableReentrantLock lock = getLock(object);
+ return lock.isLocked();
+ }
+
+ public final int getNumLocksHeld()
+ {
+ int i = 0;
+ for (OwnableReentrantLock l : sharedLocks) if (l.isLocked()) i++;
+ return i;
+ }
+
+ public String toString()
+ {
+ return "OwnableReentrantLockContainer{" +
+ "sharedLocks=" + (sharedLocks == null ? null : Arrays.asList(sharedLocks)) +
+ '}';
+ }
+
+ public void reset()
+ {
+ initLocks(sharedLocks.length);
+ }
+}
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java 2008-07-30 15:43:05 UTC (rev 6442)
@@ -0,0 +1,71 @@
+package org.jboss.cache.util.concurrent.locks;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.Arrays;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A LockContainer that holds ReentrantLocks
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @see org.jboss.cache.util.concurrent.locks.OwnableReentrantLockContainer
+ * @since 3.0
+ */
+@ThreadSafe
+public class ReentrantLockContainer<E> extends LockContainer<E>
+{
+ ReentrantLock[] sharedLocks;
+
+ /**
+ * Creates a new LockContainer which uses a certain number of shared locks across all elements that need to be locked.
+ *
+ * @param concurrencyLevel concurrency level for number of stripes to create. Stripes are created in powers of two, with a minimum of concurrencyLevel created.
+ */
+ public ReentrantLockContainer(int concurrencyLevel)
+ {
+ initLocks(calculateNumberOfSegments(concurrencyLevel));
+ }
+
+ protected void initLocks(int numLocks)
+ {
+ sharedLocks = new ReentrantLock[numLocks];
+ for (int i = 0; i < numLocks; i++) sharedLocks[i] = new ReentrantLock();
+ }
+
+ public final ReentrantLock getLock(E object)
+ {
+ return sharedLocks[hashToIndex(object)];
+ }
+
+ public final int getNumLocksHeld()
+ {
+ int i = 0;
+ for (ReentrantLock l : sharedLocks) if (l.isLocked()) i++;
+ return i;
+ }
+
+ public final boolean ownsLock(E object, Object owner)
+ {
+ ReentrantLock lock = getLock(object);
+ return lock.isHeldByCurrentThread();
+ }
+
+ public final boolean isLocked(E object)
+ {
+ ReentrantLock lock = getLock(object);
+ return lock.isLocked();
+ }
+
+ public String toString()
+ {
+ return "ReentrantLockContainer{" +
+ "sharedLocks=" + (sharedLocks == null ? null : Arrays.asList(sharedLocks)) +
+ '}';
+ }
+
+ public void reset()
+ {
+ initLocks(sharedLocks.length);
+ }
+}
Modified: core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockAssert.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockAssert.java 2008-07-30 15:42:20 UTC (rev 6441)
+++ core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockAssert.java 2008-07-30 15:43:05 UTC (rev 6442)
@@ -3,8 +3,8 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.invocation.InvocationContextContainer;
import org.jboss.cache.lock.LockManager;
-import org.jboss.cache.lock.MVCCLockManager.LockContainer;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.concurrent.locks.LockContainer;
/**
* Helper class to assert lock status in MVCC
Modified: core/trunk/src/test/java/org/jboss/cache/lock/MVCCLockManagerRecordingTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/lock/MVCCLockManagerRecordingTest.java 2008-07-30 15:42:20 UTC (rev 6441)
+++ core/trunk/src/test/java/org/jboss/cache/lock/MVCCLockManagerRecordingTest.java 2008-07-30 15:43:05 UTC (rev 6442)
@@ -4,9 +4,9 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.context.MVCCContextFactory;
import org.jboss.cache.invocation.InvocationContextContainer;
-import org.jboss.cache.lock.MVCCLockManager.LockContainer;
import org.jboss.cache.transaction.DummyTransactionManager;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.concurrent.locks.LockContainer;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
16 years, 4 months