[infinispan-commits] Infinispan SVN: r883 - in trunk: core/src/main/java/org/infinispan/container and 6 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Sep 29 10:38:05 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-09-29 10:38:05 -0400 (Tue, 29 Sep 2009)
New Revision: 883
Modified:
trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java
trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java
trunk/core/src/main/java/org/infinispan/container/SimpleDataContainer.java
trunk/core/src/main/java/org/infinispan/factories/DataContainerFactory.java
trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java
trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
trunk/core/src/test/java/org/infinispan/container/FIFODataContainerTest.java
trunk/core/src/test/java/org/infinispan/container/LRUDataContainerTest.java
trunk/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java
Log:
[ISPN-198] (Ability to tune data container concurrency levels)
Modified: trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java
===================================================================
--- trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -39,7 +39,7 @@
* <p/>
* All data access is transactional. Any attempted reads to locked records will block. The maximum duration of this is
* set in nanoseconds via the parameter {@link org.infinispan.loaders.bdbje.BdbjeCacheStoreConfig#getLockAcquistionTimeout()}.
- * Calls to {@link org.infinispan.loaders.CacheStore#prepare(java.util.List prepare} will attempt
+ * Calls to {@link org.infinispan.loaders.CacheStore#prepare(java.util.List, org.infinispan.transaction.xa.GlobalTransaction, boolean)} will attempt
* to resolve deadlocks, retrying up to {@link org.infinispan.loaders.bdbje.BdbjeCacheStoreConfig#getMaxTxRetries()}
* attempts.
* <p/>
@@ -117,7 +117,7 @@
}
private void openTransactionServices() {
- txnMap = new ConcurrentHashMap<GlobalTransaction, Transaction>();
+ txnMap = new ConcurrentHashMap<GlobalTransaction, Transaction>(64, 0.75f, cache.getConfiguration().getConcurrencyLevel());
currentTransaction = factory.createCurrentTransaction(env);
transactionRunner = factory.createPreparableTransactionRunner(env);
}
@@ -225,7 +225,7 @@
/**
* {@inheritDoc} delegates to {@link BdbjeCacheStore#applyModifications(java.util.List)}, if <code>isOnePhase</code>.
- * Otherwise, delegates to {@link BdbjeCacheStore#prepare(java.util.List, javax.transaction.Transaction) prepare}.
+ * Otherwise, delegates to {@link BdbjeCacheStore#prepare(java.util.List, org.infinispan.transaction.xa.GlobalTransaction)}
*/
public void prepare(List<? extends Modification> mods, GlobalTransaction tx, boolean isOnePhase) throws CacheLoaderException {
if (isOnePhase) {
@@ -280,7 +280,7 @@
/**
* {@inheritDoc}
* <p/>
- * This implementation calls {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean)
+ * This implementation calls {@link BdbjeCacheStore#completeTransaction(org.infinispan.transaction.xa.GlobalTransaction, boolean)}
* completeTransaction} with an argument of false.
*/
public void rollback(GlobalTransaction tx) {
@@ -294,7 +294,7 @@
/**
* {@inheritDoc}
* <p/>
- * This implementation calls {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean)
+ * This implementation calls {@link BdbjeCacheStore#completeTransaction(org.infinispan.transaction.xa.GlobalTransaction, boolean)}
* completeTransaction} with an argument of true.
*/
public void commit(GlobalTransaction tx) throws CacheLoaderException {
Modified: trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -61,10 +61,9 @@
final LinkedEntry head = new LinkedEntry(null), tail = new LinkedEntry(null);
- public FIFODataContainer() {
+ public FIFODataContainer(int concurrencyLevel) {
float loadFactor = 0.75f;
- int initialCapacity = 16;
- int concurrencyLevel = 16;
+ int initialCapacity = 256;
// Find power-of-two sizes best matching arguments
int sshift = 0;
Modified: trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -16,6 +16,10 @@
*/
public class LRUDataContainer extends FIFODataContainer {
+ public LRUDataContainer(int concurrencyLevel) {
+ super(concurrencyLevel);
+ }
+
@Override
public InternalCacheEntry get(Object k) {
int h = hash(k.hashCode());
Modified: trunk/core/src/main/java/org/infinispan/container/SimpleDataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/SimpleDataContainer.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/container/SimpleDataContainer.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -30,10 +30,15 @@
*/
@ThreadSafe
public class SimpleDataContainer implements DataContainer {
- final ConcurrentMap<Object, InternalCacheEntry> immortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>();
- final ConcurrentMap<Object, InternalCacheEntry> mortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>();
+ final ConcurrentMap<Object, InternalCacheEntry> immortalEntries;
+ final ConcurrentMap<Object, InternalCacheEntry> mortalEntries;
final AtomicInteger numEntries = new AtomicInteger(0);
+ public SimpleDataContainer(int concurrencyLevel) {
+ immortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(128, 0.75f, concurrencyLevel);
+ mortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(64, 0.75f, concurrencyLevel);
+ }
+
/**
* Like a get, but doesn't check for expired entries
*
Modified: trunk/core/src/main/java/org/infinispan/factories/DataContainerFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/DataContainerFactory.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/factories/DataContainerFactory.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -41,11 +41,11 @@
public <T> T construct(Class<T> componentType) {
switch (configuration.getEvictionStrategy()) {
case NONE:
- return (T) new SimpleDataContainer();
+ return (T) new SimpleDataContainer(configuration.getConcurrencyLevel());
case FIFO:
- return (T) new FIFODataContainer();
+ return (T) new FIFODataContainer(configuration.getConcurrencyLevel());
case LRU:
- return (T) new LRUDataContainer();
+ return (T) new LRUDataContainer(configuration.getConcurrencyLevel());
default:
throw new ConfigurationException("Unknown eviction strategy " + configuration.getEvictionStrategy());
}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -72,8 +72,8 @@
@MBean(objectName = "CacheStore", description = "Component that handles storing of entries to a CacheStore from memory.")
public class CacheStoreInterceptor extends JmxStatsCommandInterceptor {
CacheLoaderManagerConfig loaderConfig = null;
- private Map<GlobalTransaction, Integer> txStores = new ConcurrentHashMap<GlobalTransaction, Integer>();
- private Map<GlobalTransaction, Set<Object>> preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Object>>();
+ private Map<GlobalTransaction, Integer> txStores;
+ private Map<GlobalTransaction, Set<Object>> preparingTxs;
final AtomicLong cacheStores = new AtomicLong(0);
CacheStore store;
private CacheLoaderManager loaderManager;
@@ -93,6 +93,8 @@
store = loaderManager.getCacheStore();
this.setStatisticsEnabled(configuration.isExposeJmxStatistics());
loaderConfig = configuration.getCacheLoaderManagerConfig();
+ txStores = new ConcurrentHashMap<GlobalTransaction, Integer>(64, 0.75f, configuration.getConcurrencyLevel());
+ preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Object>>(64, 0.75f, configuration.getConcurrencyLevel());
}
/**
Modified: trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -1,11 +1,11 @@
package org.infinispan.loaders;
import org.infinispan.Cache;
-import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.loaders.modifications.Modification;
import org.infinispan.loaders.modifications.Remove;
import org.infinispan.loaders.modifications.Store;
import org.infinispan.marshall.Marshaller;
+import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -15,9 +15,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ConcurrentHashMap;
/**
* An abstract {@link org.infinispan.loaders.CacheStore} that holds common implementations for some methods
@@ -27,20 +27,17 @@
*/
public abstract class AbstractCacheStore extends AbstractCacheLoader implements CacheStore {
- private final Map<GlobalTransaction, List<? extends Modification>> transactions = new ConcurrentHashMap<GlobalTransaction, List<? extends Modification>>();
-
+ private Map<GlobalTransaction, List<? extends Modification>> transactions;
private static Log log = LogFactory.getLog(AbstractCacheStore.class);
-
private AbstractCacheStoreConfig config;
-
private ExecutorService purgerService;
-
protected Marshaller marshaller;
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException{
this.config = (AbstractCacheStoreConfig) config;
this.marshaller = m;
if (config == null) throw new IllegalStateException("Null config!!!");
+ transactions = new ConcurrentHashMap<GlobalTransaction, List<? extends Modification>>(64, 0.75f, cache.getConfiguration().getConcurrencyLevel());
}
public void start() throws CacheLoaderException {
Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -3,9 +3,12 @@
import net.jcip.annotations.GuardedBy;
import org.infinispan.CacheException;
+import org.infinispan.Cache;
+import org.infinispan.marshall.Marshaller;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
+import org.infinispan.loaders.CacheLoaderConfig;
import org.infinispan.loaders.modifications.Clear;
import org.infinispan.loaders.modifications.Modification;
import org.infinispan.loaders.modifications.PurgeExpired;
@@ -72,12 +75,19 @@
private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
private final Lock read = mapLock.readLock();
private final Lock write = mapLock.writeLock();
+ private int concurrencyLevel;
@GuardedBy("mapLock") private ConcurrentMap<Object, Modification> state;
public AsyncStore(CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
super(delegate);
this.asyncStoreConfig = asyncStoreConfig;
}
+
+ @Override
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException {
+ super.init(config, cache, m);
+ concurrencyLevel = cache.getConfiguration().getConcurrencyLevel();
+ }
public void store(InternalCacheEntry ed) {
enqueue(ed.getKey(), new Store(ed));
@@ -100,7 +110,7 @@
@Override
public void start() throws CacheLoaderException {
- state = new ConcurrentHashMap<Object, Modification>();
+ state = newStateMap();
log.info("Async cache loader starting {0}", this);
stopped.set(false);
super.start();
@@ -233,7 +243,7 @@
* Processes modifications taking the latest updates from a state map.
*/
class AsyncProcessor implements Runnable {
- private ConcurrentMap<Object, Modification> swap = new ConcurrentHashMap<Object, Modification>();
+ private ConcurrentMap<Object, Modification> swap = newStateMap();
public void run() {
while (!Thread.interrupted()) {
@@ -262,7 +272,7 @@
acquireLock(write);
unlock = true;
swap = state;
- state = new ConcurrentHashMap<Object, Modification>();
+ state = newStateMap();
} finally {
if (unlock) write.unlock();
}
@@ -286,4 +296,8 @@
}
}
}
+
+ private ConcurrentMap<Object, Modification> newStateMap() {
+ return new ConcurrentHashMap<Object, Modification>(64, 0.75f, concurrencyLevel);
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -83,6 +83,7 @@
private static final String DEFAULT_JGROUPS_CONFIGURATION_FILE = "flush-udp.xml";
Channel channel;
+ boolean createdChannel = false;
Address address;
volatile List<Address> members = Collections.emptyList();
volatile boolean coordinator = false;
@@ -100,6 +101,20 @@
private final JGroupsDistSync flushTracker = new JGroupsDistSync();
long distributedSyncTimeout;
+ /**
+ * This form is used when the transport is created by an external source and passed in to the GlobalConfiguration.
+ *
+ * @param channel created and running channel to use
+ */
+ public JGroupsTransport(Channel channel) {
+ this.channel = channel;
+ if (channel == null) throw new IllegalArgumentException("Cannot deal with a null channel!");
+ if (channel.isConnected()) throw new IllegalArgumentException("Channel passed in cannot already be connected!");
+ }
+
+ public JGroupsTransport() {
+ }
+
// ------------------------------------------------------------------------------------------------------------------
// Lifecycle and setup stuff
// ------------------------------------------------------------------------------------------------------------------
@@ -163,18 +178,23 @@
}
private void initChannelAndRPCDispatcher() throws CacheException {
- buildChannel();
- // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
- // remote instances will be received by self.
- String transportNodeName = c.getTransportNodeName();
- if(transportNodeName != null && transportNodeName.length()>0) {
- long range = Short.MAX_VALUE *2;
- long randomInRange = (long)((Math.random() * range) % range) + 1;
- transportNodeName = transportNodeName + "-" + randomInRange;
- channel.setName(transportNodeName);
- }
+ if (channel == null) {
+ createdChannel = true;
+ buildChannel();
+ // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
+ // remote instances will be received by self.
+ String transportNodeName = c.getTransportNodeName();
+ if (transportNodeName != null && transportNodeName.length() > 0) {
+ long range = Short.MAX_VALUE * 2;
+ long randomInRange = (long) ((Math.random() * range) % range) + 1;
+ transportNodeName = transportNodeName + "-" + randomInRange;
+ channel.setName(transportNodeName);
+ }
+ }
+
channel.setOpt(Channel.LOCAL, false);
channel.setOpt(Channel.BLOCK, true);
+
dispatcher = new CommandAwareRpcDispatcher(channel, this,
asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);
MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
@@ -183,7 +203,6 @@
}
private void buildChannel() {
- // TODO: Check for injected channels and for channel factories.
// in order of preference - we first look for an external JGroups file, then a set of XML properties, and
// finally the legacy JGroups String properties.
String cfg;
Modified: trunk/core/src/test/java/org/infinispan/container/FIFODataContainerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/container/FIFODataContainerTest.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/test/java/org/infinispan/container/FIFODataContainerTest.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -15,7 +15,7 @@
@Override
protected DataContainer createContainer() {
- return new FIFODataContainer();
+ return new FIFODataContainer(16);
}
public void testOrdering() {
Modified: trunk/core/src/test/java/org/infinispan/container/LRUDataContainerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/container/LRUDataContainerTest.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/test/java/org/infinispan/container/LRUDataContainerTest.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -11,7 +11,7 @@
public class LRUDataContainerTest extends FIFODataContainerTest {
@Override
protected DataContainer createContainer() {
- return new LRUDataContainer();
+ return new LRUDataContainer(16);
}
@Override
Modified: trunk/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java 2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java 2009-09-29 14:38:05 UTC (rev 883)
@@ -29,7 +29,7 @@
}
protected DataContainer createContainer() {
- return new SimpleDataContainer();
+ return new SimpleDataContainer(16);
}
public void testExpiredData() throws InterruptedException {
More information about the infinispan-commits
mailing list