[jbosscache-commits] JBoss Cache SVN: r6540 - in core/trunk/src: main/java/org/jboss/cache/batch and 12 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Aug 7 12:45:40 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-08-07 12:45:39 -0400 (Thu, 07 Aug 2008)
New Revision: 6540

Added:
   core/trunk/src/main/java/org/jboss/cache/batch/
   core/trunk/src/main/java/org/jboss/cache/batch/BatchContainer.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/BatchingInterceptor.java
   core/trunk/src/test/java/org/jboss/cache/api/batch/
   core/trunk/src/test/java/org/jboss/cache/api/batch/AbstractBatchTest.java
   core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithTM.java
   core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithoutTM.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/Cache.java
   core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
   core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
   core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
   core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
   core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
   core/trunk/src/main/java/org/jboss/cache/factories/TransactionManagerFactory.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
   core/trunk/src/main/java/org/jboss/cache/transaction/BatchModeTransactionManagerLookup.java
   core/trunk/src/main/resources/jbosscache-config-3.0.xsd
   core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
   core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java
   core/trunk/src/test/resources/configs/parser-test.xml
Log:
JBCACHE-991 - Batching API

Modified: core/trunk/src/main/java/org/jboss/cache/Cache.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Cache.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/Cache.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -495,4 +495,30 @@
     */
    void clearData(Fqn fqn);
 
+   /**
+    * Starts a batch.  This is a lightweight batching mechanism that groups cache writes together and finally performs the
+    * write, persistence and/or replication when {@link #endBatch(boolean)} is called rather than for each invocation on the
+    * cache.
+    * <p/>
+    * Note that if there is an existing transaction in scope and the cache has been configured to use a JTA compliant
+    * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)} are ignored and treated as no-ops.
+    * <p/>
+    *
+    * @see #endBatch(boolean)
+    * @since 3.0
+    */
+   void startBatch();
+
+   /**
+    * Ends an existing ongoing batch.  A no-op if a batch has not been started yet.
+    * <p/>
+    * Note that if there is an existing transaction in scope and the cache has been configured to use a JTA compliant
+    * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)} are ignored and treated as no-ops.
+    * <p/>
+    *
+    * @param successful if <tt>true</tt>, changes made in the batch are committed.  If <tt>false</tt>, they are discarded.
+    * @see #startBatch()
+    * @since 3.0
+    */
+   void endBatch(boolean successful);
 }

Modified: core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheSPI.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/CacheSPI.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -39,7 +39,6 @@
  * @see NodeSPI
  * @see Cache
  * @see org.jboss.cache.loader.CacheLoader
- * @see org.jboss.cache.interceptors.ChainedInterceptor
  * @since 2.0.0
  */
 @ThreadSafe

