Author: manik.surtani(a)jboss.com
Date: 2009-03-09 05:05:21 -0400 (Mon, 09 Mar 2009)
New Revision: 7890
Modified:
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
core/branches/flat/src/main/java/org/horizon/factories/DefaultCacheFactory.java
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
Log:
More performance improvements
Modified: core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -28,9 +28,11 @@
import org.horizon.remoting.transport.Address;
import org.horizon.transaction.GlobalTransaction;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* // TODO: MANIK: Document this
@@ -41,19 +43,39 @@
public class PrepareCommand extends AbstractTransactionBoundaryCommand {
public static final byte METHOD_ID = 10;
- protected List<WriteCommand> modifications;
+ protected WriteCommand[] modifications;
protected Address localAddress;
protected boolean onePhaseCommit;
- public PrepareCommand(GlobalTransaction gtx, List<WriteCommand> modifications,
Address localAddress, boolean onePhaseCommit) {
+ public PrepareCommand(GlobalTransaction gtx, Address localAddress, boolean
onePhaseCommit, WriteCommand... modifications) {
this.gtx = gtx;
this.modifications = modifications;
this.localAddress = localAddress;
this.onePhaseCommit = onePhaseCommit;
}
+ public PrepareCommand(GlobalTransaction gtx, List<WriteCommand> commands,
Address localAddress, boolean onePhaseCommit) {
+ this.gtx = gtx;
+ this.modifications = commands == null || commands.size() == 0 ? null :
commands.toArray(new WriteCommand[commands.size()]);
+ this.localAddress = localAddress;
+ this.onePhaseCommit = onePhaseCommit;
+ }
+
public void removeModifications(Collection<WriteCommand> modificationsToRemove)
{
- if (modifications != null) modifications.removeAll(modificationsToRemove);
+ if (modifications != null && modificationsToRemove != null &&
modificationsToRemove.size() > 0) {
+ // defensive copy
+ Set<WriteCommand> toRemove = new
HashSet<WriteCommand>(modificationsToRemove);
+ WriteCommand[] newMods = new WriteCommand[modifications.length -
modificationsToRemove.size()];
+ int i = 0;
+ for (WriteCommand c : modifications) {
+ if (toRemove.contains(c)) {
+ toRemove.remove(c);
+ } else {
+ newMods[i++] = c;
+ }
+ }
+ modifications = newMods;
+ }
}
public PrepareCommand() {
@@ -63,7 +85,7 @@
return visitor.visitPrepareCommand(ctx, this);
}
- public List<WriteCommand> getModifications() {
+ public WriteCommand[] getModifications() {
return modifications;
}
@@ -76,11 +98,11 @@
}
public boolean existModifications() {
- return modifications != null && modifications.size() > 0;
+ return modifications != null && modifications.length > 0;
}
public int getModificationsCount() {
- return modifications != null ? modifications.size() : 0;
+ return modifications != null ? modifications.length : 0;
}
public byte getCommandId() {
@@ -89,16 +111,27 @@
@Override
public Object[] getParameters() {
- return new Object[]{gtx, modifications, localAddress, onePhaseCommit};
+ int numMods = modifications == null ? 0 : modifications.length;
+ Object[] retval = new Object[numMods + 4];
+ retval[0] = gtx;
+ retval[1] = localAddress;
+ retval[2] = onePhaseCommit;
+ retval[3] = numMods;
+ if (numMods > 0) System.arraycopy(modifications, 0, retval, 4, numMods);
+ return retval;
}
@Override
@SuppressWarnings("unchecked")
public void setParameters(int commandId, Object[] args) {
gtx = (GlobalTransaction) args[0];
- modifications = (List<WriteCommand>) args[1];
- localAddress = (Address) args[2];
- onePhaseCommit = (Boolean) args[3];
+ localAddress = (Address) args[1];
+ onePhaseCommit = (Boolean) args[2];
+ int numMods = (Integer) args[3];
+ if (numMods > 0) {
+ modifications = new WriteCommand[numMods];
+ System.arraycopy(args, 4, modifications, 0, numMods);
+ }
}
@Override
@@ -129,7 +162,7 @@
PrepareCommand copy = new PrepareCommand();
copy.gtx = gtx;
copy.localAddress = localAddress;
- copy.modifications = modifications == null ? null : new
ArrayList<WriteCommand>(modifications);
+ copy.modifications = modifications == null ? null : modifications.clone();
copy.onePhaseCommit = onePhaseCommit;
return copy;
}
@@ -138,7 +171,7 @@
public String toString() {
return "PrepareCommand{" +
"globalTransaction=" + gtx +
- ", modifications=" + modifications +
+ ", modifications=" + Arrays.toString(modifications) +
", localAddress=" + localAddress +
", onePhaseCommit=" + onePhaseCommit +
'}';
Modified: core/branches/flat/src/main/java/org/horizon/factories/DefaultCacheFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/DefaultCacheFactory.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/factories/DefaultCacheFactory.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -54,9 +54,7 @@
*/
public Cache<K, V> createCache(Configuration configuration,
GlobalComponentRegistry globalComponentRegistry, String cacheName) throws
ConfigurationException {
try {
- AdvancedCache<K, V> cache = createAndWire(configuration,
globalComponentRegistry, cacheName);
- cache.start();
- return cache;
+ return createAndWire(configuration, globalComponentRegistry, cacheName);
}
catch (ConfigurationException ce) {
throw ce;
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -45,7 +45,7 @@
import javax.transaction.SystemException;
import javax.transaction.Transaction;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -125,10 +125,10 @@
if (transactionContext.hasModifications()) {
List<WriteCommand> mods;
if (transactionContext.hasLocalModifications()) {
- mods = new ArrayList<WriteCommand>(command.getModifications());
+ mods = Arrays.asList(command.getModifications());
mods.removeAll(transactionContext.getLocalModifications());
} else {
- mods = command.getModifications();
+ mods = Arrays.asList(command.getModifications());
}
broadcastInvalidate(mods, tx, ctx);
} else {
@@ -254,4 +254,4 @@
public long getInvalidations() {
return invalidations.get();
}
-}
\ No newline at end of file
+}
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -57,7 +57,7 @@
Object retVal = invokeNextInterceptor(ctx, command);
TransactionContext transactionContext = ctx.getTransactionContext();
if (transactionContext.hasLocalModifications()) {
- PrepareCommand replicablePrepareCommand = command.copy(); // makre sure we
remove any "local" transactions
+ PrepareCommand replicablePrepareCommand = command.copy(); // make sure we remove
any "local" transactions
replicablePrepareCommand.removeModifications(transactionContext.getLocalModifications());
command = replicablePrepareCommand;
}
@@ -144,4 +144,4 @@
// this method will return immediately if we're the only member (because
exclude_self=true)
replicateCall(ctx, prepareMethod, !async);
}
-}
\ No newline at end of file
+}
Modified: core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -374,4 +374,9 @@
public ComponentStatus getStatus() {
return globalComponentRegistry.getStatus();
}
+
+ @Override
+ public String toString() {
+ return super.toString() + "@Address:" + getAddress();
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -196,7 +196,10 @@
out.writeByte(MAGICNUMBER_TRANSACTION_LOG);
TransactionLog.LogEntry le = (TransactionLog.LogEntry) o;
marshallObject(le.getTransaction(), out, refMap);
- marshallObject(le.getModifications(), out, refMap);
+ WriteCommand[] cmds = le.getModifications();
+ writeUnsignedInt(out, cmds.length);
+ for (WriteCommand c : cmds)
+ marshallObject(c, out, refMap);
} else if (o instanceof Serializable) {
if (trace) log.trace("WARNING: using object serialization for
[{0}]", o.getClass());
@@ -313,8 +316,11 @@
retVal = unmarshallJGroupsAddress(in);
return retVal;
case MAGICNUMBER_TRANSACTION_LOG:
- retVal = new TransactionLog.LogEntry((GlobalTransaction) unmarshallObject(in,
refMap), (List<WriteCommand>) unmarshallObject(in, refMap));
- return retVal;
+ GlobalTransaction gtx = (GlobalTransaction) unmarshallObject(in, refMap);
+ int numCommands = readUnsignedInt(in);
+ WriteCommand[] cmds = new WriteCommand[numCommands];
+ for (int i = 0; i < numCommands; i++) cmds[i] = (WriteCommand)
unmarshallObject(in, refMap);
+ return new TransactionLog.LogEntry(gtx, cmds);
case MAGICNUMBER_ARRAY:
return unmarshallArray(in, refMap);
case MAGICNUMBER_ARRAY_LIST:
@@ -783,4 +789,4 @@
public Object objectFromByteBuffer(byte[] bytes) throws IOException,
ClassNotFoundException {
return objectFromByteBuffer(bytes, 0, bytes.length);
}
-}
\ No newline at end of file
+}
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -24,6 +24,7 @@
import org.horizon.CacheException;
import org.horizon.commands.RPCCommand;
import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.remote.ClusteredGetCommand;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.remoting.InboundInvocationHandler;
@@ -166,10 +167,16 @@
if (replayIgnored) {
ExtendedResponse extended = new ExtendedResponse(retval);
extended.setReplayIgnoredRequests(true);
- retval = extended;
+ return extended;
+ } else {
+
+ // Do we really need a response?!? The caller would only ever expect a
response for certain types of
+ // commands, such as a ClusteredGet
+ if (cmd.isSingleCommand() && cmd.getSingleCommand() instanceof
ClusteredGetCommand)
+ return retval;
+ else
+ return null; // saves on serializing a response!
}
- return retval;
-
} finally {
if (unlock) distributedSync.releaseProcessingLock();
}
@@ -254,4 +261,4 @@
return retval;
}
}
-}
\ No newline at end of file
+}
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -307,7 +307,7 @@
}
public Address getAddress() {
- if (address == null) {
+ if (address == null && channel != null) {
address = new JGroupsAddress(channel.getLocalAddress());
}
return address;
Modified:
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -53,7 +53,7 @@
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import java.util.List;
+import java.util.Arrays;
import java.util.Set;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -195,8 +195,8 @@
private void processCommitLog(ObjectInputStream ois) throws Exception {
Object object = marshaller.objectFromObjectStream(ois);
while (object instanceof TransactionLog.LogEntry) {
- List<WriteCommand> mods = ((TransactionLog.LogEntry)
object).getModifications();
- if (trace) log.trace("Mods = {0}", mods);
+ WriteCommand[] mods = ((TransactionLog.LogEntry) object).getModifications();
+ if (trace) log.trace("Mods = {0}", Arrays.toString(mods));
for (WriteCommand mod : mods) {
InvocationContext ctx = invocationContextContainer.get();
ctx.setOriginLocal(false);
Modified: core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java 2009-03-09
09:03:53 UTC (rev 7889)
+++
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java 2009-03-09
09:05:21 UTC (rev 7890)
@@ -48,9 +48,9 @@
public static class LogEntry {
private final GlobalTransaction transaction;
- private final List<WriteCommand> modifications;
+ private final WriteCommand[] modifications;
- public LogEntry(GlobalTransaction transaction, List<WriteCommand>
modifications) {
+ public LogEntry(GlobalTransaction transaction, WriteCommand... modifications) {
this.transaction = transaction;
this.modifications = modifications;
}
@@ -59,7 +59,7 @@
return transaction;
}
- public List<WriteCommand> getModifications() {
+ public WriteCommand[] getModifications() {
return modifications;
}
}
@@ -75,20 +75,18 @@
// it is perfectly normal for a prepare not to be logged for this gtx, for example
if a transaction did not
// modify anything, then beforeCompletion() is not invoked and logPrepare() will
not be called to register the
// prepare.
- if (command != null) addEntry(new LogEntry(gtx, command.getModifications()));
+ if (command != null && isActive()) addEntry(gtx,
command.getModifications());
}
- private void addEntry(LogEntry entry) {
- if (!isActive())
- return;
-
- for (; ;) {
+ private void addEntry(GlobalTransaction gtx, WriteCommand... commands) {
+ LogEntry entry = new LogEntry(gtx, commands);
+ boolean success = false;
+ while (!success) {
try {
- if (log.isTraceEnabled())
- log.trace("Added commit entry to tx log" + entry);
+ if (log.isTraceEnabled()) log.trace("Added commit entry to tx log
{0}", entry);
entries.put(entry);
- break;
+ success = true;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -99,16 +97,12 @@
public final void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand>
modifications) {
// Just in case...
if (gtx != null) pendingPrepares.remove(gtx);
- if (!modifications.isEmpty()) addEntry(new LogEntry(gtx, modifications));
+ if (isActive() && modifications != null && modifications.size()
> 0)
+ addEntry(gtx, modifications.toArray(new WriteCommand[modifications.size()]));
}
public final void logNoTxWrite(WriteCommand write) {
- if (!isActive())
- return;
-
- ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
- list.add(write);
- addEntry(new LogEntry(null, list));
+ if (isActive()) addEntry(null, write);
}
public void rollback(GlobalTransaction gtx) {
@@ -130,7 +124,7 @@
entries.clear();
}
- public int size() {
+ public final int size() {
return entries.size();
}