[jbosscache-commits] JBoss Cache SVN: r6540 - in core/trunk/src: main/java/org/jboss/cache/batch and 12 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Thu Aug 7 12:45:40 EDT 2008
Author: manik.surtani at jboss.com
Date: 2008-08-07 12:45:39 -0400 (Thu, 07 Aug 2008)
New Revision: 6540
Added:
core/trunk/src/main/java/org/jboss/cache/batch/
core/trunk/src/main/java/org/jboss/cache/batch/BatchContainer.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BatchingInterceptor.java
core/trunk/src/test/java/org/jboss/cache/api/batch/
core/trunk/src/test/java/org/jboss/cache/api/batch/AbstractBatchTest.java
core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithTM.java
core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithoutTM.java
Modified:
core/trunk/src/main/java/org/jboss/cache/Cache.java
core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
core/trunk/src/main/java/org/jboss/cache/factories/TransactionManagerFactory.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
core/trunk/src/main/java/org/jboss/cache/transaction/BatchModeTransactionManagerLookup.java
core/trunk/src/main/resources/jbosscache-config-3.0.xsd
core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java
core/trunk/src/test/resources/configs/parser-test.xml
Log:
JBCACHE-991 - Batching API
Modified: core/trunk/src/main/java/org/jboss/cache/Cache.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Cache.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/Cache.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -495,4 +495,30 @@
*/
void clearData(Fqn fqn);
+ /**
+ * Starts a batch. This is a lightweight batching mechanism that groups cache writes together and finally performs the
+ * write, persistence and/or replication when {@link #endBatch(boolean)} is called rather than for each invocation on the
+ * cache.
+ * <p/>
+ * Note that if there is an existing transaction in scope and the cache has been configured to use a JTA compliant
+ * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)} are ignored and treated as no-ops.
+ * <p/>
+ *
+ * @see #endBatch(boolean)
+ * @since 3.0
+ */
+ void startBatch();
+
+ /**
+ * Ends an existing ongoing batch. A no-op if a batch has not been started yet.
+ * <p/>
+ * Note that if there is an existing transaction in scope and the cache has been configured to use a JTA compliant
+ * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)} are ignored and treated as no-ops.
+ * <p/>
+ *
+ * @param successful if <tt>true</tt>, changes made in the batch are committed. If <tt>false</tt>, they are discarded.
+ * @see #startBatch()
+ * @since 3.0
+ */
+ void endBatch(boolean successful);
}
Modified: core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheSPI.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/CacheSPI.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -39,7 +39,6 @@
* @see NodeSPI
* @see Cache
* @see org.jboss.cache.loader.CacheLoader
- * @see org.jboss.cache.interceptors.ChainedInterceptor
* @since 2.0.0
*/
@ThreadSafe
Added: core/trunk/src/main/java/org/jboss/cache/batch/BatchContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/batch/BatchContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/batch/BatchContainer.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,80 @@
+package org.jboss.cache.batch;
+
+import org.jboss.cache.CacheException;
+import org.jboss.cache.factories.annotations.Inject;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+/**
+ * A container for holding thread locals for batching, to be used with the {@link org.jboss.cache.Cache#startBatch()} and
+ * {@link org.jboss.cache.Cache#endBatch(boolean)} calls.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class BatchContainer
+{
+ TransactionManager transactionManager;
+ private ThreadLocal<Transaction> batchTransactionContainer = new ThreadLocal<Transaction>();
+
+ @Inject
+ void inject(TransactionManager transactionManager)
+ {
+ this.transactionManager = transactionManager;
+ }
+
+ public void startBatch() throws CacheException
+ {
+ try
+ {
+ if (transactionManager.getTransaction() != null) return;
+ if (batchTransactionContainer.get() == null)
+ {
+ transactionManager.begin();
+ batchTransactionContainer.set(transactionManager.suspend());
+ }
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to start batch", e);
+ }
+ }
+
+ public void endBatch(boolean success)
+ {
+ Transaction tx = batchTransactionContainer.get();
+ if (tx == null) return;
+ Transaction existingTx = null;
+ try
+ {
+ existingTx = transactionManager.getTransaction();
+ transactionManager.resume(tx);
+ if (success)
+ tx.commit();
+ else
+ tx.rollback();
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to end batch", e);
+ }
+ finally
+ {
+ batchTransactionContainer.remove();
+ try
+ {
+ if (existingTx != null) transactionManager.resume(existingTx);
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Failed resuming existing transaction " + existingTx, e);
+ }
+ }
+ }
+
+ public Transaction getBatchTransaction()
+ {
+ return batchTransactionContainer.get();
+ }
+}
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -32,6 +32,7 @@
private Marshaller marshaller;
private JGroupsStackParser jGroupsStackParser = new JGroupsStackParser();
+ private boolean invocationBatchingEnabled;
/**
* Behavior of the JVM shutdown hook registered by the cache
@@ -299,6 +300,19 @@
this.exposeManagementStatistics = useMbean;
}
+ /**
+ * Enables invocation batching if set to <tt>true</tt>. You still need to use {@link org.jboss.cache.Cache#startBatch()}
+ * and {@link org.jboss.cache.Cache#endBatch(boolean)} to demarcate the start and end of batches.
+ *
+ * @param enabled if true, batching is enabled.
+ * @since 3.0
+ */
+ public void setInvocationBatchingEnabled(boolean enabled)
+ {
+ testImmutability("invocationBatchingEnabled");
+ this.invocationBatchingEnabled = enabled;
+ }
+
public void setFetchInMemoryState(boolean fetchInMemoryState)
{
testImmutability("fetchInMemoryState");
@@ -652,6 +666,15 @@
return exposeManagementStatistics;
}
+ /**
+ * @return true if invocation batching is enabled.
+ * @since 3.0
+ */
+ public boolean isInvocationBatchingEnabled()
+ {
+ return invocationBatchingEnabled;
+ }
+
public boolean isFetchInMemoryState()
{
return fetchInMemoryState;
Modified: core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -52,8 +52,8 @@
* <p/>
* Following system properties can be used for customizing parser behavior:
* <ul>
- * <li> <b>-Djbosscache.config.validate=false</b> will make the parser non-validating </li>
- * <li> <b>-Djbosscache.config.schemaLocation=url</b> allows one to specify a validation schema that would override the one specified in the the xml document </li>
+ * <li> <b>-Djbosscache.config.validate=false</b> will make the parser non-validating </li>
+ * <li> <b>-Djbosscache.config.schemaLocation=url</b> allows one to specify a validation schema that would override the one specified in the the xml document </li>
* </ul>
* This class is stateful and one instance should be used for parsing a single configuration file.
*
@@ -173,6 +173,7 @@
configureCacheLoaders(getSingleElement("loaders"));
configureCustomInterceptors(getSingleElement("customInterceptors"));
configureListeners(getSingleElement("listeners"));
+ configureInvocationBatching(getSingleElement("invocationBatching"));
}
catch (Exception e)
{
@@ -248,6 +249,13 @@
}
}
+ private void configureInvocationBatching(Element element)
+ {
+ if (element == null) return; //this element is optional
+ boolean enabled = getBoolean(getAttributeValue(element, "enabled"));
+ config.setInvocationBatchingEnabled(enabled);
+ }
+
private void configureBuddyReplication(Element element)
{
if (element == null) return;//buddy config might not exist, expect that
Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -2,6 +2,7 @@
import org.jboss.cache.DataContainer;
import org.jboss.cache.RegionRegistry;
+import org.jboss.cache.batch.BatchContainer;
import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
import org.jboss.cache.config.ConfigurationException;
import org.jboss.cache.factories.annotations.DefaultFactoryFor;
@@ -25,7 +26,7 @@
@DefaultFactoryFor(classes = {Notifier.class, MVCCNodeHelper.class, RegionRegistry.class,
ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class, InvocationContextContainer.class,
CacheInvocationDelegate.class, TransactionTable.class, DataContainer.class,
- LockStrategyFactory.class, BuddyFqnTransformer.class})
+ LockStrategyFactory.class, BuddyFqnTransformer.class, BatchContainer.class})
public class EmptyConstructorFactory extends ComponentFactory
{
@Override
Modified: core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -56,13 +56,18 @@
public InterceptorChain buildInterceptorChain() throws IllegalAccessException, InstantiationException, ClassNotFoundException
{
boolean optimistic = configuration.getNodeLockingScheme() == NodeLockingScheme.OPTIMISTIC;
+ boolean invocationBatching = configuration.isInvocationBatchingEnabled();
// load the icInterceptor first
- CommandInterceptor first = createInterceptor(InvocationContextInterceptor.class);
+ CommandInterceptor first = invocationBatching ? createInterceptor(BatchingInterceptor.class) : createInterceptor(InvocationContextInterceptor.class);
InterceptorChain interceptorChain = new InterceptorChain(first);
// add the interceptor chain to the registry first, since some interceptors may ask for it.
componentRegistry.registerComponent(interceptorChain, InterceptorChain.class);
+ // NOW add the ICI if we are using batching!
+ if (invocationBatching)
+ interceptorChain.appendIntereceptor(createInterceptor(InvocationContextInterceptor.class));
+
// load the cache management interceptor next
if (configuration.getExposeManagementStatistics())
interceptorChain.appendIntereceptor(createInterceptor(CacheMgmtInterceptor.class));
Modified: core/trunk/src/main/java/org/jboss/cache/factories/TransactionManagerFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/TransactionManagerFactory.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/factories/TransactionManagerFactory.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -2,6 +2,7 @@
import org.jboss.cache.config.ConfigurationException;
import org.jboss.cache.factories.annotations.DefaultFactoryFor;
+import org.jboss.cache.transaction.BatchModeTransactionManager;
import org.jboss.cache.transaction.TransactionManagerLookup;
import javax.transaction.TransactionManager;
@@ -52,6 +53,12 @@
log.info("failed looking up TransactionManager, will not use transactions", e);
}
}
+
+ if (transactionManager == null && configuration.isInvocationBatchingEnabled())
+ {
+ log.info("Using a batchMode transaction manager");
+ transactionManager = BatchModeTransactionManager.getInstance();
+ }
return componentType.cast(transactionManager);
}
}
Added: core/trunk/src/main/java/org/jboss/cache/interceptors/BatchingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BatchingInterceptor.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BatchingInterceptor.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,58 @@
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.batch.BatchContainer;
+import org.jboss.cache.commands.VisitableCommand;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.interceptors.base.CommandInterceptor;
+import org.jboss.cache.invocation.InvocationContext;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+/**
+ * Interceptor that captures batched calls and attaches contexts.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class BatchingInterceptor extends CommandInterceptor
+{
+ BatchContainer batchContainer;
+ TransactionManager transactionManager;
+
+ @Inject
+ private void inject(BatchContainer batchContainer, TransactionManager transactionManager)
+ {
+ this.batchContainer = batchContainer;
+ this.transactionManager = transactionManager;
+ }
+
+ /**
+ * Simply check if there is an ongoing tx.
+ * <ul>
+ * <li>If there is one, this is a no-op and just passes the call up the chain.</li>
+ * <li>If there isn't one and there is a batch in progress, resume the batch's tx, pass up, and finally suspend the batch's tx.</li>
+ * <li>If there is no batch in progress, just pass the call up the chain.</li>
+ * </ul>
+ */
+ @Override
+ protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+ {
+ Transaction tx = null;
+ try
+ {
+ // if in a batch, attach tx
+ if (transactionManager.getTransaction() == null &&
+ (tx = batchContainer.getBatchTransaction()) != null)
+ {
+ transactionManager.resume(tx);
+ }
+ return super.handleDefault(ctx, command);
+ }
+ finally
+ {
+ if (tx != null && transactionManager.getTransaction() != null)
+ transactionManager.suspend();
+ }
+ }
+}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -18,6 +18,7 @@
import static org.jboss.cache.lock.LockType.WRITE;
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.optimistic.WorkspaceNode;
+import org.jboss.cache.transaction.BatchModeTransactionManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionContext;
@@ -35,7 +36,7 @@
@Start
private void init()
{
- if (txManager == null)
+ if (txManager == null || txManager.getClass().equals(BatchModeTransactionManager.class))
log.fatal("No transaction manager lookup class has been defined. Transactions cannot be used and thus OPTIMISTIC locking cannot be used! Expect errors!!");
}
Modified: core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -1,13 +1,5 @@
package org.jboss.cache.invocation;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
@@ -21,6 +13,7 @@
import org.jboss.cache.Region;
import org.jboss.cache.RegionManager;
import org.jboss.cache.Version;
+import org.jboss.cache.batch.BatchContainer;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.buddyreplication.GravitateResult;
import org.jboss.cache.commands.CommandsFactory;
@@ -39,8 +32,9 @@
import org.jboss.cache.commands.write.RemoveKeyCommand;
import org.jboss.cache.commands.write.RemoveNodeCommand;
import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.NodeLockingScheme;
+import org.jboss.cache.config.ConfigurationException;
import org.jboss.cache.config.Option;
-import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.NonVolatile;
import org.jboss.cache.factories.annotations.Start;
@@ -55,6 +49,13 @@
import org.jboss.cache.util.Immutables;
import org.jgroups.Address;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
* The delegate that users (and ChainedInterceptor authors) interact with when they create a cache by using a cache factory.
* This wrapper delegates calls down the interceptor chain.
@@ -81,12 +82,13 @@
private CommandsFactory commandsFactory;
private MVCCNodeHelper mvccHelper;
private boolean usingMvcc;
+ private BatchContainer batchContainer;
@Inject
public void initialize(StateTransferManager stateTransferManager, CacheLoaderManager cacheLoaderManager, Notifier notifier,
TransactionManager transactionManager, BuddyManager buddyManager, TransactionTable transactionTable,
RPCManager rpcManager, RegionManager regionManager, Marshaller marshaller,
- CommandsFactory commandsFactory, DataContainer dataContainer, MVCCNodeHelper mvccHelper)
+ CommandsFactory commandsFactory, DataContainer dataContainer, MVCCNodeHelper mvccHelper, BatchContainer batchContainer)
{
this.stateTransferManager = stateTransferManager;
this.cacheLoaderManager = cacheLoaderManager;
@@ -100,6 +102,7 @@
this.dataContainer = dataContainer;
this.commandsFactory = commandsFactory;
this.mvccHelper = mvccHelper;
+ this.batchContainer = batchContainer;
}
@Start
@@ -582,6 +585,20 @@
invoker.invoke(ctx, commandsFactory.buildClearDataCommand(tx, fqn));
}
+ public void startBatch()
+ {
+ if (!configuration.isInvocationBatchingEnabled())
+ throw new ConfigurationException("Invocation batching not enabled in current configuration! Please use the <invocationBatching /> element.");
+ batchContainer.startBatch();
+ }
+
+ public void endBatch(boolean successful)
+ {
+ if (!configuration.isInvocationBatchingEnabled())
+ throw new ConfigurationException("Invocation batching not enabled in current configuration! Please use the <invocationBatching /> element.");
+ batchContainer.endBatch(successful);
+ }
+
@SuppressWarnings("unchecked")
public Set<Object> getChildrenNames(Fqn fqn)
{
Modified: core/trunk/src/main/java/org/jboss/cache/transaction/BatchModeTransactionManagerLookup.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/BatchModeTransactionManagerLookup.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/BatchModeTransactionManagerLookup.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -1,6 +1,9 @@
package org.jboss.cache.transaction;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import javax.transaction.TransactionManager;
@@ -9,10 +12,16 @@
*
* @author Bela Ban Sept 5 2003
* @version $Id$
+ * @deprecated Use batching API on Cache instead.
*/
-public class BatchModeTransactionManagerLookup implements TransactionManagerLookup {
+@Deprecated
+public class BatchModeTransactionManagerLookup implements TransactionManagerLookup
+{
+ private Log log = LogFactory.getLog(BatchModeTransactionManagerLookup.class);
- public TransactionManager getTransactionManager() throws Exception {
+ public TransactionManager getTransactionManager() throws Exception
+ {
+ log.warn("Using a deprecated/unsupported transaction manager!");
return BatchModeTransactionManager.getInstance();
}
}
Modified: core/trunk/src/main/resources/jbosscache-config-3.0.xsd
===================================================================
--- core/trunk/src/main/resources/jbosscache-config-3.0.xsd 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/main/resources/jbosscache-config-3.0.xsd 2008-08-07 16:45:39 UTC (rev 6540)
@@ -15,10 +15,11 @@
<xs:element name="invalidation" type="invalidationType" minOccurs="0" maxOccurs="1"/>
<xs:element name="jmxStatistics" type="jmxStatisticsType" minOccurs="0" maxOccurs="1"/>
<xs:element name="listeners" type="listenersType" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="invocationBatching" type="invocationBatchingType" minOccurs="0" maxOccurs="1"/>
<xs:element name="transport" type="transportType" minOccurs="0" maxOccurs="1"/>
<xs:element name="eviction" type="evictionType" minOccurs="0" maxOccurs="1"/>
<xs:element name="loaders" type="loadersType" minOccurs="0" maxOccurs="1"/>
- <xs:element name="customInterceptors" type="customInterceptorsType" minOccurs="0"/>
+ <xs:element name="customInterceptors" type="customInterceptorsType" minOccurs="0" maxOccurs="1"/>
</xs:all>
</xs:complexType>
</xs:element>
@@ -138,6 +139,10 @@
<xs:attribute name="asyncPoolSize" type="positiveInteger"/>
</xs:complexType>
+ <xs:complexType name="invocationBatchingType">
+ <xs:attribute name="enabled" type="booleanType"/>
+ </xs:complexType>
+
<xs:complexType name="transportType">
<xs:sequence>
<xs:element name="jgroupsConfig" type="xs:anyType" minOccurs="0" maxOccurs="1"/>
Added: core/trunk/src/test/java/org/jboss/cache/api/batch/AbstractBatchTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/batch/AbstractBatchTest.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/api/batch/AbstractBatchTest.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,24 @@
+package org.jboss.cache.api.batch;
+
+import org.jboss.cache.Cache;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractBatchTest
+{
+ protected String getOnDifferentThread(final Cache<String, String> cache, final String fqn, final String key) throws InterruptedException
+ {
+ final AtomicReference<String> ref = new AtomicReference<String>();
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ ref.set(cache.get(fqn, key));
+ }
+ };
+
+ t.start();
+ t.join();
+ return ref.get();
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithTM.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithTM.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithTM.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,124 @@
+package org.jboss.cache.api.batch;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheFactory;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.CacheMode;
+import org.jboss.cache.config.Configuration.NodeLockingScheme;
+import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+
+@Test(groups = {"functional", "transaction"})
+public class BatchWithTM extends AbstractBatchTest
+{
+ public void testBatchWithOngoingTM() throws Exception
+ {
+ Cache<String, String> cache = null;
+ try
+ {
+ cache = createCache();
+ TransactionManager tm = getTransactionManager(cache);
+ tm.begin();
+ cache.put("/a/b/c", "k", "v");
+ cache.startBatch();
+ cache.put("/a/b/c", "k2", "v2");
+ tm.commit();
+
+ assert "v".equals(cache.get("/a/b/c", "k"));
+ assert "v2".equals(cache.get("/a/b/c", "k2"));
+
+ cache.endBatch(false); // should be a no op
+ assert "v".equals(cache.get("/a/b/c", "k"));
+ assert "v2".equals(cache.get("/a/b/c", "k2"));
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache);
+ }
+ }
+
+ public void testBatchWithoutOngoingTMSuspension() throws Exception
+ {
+ Cache<String, String> cache = null;
+ try
+ {
+ cache = createCache();
+ TransactionManager tm = getTransactionManager(cache);
+ assert tm.getTransaction() == null : "Should have no ongoing txs";
+ cache.startBatch();
+ cache.put("/a/b/c", "k", "v");
+ assert tm.getTransaction() == null : "Should have no ongoing txs";
+ cache.put("/a/b/c", "k2", "v2");
+
+ assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+ assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+
+ try
+ {
+ tm.commit(); // should have no effect
+ }
+ catch (Exception e)
+ {
+ // the TM may barf here ... this is OK.
+ }
+
+ assert tm.getTransaction() == null : "Should have no ongoing txs";
+
+ assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+ assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+
+ cache.endBatch(true); // should be a no op
+
+ assert "v".equals(getOnDifferentThread(cache, "/a/b/c", "k"));
+ assert "v2".equals(getOnDifferentThread(cache, "/a/b/c", "k2"));
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache);
+ }
+ }
+
+ public void testBatchRollback() throws Exception
+ {
+ Cache<String, String> cache = null;
+ try
+ {
+ cache = createCache();
+ TransactionManager tm = getTransactionManager(cache);
+ cache.startBatch();
+ cache.put("/a/b/c", "k", "v");
+ cache.put("/a/b/c", "k2", "v2");
+
+ assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+ assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+
+ cache.endBatch(false);
+
+ assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+ assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache);
+ }
+ }
+
+ private TransactionManager getTransactionManager(Cache<String, String> c)
+ {
+ return c.getConfiguration().getRuntimeConfig().getTransactionManager();
+ }
+
+ private Cache<String, String> createCache()
+ {
+ CacheFactory<String, String> cf = new DefaultCacheFactory<String, String>();
+ Configuration c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.LOCAL); // this should pick up any configured TM for the test
+ c.setNodeLockingScheme(NodeLockingScheme.MVCC);
+ c.setInvocationBatchingEnabled(true);
+ assert c.getTransactionManagerLookupClass() != null : "Should have a transaction manager lookup class attached!!";
+ return cf.createCache(c);
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithoutTM.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithoutTM.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/api/batch/BatchWithoutTM.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -0,0 +1,146 @@
+package org.jboss.cache.api.batch;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheFactory;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.NodeLockingScheme;
+import org.jboss.cache.config.ConfigurationException;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+@Test(groups = "functional")
+public class BatchWithoutTM extends AbstractBatchTest
+{
+ public void testBatchWithoutCfg()
+ {
+ Cache<String, String> cache = null;
+ try
+ {
+ cache = createCache(false);
+ try
+ {
+ cache.startBatch();
+ assert false : "Should have failed";
+ }
+ catch (ConfigurationException good)
+ {
+ // do nothing
+ }
+
+ try
+ {
+ cache.endBatch(true);
+ assert false : "Should have failed";
+ }
+ catch (ConfigurationException good)
+ {
+ // do nothing
+ }
+
+ try
+ {
+ cache.endBatch(false);
+ assert false : "Should have failed";
+ }
+ catch (ConfigurationException good)
+ {
+ // do nothing
+ }
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache);
+ }
+ }
+
+ public void testEndBatchWithoutStartBatch()
+ {
+ Cache<String, String> cache = null;
+ try
+ {
+ cache = createCache(true);
+ cache.endBatch(true);
+ cache.endBatch(false);
+ // should not fail.
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache);
+ }
+ }
+
+ public void testStartBatchIdempotency()
+ {
+ Cache<String, String> cache = null;
+ try
+ {
+ cache = createCache(true);
+ cache.startBatch();
+ cache.put("/a/b/c", "k", "v");
+ cache.startBatch(); // again
+ cache.put("/a/b/c", "k2", "v2");
+ cache.endBatch(true);
+
+ assert "v".equals(cache.get("/a/b/c", "k"));
+ assert "v2".equals(cache.get("/a/b/c", "k2"));
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache);
+ }
+ }
+
+ public void testBatchVisibility() throws InterruptedException
+ {
+ Cache<String, String> cache = null;
+ try
+ {
+ cache = createCache(true);
+ cache.startBatch();
+ cache.put("/a/b/c", "k", "v");
+ assert getOnDifferentThread(cache, "/a/b/c", "k") == null : "Other thread should not see batch update till batch completes!";
+
+ cache.endBatch(true);
+
+ assert "v".equals(getOnDifferentThread(cache, "/a/b/c", "k"));
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache);
+ }
+ }
+
+ public void testBatchRollback() throws Exception
+ {
+ Cache<String, String> cache = null;
+ try
+ {
+ cache = createCache(true);
+ cache.startBatch();
+ cache.put("/a/b/c", "k", "v");
+ cache.put("/a/b/c", "k2", "v2");
+
+ assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+ assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+
+ cache.endBatch(false);
+
+ assert getOnDifferentThread(cache, "/a/b/c", "k") == null;
+ assert getOnDifferentThread(cache, "/a/b/c", "k2") == null;
+ }
+ finally
+ {
+ TestingUtil.killCaches(cache);
+ }
+ }
+
+ private Cache<String, String> createCache(boolean enableBatch)
+ {
+ CacheFactory<String, String> cf = new DefaultCacheFactory<String, String>();
+ Configuration c = new Configuration();
+ c.setNodeLockingScheme(NodeLockingScheme.MVCC);
+ c.setInvocationBatchingEnabled(enableBatch);
+ return cf.createCache(c);
+ }
+}
Modified: core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -269,4 +269,9 @@
{
assert config.getListenerAsyncPoolSize() == 5;
}
+
+ public void testInvocationBatching()
+ {
+ assert config.isInvocationBatchingEnabled();
+ }
}
Modified: core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java 2008-08-07 16:45:39 UTC (rev 6540)
@@ -67,7 +67,29 @@
assertInterceptorLinkage(list);
}
+ public void testBatchingConfig() throws Exception
+ {
+ cache.getConfiguration().setExposeManagementStatistics(false);
+ cache.getConfiguration().setInvocationBatchingEnabled(true);
+ InterceptorChain chain = getInterceptorChainFactory(cache).buildInterceptorChain();
+ List<CommandInterceptor> list = chain.asList();
+ Iterator<CommandInterceptor> interceptors = list.iterator();
+ System.out.println("testBareConfig interceptors are:\n" + list);
+ assertNotNull(list);
+ assertEquals(6, list.size());
+
+ assertEquals(BatchingInterceptor.class, interceptors.next().getClass());
+ assertEquals(InvocationContextInterceptor.class, interceptors.next().getClass());
+ assertEquals(TxInterceptor.class, interceptors.next().getClass());
+ assertEquals(NotificationInterceptor.class, interceptors.next().getClass());
+ assertEquals(PessimisticLockInterceptor.class, interceptors.next().getClass());
+ assertEquals(CallInterceptor.class, interceptors.next().getClass());
+
+ assertInterceptorLinkage(list);
+ }
+
+
public void testMvccConfig() throws Exception
{
cache.getConfiguration().setExposeManagementStatistics(false);
Modified: core/trunk/src/test/resources/configs/parser-test.xml
===================================================================
--- core/trunk/src/test/resources/configs/parser-test.xml 2008-08-07 16:04:06 UTC (rev 6539)
+++ core/trunk/src/test/resources/configs/parser-test.xml 2008-08-07 16:45:39 UTC (rev 6540)
@@ -113,4 +113,5 @@
<!-- the number of threads to use for asynchronous cache listeners - defaults to 1 -->
<listeners asyncPoolSize="5"/>
+ <invocationBatching enabled="true"/>
</jbosscache>
More information about the jbosscache-commits
mailing list