Added: core/trunk/src/main/java/org/jboss/cache/batch/BatchContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/batch/BatchContainer.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/batch/BatchContainer.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,80 @@
+package org.jboss.cache.batch;
+
+import org.jboss.cache.CacheException;
+import org.jboss.cache.factories.annotations.Inject;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+/**
+ * A container for holding thread locals for batching, to be used with the {@link org.jboss.cache.Cache#startBatch()} and
+ * {@link org.jboss.cache.Cache#endBatch(boolean)} calls.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class BatchContainer
+{
+   TransactionManager transactionManager;
+   private ThreadLocal<Transaction> batchTransactionContainer = new ThreadLocal<Transaction>();
+
+   @Inject
+   void inject(TransactionManager transactionManager)
+   {
+      this.transactionManager = transactionManager;
+   }
+
+   public void startBatch() throws CacheException
+   {
+      try
+      {
+         if (transactionManager.getTransaction() != null) return;
+         if (batchTransactionContainer.get() == null)
+         {
+            transactionManager.begin();
+            batchTransactionContainer.set(transactionManager.suspend());
+         }
+      }
+      catch (Exception e)
+      {
+         throw new CacheException("Unable to start batch", e);
+      }
+   }
+
+   public void endBatch(boolean success)
+   {
+      Transaction tx = batchTransactionContainer.get();
+      if (tx == null) return;
+      Transaction existingTx = null;
+      try
+      {
+         existingTx = transactionManager.getTransaction();
+         transactionManager.resume(tx);
+         if (success)
+            tx.commit();
+         else
+            tx.rollback();
+      }
+      catch (Exception e)
+      {
+         throw new CacheException("Unable to end batch", e);
+      }
+      finally
+      {
+         batchTransactionContainer.remove();
+         try
+         {
+            if (existingTx != null) transactionManager.resume(existingTx);
+         }
+         catch (Exception e)
+         {
+            throw new CacheException("Failed resuming existing transaction " + existingTx, e);
+         }
+      }
+   }
+
+   public Transaction getBatchTransaction()
+   {
+      return batchTransactionContainer.get();
+   }
+}

Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -32,6 +32,7 @@
    private Marshaller marshaller;
 
    private JGroupsStackParser jGroupsStackParser = new JGroupsStackParser();
+   private boolean invocationBatchingEnabled;
 
    /**
     * Behavior of the JVM shutdown hook registered by the cache
@@ -299,6 +300,19 @@
       this.exposeManagementStatistics = useMbean;
    }
 
+   /**
+    * Enables invocation batching if set to <tt>true</tt>.  You still need to use {@link org.jboss.cache.Cache#startBatch()}
+    * and {@link org.jboss.cache.Cache#endBatch(boolean)} to demarcate the start and end of batches.
+    *
+    * @param enabled if true, batching is enabled.
+    * @since 3.0
+    */
+   public void setInvocationBatchingEnabled(boolean enabled)
+   {
+      testImmutability("invocationBatchingEnabled");
+      this.invocationBatchingEnabled = enabled;
+   }
+
    public void setFetchInMemoryState(boolean fetchInMemoryState)
    {
       testImmutability("fetchInMemoryState");
@@ -652,6 +666,15 @@
       return exposeManagementStatistics;
    }
 
+   /**
+    * @return true if invocation batching is enabled.
+    * @since 3.0
+    */
+   public boolean isInvocationBatchingEnabled()
+   {
+      return invocationBatchingEnabled;
+   }
+
    public boolean isFetchInMemoryState()
    {
       return fetchInMemoryState;

Modified: core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -52,8 +52,8 @@
  * <p/>
  * Following system properties can be used for customizing parser behavior:
  * <ul>
- *   <li> <b>-Djbosscache.config.validate=false</b> will make the parser non-validating </li>
- *   <li> <b>-Djbosscache.config.schemaLocation=url</b> allows one to specify a validation schema that would override the one specified in the the xml document </li>
+ * <li> <b>-Djbosscache.config.validate=false</b> will make the parser non-validating </li>
+ * <li> <b>-Djbosscache.config.schemaLocation=url</b> allows one to specify a validation schema that would override the one specified in the the xml document </li>
  * </ul>
  * This class is stateful and one instance should be used for parsing a single configuration file.
  *
@@ -173,6 +173,7 @@
          configureCacheLoaders(getSingleElement("loaders"));
          configureCustomInterceptors(getSingleElement("customInterceptors"));
          configureListeners(getSingleElement("listeners"));
+         configureInvocationBatching(getSingleElement("invocationBatching"));
       }
       catch (Exception e)
       {
@@ -248,6 +249,13 @@
       }
    }
 
