Author: manik.surtani(a)jboss.com
Date: 2008-04-24 06:38:05 -0400 (Thu, 24 Apr 2008)
New Revision: 5657
Modified:
core/trunk/src/main/java/org/jboss/cache/commands/remote/ReplicateCommand.java
core/trunk/src/main/java/org/jboss/cache/commands/tx/BaseGlobalTransactionCommand.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
Log:
Fixed stuff
Modified: core/trunk/src/main/java/org/jboss/cache/commands/remote/ReplicateCommand.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/commands/remote/ReplicateCommand.java 2008-04-24
10:36:20 UTC (rev 5656)
+++
core/trunk/src/main/java/org/jboss/cache/commands/remote/ReplicateCommand.java 2008-04-24
10:38:05 UTC (rev 5657)
@@ -200,7 +200,7 @@
public String toString()
{
return "ReplicateCommand{" +
- "modifications=" + (isSingleCommand() ? singleModification :
modifications) +
+ "cmds=" + (isSingleCommand() ? singleModification : modifications)
+
'}';
}
}
\ No newline at end of file
Modified:
core/trunk/src/main/java/org/jboss/cache/commands/tx/BaseGlobalTransactionCommand.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/commands/tx/BaseGlobalTransactionCommand.java 2008-04-24
10:36:20 UTC (rev 5656)
+++
core/trunk/src/main/java/org/jboss/cache/commands/tx/BaseGlobalTransactionCommand.java 2008-04-24
10:38:05 UTC (rev 5657)
@@ -69,7 +69,7 @@
public String toString()
{
return getClass().getSimpleName() + "{" +
- "globalTransaction=" + globalTransaction +
+ "gtx=" + globalTransaction +
'}';
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-04-24
10:36:20 UTC (rev 5656)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-04-24
10:38:05 UTC (rev 5657)
@@ -168,17 +168,17 @@
}
/**
- * It does not make sense replicating a transaction method(commit, rollback, prepare)
if one of the following:
+ * It does not make sense replicating a transaction method(commit, rollback, prepare)
if one of the following is true:
* <pre>
* - call was not initiated here, but on other member of the cluster
- * - there is no transaction. Why calling a commit if no transaction going on?
- * - the current transaction did not modufy any data, so other members are not
aware of it
+ * - there is no transaction. Why broadcast a commit or rollback if there is no
transaction going on?
+ * - the current transaction did not modify any data
* </pre>
*/
protected boolean skipReplicationOfTransactionMethod(InvocationContext ctx)
{
GlobalTransaction gtx = ctx.getGlobalTransaction();
- return gtx == null || gtx.isRemote() ||
ctx.getOptionOverrides().isCacheModeLocal();
+ return ctx.getTransaction() == null || gtx == null || gtx.isRemote() ||
ctx.getOptionOverrides().isCacheModeLocal() || !ctx.isTxHasMods();
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java 2008-04-24
10:36:20 UTC (rev 5656)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java 2008-04-24
10:38:05 UTC (rev 5657)
@@ -6,6 +6,7 @@
import org.jboss.cache.commands.cachedata.PutDataMapCommand;
import org.jboss.cache.commands.cachedata.PutKeyValueCommand;
import org.jboss.cache.commands.cachedata.RemoveDataCommand;
+import org.jboss.cache.commands.cachedata.RemoveKeyCommand;
import org.jboss.cache.commands.cachedata.RemoveNodeCommand;
import org.jboss.cache.commands.functional.TxCacheCommand;
import org.jboss.cache.commands.tx.CommitCommand;
@@ -137,6 +138,12 @@
}
@Override
+ public Object handleRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command)
throws Throwable
+ {
+ return handleAlterCacheMethod(ctx, command);
+ }
+
+ @Override
public Object handleMoveCommand(InvocationContext ctx, MoveCommand command) throws
Throwable
{
return handleAlterCacheMethod(ctx, command);
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-04-24
10:36:20 UTC (rev 5656)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-04-24
10:38:05 UTC (rev 5657)
@@ -12,6 +12,7 @@
import org.jboss.cache.ReplicationException;
import org.jboss.cache.commands.CacheCommand;
import org.jboss.cache.commands.CommandsFactory;
+import org.jboss.cache.commands.cachedata.CreateNodeCommand;
import org.jboss.cache.commands.cachedata.InvalidateCommand;
import org.jboss.cache.commands.channel.BlockChannelCommand;
import org.jboss.cache.commands.channel.UnblockChannelCommand;
@@ -41,7 +42,9 @@
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -846,13 +849,32 @@
}
/**
+ * An optimisation of commands to be broadcast in a prepare call, which involves
removing of unnecessary commands
+ * as well as the "aggregation" of commands that can be aggregated.
+ *
+ * @param mods list of modifications
+ * @return compacted list of modifications
+ */
+ private List<TxCacheCommand> compact(List<TxCacheCommand> mods)
+ {
+ // TODO: Make this more sophisticated, so it aggregates multiple puts on the same
node, puts followed by a remove, etc.
+ // for now this just removes the redundant CreateNodeCommands from the list.
+ List<TxCacheCommand> newList = new LinkedList<TxCacheCommand>();
+ for (TxCacheCommand cmd : mods)
+ {
+ if (!(cmd instanceof CreateNodeCommand)) newList.add(cmd);
+ }
+ return newList;
+ }
+
+ /**
* Handles a local prepare - invoked by the sync handler. Tests if the current tx
matches the gtx passed in to the
* method call and passes the prepare() call up the chain.
*
* @return
* @throws Throwable
*/
- public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List
modifications) throws Throwable
+ public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx,
List<TxCacheCommand> modifications) throws Throwable
{
// build the method call
CacheCommand prepareCommand;
@@ -888,6 +910,8 @@
if (txManager.getTransaction() != null && ltx != null &&
txManager.getTransaction().equals(ltx))
{
ctx.setExecutingCommand(prepareCommand);
+ // set the hasMods flag in the invocation ctx. This should not be replicated,
just used locally by the interceptors.
+ ctx.setTxHasMods(modifications != null && modifications.size() > 0);
result = invokeNextInterceptor(ctx, prepareCommand);
}
else
@@ -1029,12 +1053,10 @@
{
Transaction tx = null;
GlobalTransaction gtx = null;
- // CacheSPI cache = null;
- List modifications = null;
+ List<TxCacheCommand> modifications = null;
TransactionEntry entry = null;
protected InvocationContext ctx; // the context for this call.
-
RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx)
{
this.gtx = gtx;
@@ -1099,7 +1121,8 @@
// set any transaction wide options as current for this thread.
if (entry != null)
{
- modifications = entry.getModifications();
+ // this should ideally be set in beforeCompletion(), after compacting the
list.
+ if (modifications == null) modifications = entry.getModifications();
cacheLoaderModifications = entry.getCacheLoaderModifications();
ctx.setOptionOverrides(entry.getOption());
}
@@ -1208,9 +1231,10 @@
// fetch the modifications before the transaction is committed
// (and thus removed from the txTable)
setTransactionalContext(tx, gtx, ctx);
- if (!entry.existModifications())
+ if (!entry.hasModifications())
{
if (trace) log.trace("No modifications in this tx. Skipping
beforeCompletion()");
+ modifications = Collections.emptyList();
return;
}
@@ -1227,6 +1251,7 @@
case Status.STATUS_ACTIVE:
case Status.STATUS_PREPARING:
// run a prepare call.
+ modifications = compact(modifications);
Object result = runPreparePhase(ctx, gtx, modifications);
if (result instanceof Throwable)
Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java 2008-04-24
10:36:20 UTC (rev 5656)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java 2008-04-24
10:38:05 UTC (rev 5657)
@@ -390,7 +390,7 @@
/**
* Returns true if modifications were registered to either modificationList or to
class loader modifications list.
*/
- public boolean existModifications()
+ public boolean hasModifications()
{
return !modificationList.isEmpty() || !classLoadeModList.isEmpty();
}