[jbosscache-commits] JBoss Cache SVN: r7890 - in core/branches/flat/src/main/java/org/horizon: factories and 6 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Mar 9 05:05:22 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-03-09 05:05:21 -0400 (Mon, 09 Mar 2009)
New Revision: 7890

Modified:
   core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
   core/branches/flat/src/main/java/org/horizon/factories/DefaultCacheFactory.java
   core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
   core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
   core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
   core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
Log:
More performance improvements

Modified: core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -28,9 +28,11 @@
 import org.horizon.remoting.transport.Address;
 import org.horizon.transaction.GlobalTransaction;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * // TODO: MANIK: Document this
@@ -41,19 +43,39 @@
 public class PrepareCommand extends AbstractTransactionBoundaryCommand {
    public static final byte METHOD_ID = 10;
 
-   protected List<WriteCommand> modifications;
+   protected WriteCommand[] modifications;
    protected Address localAddress;
    protected boolean onePhaseCommit;
 
-   public PrepareCommand(GlobalTransaction gtx, List<WriteCommand> modifications, Address localAddress, boolean onePhaseCommit) {
+   public PrepareCommand(GlobalTransaction gtx, Address localAddress, boolean onePhaseCommit, WriteCommand... modifications) {
       this.gtx = gtx;
       this.modifications = modifications;
       this.localAddress = localAddress;
       this.onePhaseCommit = onePhaseCommit;
    }
 
+   public PrepareCommand(GlobalTransaction gtx, List<WriteCommand> commands, Address localAddress, boolean onePhaseCommit) {
+      this.gtx = gtx;
+      this.modifications = commands == null || commands.size() == 0 ? null : commands.toArray(new WriteCommand[commands.size()]);
+      this.localAddress = localAddress;
+      this.onePhaseCommit = onePhaseCommit;
+   }
+
    public void removeModifications(Collection<WriteCommand> modificationsToRemove) {
-      if (modifications != null) modifications.removeAll(modificationsToRemove);
+      if (modifications != null && modificationsToRemove != null && modificationsToRemove.size() > 0) {
+         // defensive copy
+         Set<WriteCommand> toRemove = new HashSet<WriteCommand>(modificationsToRemove);
+         WriteCommand[] newMods = new WriteCommand[modifications.length - modificationsToRemove.size()];
+         int i = 0;
+         for (WriteCommand c : modifications) {
+            if (toRemove.contains(c)) {
+               toRemove.remove(c);
+            } else {
+               newMods[i++] = c;
+            }
+         }
+         modifications = newMods;
+      }
    }
 
    public PrepareCommand() {
@@ -63,7 +85,7 @@
       return visitor.visitPrepareCommand(ctx, this);
    }
 
-   public List<WriteCommand> getModifications() {
+   public WriteCommand[] getModifications() {
       return modifications;
    }
 
@@ -76,11 +98,11 @@
    }
 
    public boolean existModifications() {
-      return modifications != null && modifications.size() > 0;
+      return modifications != null && modifications.length > 0;
    }
 
    public int getModificationsCount() {
-      return modifications != null ? modifications.size() : 0;
+      return modifications != null ? modifications.length : 0;
    }
 
    public byte getCommandId() {
@@ -89,16 +111,27 @@
 
    @Override
    public Object[] getParameters() {
-      return new Object[]{gtx, modifications, localAddress, onePhaseCommit};
+      int numMods = modifications == null ? 0 : modifications.length;
+      Object[] retval = new Object[numMods + 4];
+      retval[0] = gtx;
+      retval[1] = localAddress;
+      retval[2] = onePhaseCommit;
+      retval[3] = numMods;
+      if (numMods > 0) System.arraycopy(modifications, 0, retval, 4, numMods);
+      return retval;
    }
 
    @Override
    @SuppressWarnings("unchecked")
    public void setParameters(int commandId, Object[] args) {
       gtx = (GlobalTransaction) args[0];
-      modifications = (List<WriteCommand>) args[1];
-      localAddress = (Address) args[2];
-      onePhaseCommit = (Boolean) args[3];
+      localAddress = (Address) args[1];
+      onePhaseCommit = (Boolean) args[2];
+      int numMods = (Integer) args[3];
+      if (numMods > 0) {
+         modifications = new WriteCommand[numMods];
+         System.arraycopy(args, 4, modifications, 0, numMods);
+      }
    }
 
    @Override
@@ -129,7 +162,7 @@
       PrepareCommand copy = new PrepareCommand();
       copy.gtx = gtx;
       copy.localAddress = localAddress;
-      copy.modifications = modifications == null ? null : new ArrayList<WriteCommand>(modifications);
+      copy.modifications = modifications == null ? null : modifications.clone();
       copy.onePhaseCommit = onePhaseCommit;
       return copy;
    }
@@ -138,7 +171,7 @@
    public String toString() {
       return "PrepareCommand{" +
             "globalTransaction=" + gtx +
-            ", modifications=" + modifications +
+            ", modifications=" + Arrays.toString(modifications) +
             ", localAddress=" + localAddress +
             ", onePhaseCommit=" + onePhaseCommit +
             '}';

Modified: core/branches/flat/src/main/java/org/horizon/factories/DefaultCacheFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/DefaultCacheFactory.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/factories/DefaultCacheFactory.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -54,9 +54,7 @@
     */
    public Cache<K, V> createCache(Configuration configuration, GlobalComponentRegistry globalComponentRegistry, String cacheName) throws ConfigurationException {
       try {
-         AdvancedCache<K, V> cache = createAndWire(configuration, globalComponentRegistry, cacheName);
-         cache.start();
-         return cache;
+         return createAndWire(configuration, globalComponentRegistry, cacheName);
       }
       catch (ConfigurationException ce) {
          throw ce;

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -45,7 +45,7 @@
 
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -125,10 +125,10 @@
          if (transactionContext.hasModifications()) {
             List<WriteCommand> mods;
             if (transactionContext.hasLocalModifications()) {
-               mods = new ArrayList<WriteCommand>(command.getModifications());
+               mods = new java.util.ArrayList<WriteCommand>(Arrays.asList(command.getModifications())); // mutable copy: Arrays.asList() is fixed-size, removeAll() on it throws
                 mods.removeAll(transactionContext.getLocalModifications());
             } else {
-               mods = command.getModifications();
+               mods = Arrays.asList(command.getModifications());
             }
             broadcastInvalidate(mods, tx, ctx);
          } else {
@@ -254,4 +254,4 @@
    public long getInvalidations() {
       return invalidations.get();
    }
-}
\ No newline at end of file
+}

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -57,7 +57,7 @@
       Object retVal = invokeNextInterceptor(ctx, command);
       TransactionContext transactionContext = ctx.getTransactionContext();
       if (transactionContext.hasLocalModifications()) {
-         PrepareCommand replicablePrepareCommand = command.copy(); // makre sure we remove any "local" transactions
+         PrepareCommand replicablePrepareCommand = command.copy(); // make sure we remove any "local" transactions
          replicablePrepareCommand.removeModifications(transactionContext.getLocalModifications());
          command = replicablePrepareCommand;
       }
@@ -144,4 +144,4 @@
       // this method will return immediately if we're the only member (because exclude_self=true)
       replicateCall(ctx, prepareMethod, !async);
    }
-}
\ No newline at end of file
+}

Modified: core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -374,4 +374,9 @@
    public ComponentStatus getStatus() {
       return globalComponentRegistry.getStatus();
    }
+
+   @Override
+   public String toString() {
+      return super.toString() + "@Address:" + getAddress();
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -196,7 +196,10 @@
             out.writeByte(MAGICNUMBER_TRANSACTION_LOG);
             TransactionLog.LogEntry le = (TransactionLog.LogEntry) o;
             marshallObject(le.getTransaction(), out, refMap);
-            marshallObject(le.getModifications(), out, refMap);
+            WriteCommand[] cmds = le.getModifications();
+            writeUnsignedInt(out, cmds.length);
+            for (WriteCommand c : cmds)
+               marshallObject(c, out, refMap);
          } else if (o instanceof Serializable) {
             if (trace) log.trace("WARNING: using object serialization for [{0}]", o.getClass());
 
@@ -313,8 +316,11 @@
             retVal = unmarshallJGroupsAddress(in);
             return retVal;
          case MAGICNUMBER_TRANSACTION_LOG:
-            retVal = new TransactionLog.LogEntry((GlobalTransaction) unmarshallObject(in, refMap), (List<WriteCommand>) unmarshallObject(in, refMap));
-            return retVal;
+            GlobalTransaction gtx = (GlobalTransaction) unmarshallObject(in, refMap);
+            int numCommands = readUnsignedInt(in);
+            WriteCommand[] cmds = new WriteCommand[numCommands];
+            for (int i = 0; i < numCommands; i++) cmds[i] = (WriteCommand) unmarshallObject(in, refMap);
+            return new TransactionLog.LogEntry(gtx, cmds);
          case MAGICNUMBER_ARRAY:
             return unmarshallArray(in, refMap);
          case MAGICNUMBER_ARRAY_LIST:
@@ -783,4 +789,4 @@
    public Object objectFromByteBuffer(byte[] bytes) throws IOException, ClassNotFoundException {
       return objectFromByteBuffer(bytes, 0, bytes.length);
    }
-}
\ No newline at end of file
+}

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -24,6 +24,7 @@
 import org.horizon.CacheException;
 import org.horizon.commands.RPCCommand;
 import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.remote.ClusteredGetCommand;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
 import org.horizon.remoting.InboundInvocationHandler;