+   private void configureInvocationBatching(Element element)
+   {
+      if (element == null) return; //this element is optional
+      boolean enabled = getBoolean(getAttributeValue(element, "enabled"));
+      config.setInvocationBatchingEnabled(enabled);
+   }
+
    private void configureBuddyReplication(Element element)
    {
       if (element == null) return;//buddy config might not exist, expect that

Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -2,6 +2,7 @@
 
 import org.jboss.cache.DataContainer;
 import org.jboss.cache.RegionRegistry;
+import org.jboss.cache.batch.BatchContainer;
 import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
 import org.jboss.cache.config.ConfigurationException;
 import org.jboss.cache.factories.annotations.DefaultFactoryFor;
@@ -25,7 +26,7 @@
 @DefaultFactoryFor(classes = {Notifier.class, MVCCNodeHelper.class, RegionRegistry.class,
       ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class, InvocationContextContainer.class,
       CacheInvocationDelegate.class, TransactionTable.class, DataContainer.class,
-      LockStrategyFactory.class, BuddyFqnTransformer.class})
+      LockStrategyFactory.class, BuddyFqnTransformer.class, BatchContainer.class})
 public class EmptyConstructorFactory extends ComponentFactory
 {
    @Override

Modified: core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -56,13 +56,18 @@
    public InterceptorChain buildInterceptorChain() throws IllegalAccessException, InstantiationException, ClassNotFoundException
    {
       boolean optimistic = configuration.getNodeLockingScheme() == NodeLockingScheme.OPTIMISTIC;
+      boolean invocationBatching = configuration.isInvocationBatchingEnabled();
       // load the icInterceptor first
-      CommandInterceptor first = createInterceptor(InvocationContextInterceptor.class);
+      CommandInterceptor first = invocationBatching ? createInterceptor(BatchingInterceptor.class) : createInterceptor(InvocationContextInterceptor.class);
       InterceptorChain interceptorChain = new InterceptorChain(first);
 
       // add the interceptor chain to the registry first, since some interceptors may ask for it.
       componentRegistry.registerComponent(interceptorChain, InterceptorChain.class);
 
+      // NOW add the ICI if we are using batching!
+      if (invocationBatching)
+         interceptorChain.appendIntereceptor(createInterceptor(InvocationContextInterceptor.class));
+
       // load the cache management interceptor next
       if (configuration.getExposeManagementStatistics())
          interceptorChain.appendIntereceptor(createInterceptor(CacheMgmtInterceptor.class));

Modified: core/trunk/src/main/java/org/jboss/cache/factories/TransactionManagerFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/TransactionManagerFactory.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/factories/TransactionManagerFactory.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -2,6 +2,7 @@
 
 import org.jboss.cache.config.ConfigurationException;
 import org.jboss.cache.factories.annotations.DefaultFactoryFor;
+import org.jboss.cache.transaction.BatchModeTransactionManager;
 import org.jboss.cache.transaction.TransactionManagerLookup;
 
 import javax.transaction.TransactionManager;
@@ -52,6 +53,12 @@
             log.info("failed looking up TransactionManager, will not use transactions", e);
          }
       }
+
+      if (transactionManager == null && configuration.isInvocationBatchingEnabled())
+      {
+         log.info("Using a batchMode transaction manager");
+         transactionManager = BatchModeTransactionManager.getInstance();
+      }
       return componentType.cast(transactionManager);
    }
 }

Added: core/trunk/src/main/java/org/jboss/cache/interceptors/BatchingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BatchingInterceptor.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BatchingInterceptor.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,58 @@
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.batch.BatchContainer;
+import org.jboss.cache.commands.VisitableCommand;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.interceptors.base.CommandInterceptor;
+import org.jboss.cache.invocation.InvocationContext;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+/**
+ * Interceptor that captures batched calls and attaches contexts.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class BatchingInterceptor extends CommandInterceptor
+{
+   BatchContainer batchContainer;
+   TransactionManager transactionManager;
+
+   @Inject
+   private void inject(BatchContainer batchContainer, TransactionManager transactionManager)
+   {
+      this.batchContainer = batchContainer;
+      this.transactionManager = transactionManager;
+   }
+
+   /**
+    * Simply check if there is an ongoing tx.
+    * <ul>
+    * <li>If there is one, this is a no-op and just passes the call up the chain.</li>
+    * <li>If there isn't one and there is a batch in progress, resume the batch's tx, pass up, and finally suspend the batch's tx.</li>
+    * <li>If there is no batch in progress, just pass the call up the chain.</li>
+    * </ul>
+    */
+   @Override
+   protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+   {
+      Transaction tx = null;
+      try
+      {
+         // if in a batch, attach tx
+         if (transactionManager.getTransaction() == null &&
+               (tx = batchContainer.getBatchTransaction()) != null)
+         {
+            transactionManager.resume(tx);
+         }
+         return super.handleDefault(ctx, command);
+      }
+      finally
+      {
+         if (tx != null && transactionManager.getTransaction() != null)
+            transactionManager.suspend();
+      }
+   }
+}

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -18,6 +18,7 @@
 import static org.jboss.cache.lock.LockType.WRITE;
 import org.jboss.cache.optimistic.TransactionWorkspace;
 import org.jboss.cache.optimistic.WorkspaceNode;
