[infinispan-commits] Infinispan SVN: r883 - in trunk: core/src/main/java/org/infinispan/container and 6 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Sep 29 10:38:05 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-09-29 10:38:05 -0400 (Tue, 29 Sep 2009)
New Revision: 883

Modified:
   trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java
   trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
   trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java
   trunk/core/src/main/java/org/infinispan/container/SimpleDataContainer.java
   trunk/core/src/main/java/org/infinispan/factories/DataContainerFactory.java
   trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java
   trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
   trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
   trunk/core/src/test/java/org/infinispan/container/FIFODataContainerTest.java
   trunk/core/src/test/java/org/infinispan/container/LRUDataContainerTest.java
   trunk/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java
Log:
[ISPN-198] (Ability to tune data container concurrency levels)

Modified: trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java
===================================================================
--- trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -39,7 +39,7 @@
  * <p/>
  * All data access is transactional.  Any attempted reads to locked records will block.  The maximum duration of this is
  * set in nanoseconds via the parameter {@link org.infinispan.loaders.bdbje.BdbjeCacheStoreConfig#getLockAcquistionTimeout()}.
- * Calls to {@link org.infinispan.loaders.CacheStore#prepare(java.util.List prepare} will attempt
+ * Calls to {@link org.infinispan.loaders.CacheStore#prepare(java.util.List, org.infinispan.transaction.xa.GlobalTransaction, boolean)}  will attempt
  * to resolve deadlocks, retrying up to {@link org.infinispan.loaders.bdbje.BdbjeCacheStoreConfig#getMaxTxRetries()}
  * attempts.
  * <p/>
@@ -117,7 +117,7 @@
     }
 
     private void openTransactionServices() {
-        txnMap = new ConcurrentHashMap<GlobalTransaction, Transaction>();
+        txnMap = new ConcurrentHashMap<GlobalTransaction, Transaction>(64, 0.75f, cache.getConfiguration().getConcurrencyLevel());
         currentTransaction = factory.createCurrentTransaction(env);
         transactionRunner = factory.createPreparableTransactionRunner(env);
     }
@@ -225,7 +225,7 @@
 
     /**
      * {@inheritDoc} delegates to {@link BdbjeCacheStore#applyModifications(java.util.List)}, if <code>isOnePhase</code>.
-     * Otherwise, delegates to {@link BdbjeCacheStore#prepare(java.util.List, javax.transaction.Transaction) prepare}.
+     * Otherwise, delegates to {@link BdbjeCacheStore#prepare(java.util.List, org.infinispan.transaction.xa.GlobalTransaction)} 
      */
     public void prepare(List<? extends Modification> mods, GlobalTransaction tx, boolean isOnePhase) throws CacheLoaderException {
         if (isOnePhase) {
@@ -280,7 +280,7 @@
     /**
      * {@inheritDoc}
      * <p/>
-     * This implementation calls {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean)
+     * This implementation calls {@link BdbjeCacheStore#completeTransaction(org.infinispan.transaction.xa.GlobalTransaction, boolean)
      * completeTransaction} with an argument of false.
      */
     public void rollback(GlobalTransaction tx) {
@@ -294,7 +294,7 @@
     /**
      * {@inheritDoc}
      * <p/>
-     * This implementation calls {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean)
+     * This implementation calls {@link BdbjeCacheStore#completeTransaction(org.infinispan.transaction.xa.GlobalTransaction, boolean)
      * completeTransaction} with an argument of true.
      */
     public void commit(GlobalTransaction tx) throws CacheLoaderException {

Modified: trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -61,10 +61,9 @@
 
    final LinkedEntry head = new LinkedEntry(null), tail = new LinkedEntry(null);
 
-   public FIFODataContainer() {
+   public FIFODataContainer(int concurrencyLevel) {
       float loadFactor = 0.75f;
-      int initialCapacity = 16;
-      int concurrencyLevel = 16;
+      int initialCapacity = 256;
 
       // Find power-of-two sizes best matching arguments
       int sshift = 0;

Modified: trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -16,6 +16,10 @@
  */
 public class LRUDataContainer extends FIFODataContainer {
 
+   public LRUDataContainer(int concurrencyLevel) {
+      super(concurrencyLevel);
+   }
+
    @Override
    public InternalCacheEntry get(Object k) {
       int h = hash(k.hashCode());

Modified: trunk/core/src/main/java/org/infinispan/container/SimpleDataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/SimpleDataContainer.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/container/SimpleDataContainer.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -30,10 +30,15 @@
  */
 @ThreadSafe
 public class SimpleDataContainer implements DataContainer {
-   final ConcurrentMap<Object, InternalCacheEntry> immortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>();
-   final ConcurrentMap<Object, InternalCacheEntry> mortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>();
+   final ConcurrentMap<Object, InternalCacheEntry> immortalEntries;
+   final ConcurrentMap<Object, InternalCacheEntry> mortalEntries;
    final AtomicInteger numEntries = new AtomicInteger(0);
 
+   public SimpleDataContainer(int concurrencyLevel) {      
+      immortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(128, 0.75f, concurrencyLevel);
+      mortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(64, 0.75f, concurrencyLevel);
+   }
+
    /**
     * Like a get, but doesn't check for expired entries
     *

Modified: trunk/core/src/main/java/org/infinispan/factories/DataContainerFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/DataContainerFactory.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/factories/DataContainerFactory.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -41,11 +41,11 @@
    public <T> T construct(Class<T> componentType) {
       switch (configuration.getEvictionStrategy()) {
          case NONE:
-            return (T) new SimpleDataContainer();
+            return (T) new SimpleDataContainer(configuration.getConcurrencyLevel());
          case FIFO:
-            return (T) new FIFODataContainer();
+            return (T) new FIFODataContainer(configuration.getConcurrencyLevel());
          case LRU:
-            return (T) new LRUDataContainer();
+            return (T) new LRUDataContainer(configuration.getConcurrencyLevel());
          default:
             throw new ConfigurationException("Unknown eviction strategy " + configuration.getEvictionStrategy());
       }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -72,8 +72,8 @@
 @MBean(objectName = "CacheStore", description = "Component that handles storing of entries to a CacheStore from memory.")
 public class CacheStoreInterceptor extends JmxStatsCommandInterceptor {
    CacheLoaderManagerConfig loaderConfig = null;
-   private Map<GlobalTransaction, Integer> txStores = new ConcurrentHashMap<GlobalTransaction, Integer>();
-   private Map<GlobalTransaction, Set<Object>> preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Object>>();
+   private Map<GlobalTransaction, Integer> txStores;
+   private Map<GlobalTransaction, Set<Object>> preparingTxs;
    final AtomicLong cacheStores = new AtomicLong(0);
    CacheStore store;
    private CacheLoaderManager loaderManager;
@@ -93,6 +93,8 @@
       store = loaderManager.getCacheStore();
       this.setStatisticsEnabled(configuration.isExposeJmxStatistics());
       loaderConfig = configuration.getCacheLoaderManagerConfig();
+      txStores = new ConcurrentHashMap<GlobalTransaction, Integer>(64, 0.75f, configuration.getConcurrencyLevel());
+      preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Object>>(64, 0.75f, configuration.getConcurrencyLevel());
    }
 
    /**

Modified: trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -1,11 +1,11 @@
 package org.infinispan.loaders;
 
 import org.infinispan.Cache;
-import org.infinispan.transaction.xa.GlobalTransaction;
 import org.infinispan.loaders.modifications.Modification;
 import org.infinispan.loaders.modifications.Remove;
 import org.infinispan.loaders.modifications.Store;
 import org.infinispan.marshall.Marshaller;
+import org.infinispan.transaction.xa.GlobalTransaction;
 import org.infinispan.util.concurrent.WithinThreadExecutor;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -15,9 +15,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * An abstract {@link org.infinispan.loaders.CacheStore} that holds common implementations for some methods
@@ -27,20 +27,17 @@
  */
 public abstract class AbstractCacheStore extends AbstractCacheLoader implements CacheStore {
 
-   private final Map<GlobalTransaction, List<? extends Modification>> transactions = new ConcurrentHashMap<GlobalTransaction, List<? extends Modification>>();
-
+   private Map<GlobalTransaction, List<? extends Modification>> transactions;
    private static Log log = LogFactory.getLog(AbstractCacheStore.class);
-
    private AbstractCacheStoreConfig config;
-
    private ExecutorService purgerService;
-
    protected Marshaller marshaller;
 
    public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException{
       this.config = (AbstractCacheStoreConfig) config;
       this.marshaller = m;
       if (config == null) throw new IllegalStateException("Null config!!!");
+      transactions = new ConcurrentHashMap<GlobalTransaction, List<? extends Modification>>(64, 0.75f, cache.getConfiguration().getConcurrencyLevel());
    }
 
    public void start() throws CacheLoaderException {

Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -3,9 +3,12 @@
 import net.jcip.annotations.GuardedBy;
 
 import org.infinispan.CacheException;
+import org.infinispan.Cache;
+import org.infinispan.marshall.Marshaller;
 import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.CacheStore;
+import org.infinispan.loaders.CacheLoaderConfig;
 import org.infinispan.loaders.modifications.Clear;
 import org.infinispan.loaders.modifications.Modification;
 import org.infinispan.loaders.modifications.PurgeExpired;
@@ -72,12 +75,19 @@
    private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
    private final Lock read = mapLock.readLock();
    private final Lock write = mapLock.writeLock();
+   private int concurrencyLevel;
    @GuardedBy("mapLock") private ConcurrentMap<Object, Modification> state;
    
    public AsyncStore(CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
       super(delegate);
       this.asyncStoreConfig = asyncStoreConfig;
    }
+
+   @Override
+   public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException {
+      super.init(config, cache, m);
+      concurrencyLevel = cache.getConfiguration().getConcurrencyLevel();
+   }
    
    public void store(InternalCacheEntry ed) {
       enqueue(ed.getKey(), new Store(ed));
@@ -100,7 +110,7 @@
    
    @Override
    public void start() throws CacheLoaderException {
-      state = new ConcurrentHashMap<Object, Modification>();
+      state = newStateMap();
       log.info("Async cache loader starting {0}", this);
       stopped.set(false);
       super.start();
@@ -233,7 +243,7 @@
     * Processes modifications taking the latest updates from a state map.
     */
    class AsyncProcessor implements Runnable {
-      private ConcurrentMap<Object, Modification> swap = new ConcurrentHashMap<Object, Modification>();
+      private ConcurrentMap<Object, Modification> swap = newStateMap();
       
       public void run() {
          while (!Thread.interrupted()) {
@@ -262,7 +272,7 @@
             acquireLock(write);
             unlock = true;
             swap = state;
-            state = new ConcurrentHashMap<Object, Modification>();
+            state = newStateMap();
          } finally {
             if (unlock) write.unlock();
          }
@@ -286,4 +296,8 @@
          }
       }
    }
+
+   private ConcurrentMap<Object, Modification> newStateMap() {
+      return new ConcurrentHashMap<Object, Modification>(64, 0.75f, concurrencyLevel);
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -83,6 +83,7 @@
    private static final String DEFAULT_JGROUPS_CONFIGURATION_FILE = "flush-udp.xml";
 
    Channel channel;
+   boolean createdChannel = false;
    Address address;
    volatile List<Address> members = Collections.emptyList();
    volatile boolean coordinator = false;
@@ -100,6 +101,20 @@
    private final JGroupsDistSync flushTracker = new JGroupsDistSync();
    long distributedSyncTimeout;
 
+   /**
+    * This form is used when the transport is created by an external source and passed in to the GlobalConfiguration.
+    *
+    * @param channel created, but not yet connected, channel to use
+    */
+   public JGroupsTransport(Channel channel) {
+      this.channel = channel;
+      if (channel == null) throw new IllegalArgumentException("Cannot deal with a null channel!");
+      if (channel.isConnected()) throw new IllegalArgumentException("Channel passed in cannot already be connected!");
+   }
+
+   public JGroupsTransport() {
+   }
+
    // ------------------------------------------------------------------------------------------------------------------
    // Lifecycle and setup stuff
    // ------------------------------------------------------------------------------------------------------------------
@@ -163,18 +178,23 @@
    }
 
    private void initChannelAndRPCDispatcher() throws CacheException {
-      buildChannel();
-      // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
-      // remote instances will be received by self.
-      String transportNodeName = c.getTransportNodeName();
-      if(transportNodeName != null && transportNodeName.length()>0) {
-         long range = Short.MAX_VALUE *2;
-         long randomInRange = (long)((Math.random() * range) % range) + 1;         
-         transportNodeName = transportNodeName + "-" + randomInRange;
-         channel.setName(transportNodeName);
-      }     
+      if (channel == null) {
+         createdChannel = true;
+         buildChannel();
+         // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
+         // remote instances will be received by self.
+         String transportNodeName = c.getTransportNodeName();
+         if (transportNodeName != null && transportNodeName.length() > 0) {
+            long range = Short.MAX_VALUE * 2;
+            long randomInRange = (long) ((Math.random() * range) % range) + 1;
+            transportNodeName = transportNodeName + "-" + randomInRange;
+            channel.setName(transportNodeName);
+         }
+      }
+
       channel.setOpt(Channel.LOCAL, false);
       channel.setOpt(Channel.BLOCK, true);
+
       dispatcher = new CommandAwareRpcDispatcher(channel, this,
                                                  asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);
       MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
@@ -183,7 +203,6 @@
    }
 
    private void buildChannel() {
-      // TODO: Check for injected channels and for channel factories.
       // in order of preference - we first look for an external JGroups file, then a set of XML properties, and
       // finally the legacy JGroups String properties.
       String cfg;

Modified: trunk/core/src/test/java/org/infinispan/container/FIFODataContainerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/container/FIFODataContainerTest.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/test/java/org/infinispan/container/FIFODataContainerTest.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -15,7 +15,7 @@
 
    @Override
    protected DataContainer createContainer() {
-      return new FIFODataContainer();
+      return new FIFODataContainer(16);
    }
 
    public void testOrdering() {

Modified: trunk/core/src/test/java/org/infinispan/container/LRUDataContainerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/container/LRUDataContainerTest.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/test/java/org/infinispan/container/LRUDataContainerTest.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -11,7 +11,7 @@
 public class LRUDataContainerTest extends FIFODataContainerTest {
    @Override
    protected DataContainer createContainer() {
-      return new LRUDataContainer();
+      return new LRUDataContainer(16);
    }
 
    @Override

Modified: trunk/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java	2009-09-29 12:49:01 UTC (rev 882)
+++ trunk/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java	2009-09-29 14:38:05 UTC (rev 883)
@@ -29,7 +29,7 @@
    }
 
    protected DataContainer createContainer() {
-      return new SimpleDataContainer();
+      return new SimpleDataContainer(16);
    }
 
    public void testExpiredData() throws InterruptedException {



More information about the infinispan-commits mailing list