@@ -166,10 +167,16 @@
          if (replayIgnored) {
             ExtendedResponse extended = new ExtendedResponse(retval);
             extended.setReplayIgnoredRequests(true);
-            retval = extended;
+            return extended;
+         } else {
+
+            // Do we really need a response?!?  The caller would only ever expect a response for certain types of
+            // commands, such as a ClusteredGet
+            if (cmd.isSingleCommand() && cmd.getSingleCommand() instanceof ClusteredGetCommand)
+               return retval;
+            else
+               return null; // saves on serializing a response!
          }
-         return retval;
-
       } finally {
          if (unlock) distributedSync.releaseProcessingLock();
       }
@@ -254,4 +261,4 @@
          return retval;
       }
    }
-}
\ No newline at end of file
+}

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -307,7 +307,7 @@
    }
 
    public Address getAddress() {
-      if (address == null) {
+      if (address == null && channel != null) {
          address = new JGroupsAddress(channel.getLocalAddress());
       }
       return address;

Modified: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -53,7 +53,7 @@
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.List;
+import java.util.Arrays;
 import java.util.Set;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -195,8 +195,8 @@
    private void processCommitLog(ObjectInputStream ois) throws Exception {
       Object object = marshaller.objectFromObjectStream(ois);
       while (object instanceof TransactionLog.LogEntry) {
-         List<WriteCommand> mods = ((TransactionLog.LogEntry) object).getModifications();
-         if (trace) log.trace("Mods = {0}", mods);
+         WriteCommand[] mods = ((TransactionLog.LogEntry) object).getModifications();
+         if (trace) log.trace("Mods = {0}", Arrays.toString(mods));
          for (WriteCommand mod : mods) {
             InvocationContext ctx = invocationContextContainer.get();
             ctx.setOriginLocal(false);

Modified: core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java	2009-03-09 09:03:53 UTC (rev 7889)
+++ core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java	2009-03-09 09:05:21 UTC (rev 7890)
@@ -48,9 +48,9 @@
 
    public static class LogEntry {
       private final GlobalTransaction transaction;
-      private final List<WriteCommand> modifications;
+      private final WriteCommand[] modifications;
 
-      public LogEntry(GlobalTransaction transaction, List<WriteCommand> modifications) {
+      public LogEntry(GlobalTransaction transaction, WriteCommand... modifications) {
          this.transaction = transaction;
-         this.modifications = modifications;
+         this.modifications = modifications == null ? new WriteCommand[0] : modifications; // a null varargs array would break callers that read .length or iterate
       }
@@ -59,7 +59,7 @@
          return transaction;
       }
 
-      public List<WriteCommand> getModifications() {
+      public WriteCommand[] getModifications() {
          return modifications;
       }
    }
@@ -75,20 +75,18 @@
       // it is perfectly normal for a prepare not to be logged for this gtx, for example if a transaction did not
       // modify anything, then beforeCompletion() is not invoked and logPrepare() will not be called to register the
       // prepare.
-      if (command != null) addEntry(new LogEntry(gtx, command.getModifications()));
+      if (command != null && isActive()) addEntry(gtx, command.getModifications());
    }
 
-   private void addEntry(LogEntry entry) {
-      if (!isActive())
-         return;
-
-      for (; ;) {
+   private void addEntry(GlobalTransaction gtx, WriteCommand... commands) {
+      LogEntry entry = new LogEntry(gtx, commands);
+      boolean success = false;
+      while (!success) {
          try {
-            if (log.isTraceEnabled())
-               log.trace("Added commit entry to tx log" + entry);
+            if (log.isTraceEnabled()) log.trace("Added commit entry to tx log {0}", entry);
 
             entries.put(entry);
-            break;
+            success = true;
          }
          catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -99,16 +97,12 @@
    public final void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand> modifications) {
       // Just in case...
       if (gtx != null) pendingPrepares.remove(gtx);
-      if (!modifications.isEmpty()) addEntry(new LogEntry(gtx, modifications));
+      if (isActive() && modifications != null && modifications.size() > 0)
+         addEntry(gtx, modifications.toArray(new WriteCommand[modifications.size()]));
    }
 
    public final void logNoTxWrite(WriteCommand write) {
-      if (!isActive())
-         return;
-
-      ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
-      list.add(write);
-      addEntry(new LogEntry(null, list));
+      if (isActive()) addEntry(null, write);
    }
 
    public void rollback(GlobalTransaction gtx) {
@@ -130,7 +124,7 @@
       entries.clear();
    }
 
-   public int size() {
+   public final int size() {
       return entries.size();
    }
 




More information about the jbosscache-commits mailing list