+import org.jboss.cache.transaction.BatchModeTransactionManager;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionContext;
 
@@ -35,7 +36,7 @@
    @Start
    private void init()
    {
-      if (txManager == null)
+      if (txManager == null || txManager.getClass().equals(BatchModeTransactionManager.class))
          log.fatal("No transaction manager lookup class has been defined. Transactions cannot be used and thus OPTIMISTIC locking cannot be used!  Expect errors!!");
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -1,13 +1,5 @@
 package org.jboss.cache.invocation;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.CacheException;
@@ -21,6 +13,7 @@
 import org.jboss.cache.Region;
 import org.jboss.cache.RegionManager;
 import org.jboss.cache.Version;
+import org.jboss.cache.batch.BatchContainer;
 import org.jboss.cache.buddyreplication.BuddyManager;
 import org.jboss.cache.buddyreplication.GravitateResult;
 import org.jboss.cache.commands.CommandsFactory;
@@ -39,8 +32,9 @@
 import org.jboss.cache.commands.write.RemoveKeyCommand;
 import org.jboss.cache.commands.write.RemoveNodeCommand;
 import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.NodeLockingScheme;
+import org.jboss.cache.config.ConfigurationException;
 import org.jboss.cache.config.Option;
-import org.jboss.cache.config.Configuration.NodeLockingScheme;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.NonVolatile;
 import org.jboss.cache.factories.annotations.Start;
@@ -55,6 +49,13 @@
 import org.jboss.cache.util.Immutables;
 import org.jgroups.Address;
 
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * The delegate that users (and ChainedInterceptor authors) interact with when they create a cache by using a cache factory.
  * This wrapper delegates calls down the interceptor chain.
@@ -81,12 +82,13 @@
    private CommandsFactory commandsFactory;
    private MVCCNodeHelper mvccHelper;
    private boolean usingMvcc;
+   private BatchContainer batchContainer;
 
    @Inject
    public void initialize(StateTransferManager stateTransferManager, CacheLoaderManager cacheLoaderManager, Notifier notifier,
                           TransactionManager transactionManager, BuddyManager buddyManager, TransactionTable transactionTable,
                           RPCManager rpcManager, RegionManager regionManager, Marshaller marshaller,
-                          CommandsFactory commandsFactory, DataContainer dataContainer, MVCCNodeHelper mvccHelper)
+                          CommandsFactory commandsFactory, DataContainer dataContainer, MVCCNodeHelper mvccHelper, BatchContainer batchContainer)
    {
       this.stateTransferManager = stateTransferManager;
       this.cacheLoaderManager = cacheLoaderManager;
@@ -100,6 +102,7 @@
       this.dataContainer = dataContainer;
       this.commandsFactory = commandsFactory;
       this.mvccHelper = mvccHelper;
+      this.batchContainer = batchContainer;
    }
 
    @Start
@@ -582,6 +585,20 @@
       invoker.invoke(ctx, commandsFactory.buildClearDataCommand(tx, fqn));
    }
 
+   public void startBatch()
+   {
+      if (!configuration.isInvocationBatchingEnabled())
+         throw new ConfigurationException("Invocation batching not enabled in current configuration!  Please use the <invocationBatching /> element.");
+      batchContainer.startBatch();
+   }
+
+   public void endBatch(boolean successful)
+   {
+      if (!configuration.isInvocationBatchingEnabled())
+         throw new ConfigurationException("Invocation batching not enabled in current configuration!  Please use the <invocationBatching /> element.");
+      batchContainer.endBatch(successful);
+   }
+
    @SuppressWarnings("unchecked")
    public Set<Object> getChildrenNames(Fqn fqn)
    {

Modified: core/trunk/src/main/java/org/jboss/cache/transaction/BatchModeTransactionManagerLookup.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/BatchModeTransactionManagerLookup.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/BatchModeTransactionManagerLookup.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -1,6 +1,9 @@
 package org.jboss.cache.transaction;
 
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import javax.transaction.TransactionManager;
 
 
@@ -9,10 +12,16 @@
  *
  * @author Bela Ban Sept 5 2003
  * @version $Id$
+ * @deprecated Use batching API on Cache instead.
  */
-public class BatchModeTransactionManagerLookup implements TransactionManagerLookup {
+ at Deprecated
+public class BatchModeTransactionManagerLookup implements TransactionManagerLookup
+{
+   private Log log = LogFactory.getLog(BatchModeTransactionManagerLookup.class);
 
-   public TransactionManager getTransactionManager() throws Exception {
+   public TransactionManager getTransactionManager() throws Exception
+   {
+      log.warn("Using a deprecated/unsupported transaction manager!");
       return BatchModeTransactionManager.getInstance();
    }
 }

Modified: core/trunk/src/main/resources/jbosscache-config-3.0.xsd
===================================================================
--- core/trunk/src/main/resources/jbosscache-config-3.0.xsd	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/resources/jbosscache-config-3.0.xsd	2008-08-07 16:45:39 UTC (rev 6540)
@@ -15,10 +15,11 @@
             <xs:element name="invalidation" type="invalidationType" minOccurs="0" maxOccurs="1"/>
             <xs:element name="jmxStatistics" type="jmxStatisticsType" minOccurs="0" maxOccurs="1"/>
             <xs:element name="listeners" type="listenersType" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="invocationBatching" type="invocationBatchingType" minOccurs="0" maxOccurs="1"/>
             <xs:element name="transport" type="transportType" minOccurs="0" maxOccurs="1"/>
             <xs:element name="eviction" type="evictionType" minOccurs="0" maxOccurs="1"/>
             <xs:element name="loaders" type="loadersType" minOccurs="0" maxOccurs="1"/>
-            <xs:element name="customInterceptors" type="customInterceptorsType" minOccurs="0"/>
+            <xs:element name="customInterceptors" type="customInterceptorsType" minOccurs="0" maxOccurs="1"/>
          </xs:all>
       </xs:complexType>
    </xs:element>
@@ -138,6 +139,10 @@
       <xs:attribute name="asyncPoolSize" type="positiveInteger"/>
    </xs:complexType>
 
+   <xs:complexType name="invocationBatchingType">
+      <xs:attribute name="enabled" type="booleanType"/>
+   </xs:complexType>
+
    <xs:complexType name="transportType">
       <xs:sequence>
          <xs:element name="jgroupsConfig" type="xs:anyType" minOccurs="0" maxOccurs="1"/>

Added: core/trunk/src/test/java/org/jboss/cache/api/batch/AbstractBatchTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/batch/AbstractBatchTest.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/api/batch/AbstractBatchTest.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,24 @@
+package org.jboss.cache.api.batch;
+
+import org.jboss.cache.Cache;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractBatchTest
+{
+   protected String getOnDifferentThread(final Cache<String, String> cache, final String fqn, final String key) throws InterruptedException
+   {
+      final AtomicReference<String> ref = new AtomicReference<String>();
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            ref.set(cache.get(fqn, key));
+         }
+      };
+
+      t.start();
+      t.join();
+      return ref.get();
+   }
+}

Added: core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithTM.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithTM.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithTM.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,124 @@
+package org.jboss.cache.api.batch;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheFactory;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.CacheMode;
+import org.jboss.cache.config.Configuration.NodeLockingScheme;
+import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+
+ at Test(groups = {"functional", "transaction"})
+public class BatchWithTM extends AbstractBatchTest
+{
+   public void testBatchWithOngoingTM() throws Exception
+   {
+      Cache<String, String> cache = null;
+      try
+      {
+         cache = createCache();
+         TransactionManager tm = getTransactionManager(cache);
+         tm.begin();
+         cache.put("/a/b/c", "k", "v");
+         cache.startBatch();
+         cache.put("/a/b/c", "k2", "v2");
+         tm.commit();
+
+         assert "v".equals(cache.get("/a/b/c", "k"));
+         assert "v2".equals(cache.get("/a/b/c", "k2"));
+
+         cache.endBatch(false); // should be a no op
+         assert "v".equals(cache.get("/a/b/c", "k"));
+         assert "v2".equals(cache.get("/a/b/c", "k2"));
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache);
+      }
+   }
+
+   public void testBatchWithoutOngoingTMSuspension() throws Exception
+   {
+      Cache<String, String> cache = null;
+      try
+      {
+         cache = createCache();
+         TransactionManager tm = getTransactionManager(cache);
+         assert tm.getTransaction() == null : "Should have no ongoing txs";
+         cache.startBatch();
+         cache.put("/a/b/c", "k", "v");
+         assert tm.getTransaction() == null : "Should have no ongoing txs";
+         cache.put("/a/b/c", "k2", "v2");
+
+         assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+         assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+
+         try
+         {
+            tm.commit(); // should have no effect
+         }
+         catch (Exception e)
+         {
+            // the TM may barf here ... this is OK.
+         }
+
+         assert tm.getTransaction() == null : "Should have no ongoing txs";
+
+         assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+         assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+
+         cache.endBatch(true); // should be a no op
+
+         assert "v".equals(getOnDifferentThread(cache, "/a/b/c", "k"));
+         assert "v2".equals(getOnDifferentThread(cache, "/a/b/c", "k2"));
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache);
+      }
+   }
+
+   public void testBatchRollback() throws Exception
+   {
+      Cache<String, String> cache = null;
+      try
+      {
+         cache = createCache();
+         TransactionManager tm = getTransactionManager(cache);
+         cache.startBatch();
+         cache.put("/a/b/c", "k", "v");
+         cache.put("/a/b/c", "k2", "v2");
+
+         assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+         assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+
+         cache.endBatch(false);
+
+         assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+         assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache);
+      }
+   }
+
+   private TransactionManager getTransactionManager(Cache<String, String> c)
+   {
+      return c.getConfiguration().getRuntimeConfig().getTransactionManager();
+   }
+
+   private Cache<String, String> createCache()
+   {
+      CacheFactory<String, String> cf = new DefaultCacheFactory<String, String>();
+      Configuration c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.LOCAL); // this should pick up any configured TM for the test
+      c.setNodeLockingScheme(NodeLockingScheme.MVCC);
+      c.setInvocationBatchingEnabled(true);
+      assert c.getTransactionManagerLookupClass() != null : "Should have a transaction manager lookup class attached!!";
+      return cf.createCache(c);
+   }
+}

Added: core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithoutTM.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithoutTM.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithoutTM.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,146 @@
+package org.jboss.cache.api.batch;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheFactory;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.NodeLockingScheme;
+import org.jboss.cache.config.ConfigurationException;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional")
+public class BatchWithoutTM extends AbstractBatchTest
+{
+   public void testBatchWithoutCfg()
+   {
+      Cache<String, String> cache = null;
+      try
+      {
+         cache = createCache(false);
+         try
+         {
+            cache.startBatch();
+            assert false : "Should have failed";
+         }
+         catch (ConfigurationException good)
+         {
+            // do nothing
+         }
+
+         try
+         {
+            cache.endBatch(true);
+            assert false : "Should have failed";
+         }
+         catch (ConfigurationException good)
+         {
+            // do nothing
+         }
+
+         try
+         {
+            cache.endBatch(false);
+            assert false : "Should have failed";
+         }
+         catch (ConfigurationException good)
+         {
+            // do nothing
+         }
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache);
+      }
+   }
+
+   public void testEndBatchWithoutStartBatch()
+   {
+      Cache<String, String> cache = null;
+      try
+      {
+         cache = createCache(true);
+         cache.endBatch(true);
+         cache.endBatch(false);
+         // should not fail.
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache);
+      }
+   }
+
+   public void testStartBatchIdempotency()
+   {
+      Cache<String, String> cache = null;
+      try
+      {
+         cache = createCache(true);
+         cache.startBatch();
+         cache.put("/a/b/c", "k", "v");
+         cache.startBatch();     // again
+         cache.put("/a/b/c", "k2", "v2");
+         cache.endBatch(true);
+
+         assert "v".equals(cache.get("/a/b/c", "k"));
+         assert "v2".equals(cache.get("/a/b/c", "k2"));
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache);
+      }
+   }
+
+   public void testBatchVisibility() throws InterruptedException
+   {
+      Cache<String, String> cache = null;
+      try
+      {
+         cache = createCache(true);
+         cache.startBatch();
+         cache.put("/a/b/c", "k", "v");
+         assert getOnDifferentThread(cache, "/a/b/c", "k") == null : "Other thread should not see batch update till batch completes!";
+
+         cache.endBatch(true);
+
+         assert "v".equals(getOnDifferentThread(cache, "/a/b/c", "k"));
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache);
+      }
+   }
+
+   public void testBatchRollback() throws Exception
+   {
+      Cache<String, String> cache = null;
+      try
+      {
+         cache = createCache(true);
+         cache.startBatch();
+         cache.put("/a/b/c", "k", "v");
+         cache.put("/a/b/c", "k2", "v2");
+
+         assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+         assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+
+         cache.endBatch(false);
+
+         assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+         assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache);
+      }
+   }
+
+   private Cache<String, String> createCache(boolean enableBatch)
+   {
+      CacheFactory<String, String> cf = new DefaultCacheFactory<String, String>();
+      Configuration c = new Configuration();
+      c.setNodeLockingScheme(NodeLockingScheme.MVCC);
+      c.setInvocationBatchingEnabled(enableBatch);
+      return cf.createCache(c);
+   }
+}

Modified: core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -269,4 +269,9 @@
    {
       assert config.getListenerAsyncPoolSize() == 5;
    }
+
+   public void testInvocationBatching()
+   {
+      assert config.isInvocationBatchingEnabled();
+   }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java	2008-08-07 16:45:39 UTC (rev 6540)
@@ -67,7 +67,29 @@
       assertInterceptorLinkage(list);
    }
 
+   public void testBatchingConfig() throws Exception
+   {
+      cache.getConfiguration().setExposeManagementStatistics(false);
+      cache.getConfiguration().setInvocationBatchingEnabled(true);
+      InterceptorChain chain = getInterceptorChainFactory(cache).buildInterceptorChain();
+      List<CommandInterceptor> list = chain.asList();
+      Iterator<CommandInterceptor> interceptors = list.iterator();
 
+      System.out.println("testBareConfig interceptors are:\n" + list);
+      assertNotNull(list);
+      assertEquals(6, list.size());
+
+      assertEquals(BatchingInterceptor.class, interceptors.next().getClass());
+      assertEquals(InvocationContextInterceptor.class, interceptors.next().getClass());
+      assertEquals(TxInterceptor.class, interceptors.next().getClass());
+      assertEquals(NotificationInterceptor.class, interceptors.next().getClass());
+      assertEquals(PessimisticLockInterceptor.class, interceptors.next().getClass());
+      assertEquals(CallInterceptor.class, interceptors.next().getClass());
+
+      assertInterceptorLinkage(list);
+   }
+
+
    public void testMvccConfig() throws Exception
    {
       cache.getConfiguration().setExposeManagementStatistics(false);

Modified: core/trunk/src/test/resources/configs/parser-test.xml
===================================================================
--- core/trunk/src/test/resources/configs/parser-test.xml	2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/test/resources/configs/parser-test.xml	2008-08-07 16:45:39 UTC (rev 6540)
@@ -113,4 +113,5 @@
 
    <!-- the number of threads to use for asynchronous cache listeners - defaults to 1 -->
    <listeners asyncPoolSize="5"/>
+   <invocationBatching enabled="true"/>
 </jbosscache>




More information about the jbosscache-commits mailing list