[jbosscache-commits] JBoss Cache SVN: r7790 - in core/branches/flat/src: main/java/org/horizon/container and 7 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Feb 25 13:30:30 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-02-25 13:30:30 -0500 (Wed, 25 Feb 2009)
New Revision: 7790

Added:
   core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
Modified:
   core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
   core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
   core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
   core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java
   core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java
   core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
   core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
   core/branches/flat/src/test/java/org/horizon/test/ReplListener.java
Log:
More WIP on porting NBST

Modified: core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -21,10 +21,10 @@
  */
 package org.horizon.commands.tx;
 
-import org.horizon.commands.DataCommand;
 import org.horizon.commands.ReplicableCommand;
 import org.horizon.commands.VisitableCommand;
 import org.horizon.commands.Visitor;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.context.InvocationContext;
 import org.horizon.remoting.transport.Address;
 import org.horizon.transaction.GlobalTransaction;
@@ -42,11 +42,11 @@
 public class PrepareCommand extends AbstractTransactionBoundaryCommand {
    public static final byte METHOD_ID = 10;
 
-   protected List<DataCommand> modifications;
+   protected List<WriteCommand> modifications;
    protected Address localAddress;
    protected boolean onePhaseCommit;
 
-   public PrepareCommand(GlobalTransaction gtx, List<DataCommand> modifications, Address localAddress, boolean onePhaseCommit) {
+   public PrepareCommand(GlobalTransaction gtx, List<WriteCommand> modifications, Address localAddress, boolean onePhaseCommit) {
       this.gtx = gtx;
       this.modifications = modifications;
       this.localAddress = localAddress;
@@ -64,7 +64,7 @@
       return visitor.visitPrepareCommand(ctx, this);
    }
 
-   public List<DataCommand> getModifications() {
+   public List<WriteCommand> getModifications() {
       return modifications;
    }
 
@@ -97,7 +97,7 @@
    @SuppressWarnings("unchecked")
    public void setParameters(int commandId, Object[] args) {
       gtx = (GlobalTransaction) args[0];
-      modifications = (List<DataCommand>) args[1];
+      modifications = (List<WriteCommand>) args[1];
       localAddress = (Address) args[2];
       onePhaseCommit = (Boolean) args[3];
    }
@@ -130,7 +130,7 @@
       PrepareCommand copy = new PrepareCommand();
       copy.gtx = gtx;
       copy.localAddress = localAddress;
-      copy.modifications = modifications == null ? null : new ArrayList<DataCommand>(modifications);
+      copy.modifications = modifications == null ? null : new ArrayList<WriteCommand>(modifications);
       copy.onePhaseCommit = onePhaseCommit;
       return copy;
    }
@@ -146,7 +146,7 @@
    }
 
    public boolean containsModificationType(Class<? extends ReplicableCommand> replicableCommandClass) {
-      for (DataCommand mod : getModifications()) {
+      for (WriteCommand mod : getModifications()) {
          if (mod.getClass().equals(replicableCommandClass)) {
             return true;
          }

Modified: core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/DataContainer.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/container/DataContainer.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -59,4 +59,6 @@
    Set<Object> purgeExpiredEntries();
 
    StoredEntry createEntryForStorage(Object key);
+
+   Set<StoredEntry> getAllEntriesForStorage();
 }

Modified: core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -205,6 +205,22 @@
       return new StoredEntry(key, ecv.getValue(), ecv.getCreatedTime(), ecv.getExpiryTime());
    }
 
+   public Set<StoredEntry> getAllEntriesForStorage() {
+      Set<StoredEntry> set = new HashSet<StoredEntry>(immortalData.size() + expirableData.size());
+      for (Map.Entry<Object, CachedValue> entry: immortalData.entrySet())
+         set.add(new StoredEntry(entry.getKey(), entry.getValue().getValue()));
+
+      for (Iterator<Map.Entry<Object, ExpirableCachedValue>> it = expirableData.entrySet().iterator(); it.hasNext();) {
+         Map.Entry<Object, ExpirableCachedValue> entry = it.next();
+         ExpirableCachedValue ecv = entry.getValue();
+         if (ecv.isExpired())
+            it.remove();
+         else
+            set.add(new StoredEntry(entry.getKey(), ecv.getValue(), ecv.getCreatedTime(), ecv.getExpiryTime()));
+      }
+      return set;
+   }
+
    private class KeySet extends AbstractSet<Object> {
       Set<Object> immortalKeys;
       Set<Object> expirableKeys;

Modified: core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -33,6 +33,7 @@
 import org.horizon.marshall.VersionAwareMarshaller;
 import org.horizon.notifications.cachelistener.CacheNotifier;
 import org.horizon.transaction.TransactionTable;
+import org.horizon.transaction.TransactionLog;
 
 /**
  * Simple factory that just uses reflection and an empty constructor of the component type.
@@ -42,7 +43,8 @@
  */
 @DefaultFactoryFor(classes = {CacheNotifier.class, EntryFactory.class, CommandsFactory.class,
                               CacheLoaderManager.class, InvocationContextContainer.class,
-                              TransactionTable.class, BatchContainer.class, ContextFactory.class})
+                              TransactionTable.class, BatchContainer.class, ContextFactory.class,
+                              TransactionLog.class})
 public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
    @Override
    public <T> T construct(Class<T> componentType) {

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -23,7 +23,6 @@
 
 import org.horizon.commands.AbstractVisitor;
 import org.horizon.commands.CommandsFactory;
-import org.horizon.commands.DataCommand;
 import org.horizon.commands.VisitableCommand;
 import org.horizon.commands.tx.PrepareCommand;
 import org.horizon.commands.write.ClearCommand;
@@ -124,9 +123,9 @@
             throw new IllegalStateException("cannot find transaction transactionContext for " + gtx);
 
          if (transactionContext.hasModifications()) {
-            List<DataCommand> mods;
+            List<WriteCommand> mods;
             if (transactionContext.hasLocalModifications()) {
-               mods = new ArrayList<DataCommand>(command.getModifications());
+               mods = new ArrayList<WriteCommand>(command.getModifications());
                mods.removeAll(transactionContext.getLocalModifications());
             } else {
                mods = command.getModifications();
@@ -162,7 +161,7 @@
       return retval;
    }
 
-   private void broadcastInvalidate(List<DataCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable {
+   private void broadcastInvalidate(List<WriteCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable {
       if (ctx.getTransaction() != null && !isLocalModeForced(ctx)) {
          if (modifications == null || modifications.isEmpty()) return;
          InvalidationFilterVisitor filterVisitor = new InvalidationFilterVisitor(modifications.size());

Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -23,6 +23,8 @@
    boolean isFetchPersistentState();
 
    void preload();
+
+   boolean isEnabled();
 }
 
 

Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -75,15 +75,15 @@
    }
 
    public boolean isUsingPassivation() {
-      return clmConfig.isPassivation();
+      return isEnabled() ? clmConfig.isPassivation() : false;
    }
 
    public boolean isShared() {
-      return clmConfig.isShared();
+      return isEnabled() ? clmConfig.isShared() : false;
    }
 
    public boolean isFetchPersistentState() {
-      return clmConfig.isFetchPersistentState();
+      return isEnabled() ? clmConfig.isFetchPersistentState() : false;
    }
 
    @Start(priority = 10)
@@ -100,6 +100,10 @@
       }
    }
 
+   public boolean isEnabled() {
+      return clmConfig != null;
+   }
+
    /**
     * Performs a preload on the cache based on the cache loader preload configs used when configuring the cache.
     */

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -13,14 +13,6 @@
     */
    private volatile boolean isStateSet = false;
 
-   public boolean isStateSet() {
-      return isStateSet;
-   }
-
-   public void setStateSet(boolean stateSet) {
-      isStateSet = stateSet;
-   }
-
    public StateTransferException getSetStateException() {
       return setStateException;
    }
@@ -43,6 +35,7 @@
 
    public void notifyStateReceiptSucceeded() {
       synchronized (stateLock) {
+         isStateSet = true;
          // Notify wait that state has been set.
          stateLock.notifyAll();
       }
@@ -50,6 +43,7 @@
 
    public void notifyStateReceiptFailed(StateTransferException setStateException) {
       this.setStateException = setStateException;
+      isStateSet = false;
       notifyStateReceiptSucceeded();
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -21,12 +21,20 @@
  */
 package org.horizon.statetransfer;
 
-import org.horizon.Cache;
+import org.horizon.AdvancedCache;
+import org.horizon.transaction.TransactionLog;
 import org.horizon.config.Configuration;
+import org.horizon.container.DataContainer;
 import org.horizon.factories.annotations.Inject;
 import org.horizon.factories.annotations.Start;
+import org.horizon.invocation.Options;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
 import org.horizon.remoting.RPCManager;
 import org.horizon.util.Util;
 
@@ -36,25 +44,40 @@
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class StateTransferManagerImpl implements StateTransferManager {
 
    RPCManager rpcManager;
-   Cache cache;
+   AdvancedCache cache;
    Configuration configuration;
+   DataContainer dataContainer;
+   CacheLoaderManager clm;
+   CacheStore cs;
+   Marshaller marshaller;
+   TransactionLog transactionLog;
    private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
    private static final Delimiter DELIMITER = new Delimiter();
 
    @Inject
-   public void injectDependencies(RPCManager rpcManager, Cache cache, Configuration configuration) {
+   public void injectDependencies(RPCManager rpcManager, AdvancedCache cache, Configuration configuration,
+                                  DataContainer dataContainer, CacheLoaderManager clm, Marshaller marshaller,
+                                  TransactionLog transactionLog) {
       this.rpcManager = rpcManager;
       this.cache = cache;
       this.configuration = configuration;
+      this.dataContainer = dataContainer;
+      this.clm = clm;
+      this.marshaller = marshaller;
+      this.transactionLog = transactionLog;
    }
 
    @Start(priority = 14)
    // it is imperative that this starts *after* the RPCManager does.
    public void start() throws StateTransferException {
+      cs = clm == null || !clm.isEnabled() || !clm.isFetchPersistentState() ? null : clm.getCacheStore();
+
       long startTime = 0;
       if (log.isDebugEnabled()) {
          log.debug("Initiating state transfer process");
@@ -81,6 +104,7 @@
          delimit(oos);
          oos.flush();
          oos.close();
+         if (log.isDebugEnabled()) log.debug("State generated, closing object stream");
          // just close the object stream but do NOT close the underlying stream
       } catch (StateTransferException ste) {
          throw ste;
@@ -99,6 +123,7 @@
          assertDelimited(ois);
          applyPersistentState(ois);
          assertDelimited(ois);
+         if (log.isDebugEnabled()) log.debug("State applied, closing object stream");
          ois.close();
          // just close the object stream but do NOT close the underlying stream
       } catch (StateTransferException ste) {
@@ -109,19 +134,49 @@
    }
 
    private void applyInMemoryState(ObjectInputStream i) throws StateTransferException {
-      throw new StateTransferException("Implement me!");
+      dataContainer.clear();
+      try {
+         Set<StoredEntry> set = (Set<StoredEntry>) marshaller.objectFromObjectStream(i);
+         for (StoredEntry se: set) cache.put(se.getKey(), se.getValue(), se.getLifespan(), TimeUnit.MILLISECONDS, Options.CACHE_MODE_LOCAL);
+      } catch (Exception e) {
+         dataContainer.clear();
+         throw new StateTransferException(e);
+      }
    }
 
    private void generateInMemoryState(ObjectOutputStream o) throws StateTransferException {
-      throw new StateTransferException("Implement me!");
+      // write all StoredEntries to the stream using the marshaller.
+      // TODO is it safe enough to get these from the data container directly?
+      try {
+         Set<StoredEntry> s = dataContainer.getAllEntriesForStorage();
+         marshaller.objectToObjectStream(s, o);
+      } catch (Exception e) {
+         throw new StateTransferException(e);
+      }
    }
 
    private void applyPersistentState(ObjectInputStream i) throws StateTransferException {
-      throw new StateTransferException("Implement me!");
+      if (cs == null) {
+         if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured.  Skipping applying persistent state.");
+      } else {
+         try {
+            cs.fromStream(i);
+         } catch (CacheLoaderException cle) {
+            throw new StateTransferException(cle);
+         }
+      }
    }
-   
+
    private void generatePersistentState(ObjectOutputStream o) throws StateTransferException {
-      throw new StateTransferException("Implement me!");
+      if (cs == null) {
+         if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured.  Skipping generating persistent state.");
+      } else {
+         try {
+            cs.toStream(o);
+         } catch (CacheLoaderException cle) {
+            throw new StateTransferException(cle);
+         }
+      }
    }
 
    private void delimit(ObjectOutputStream o) throws IOException {
@@ -135,7 +190,8 @@
       } catch (Exception e) {
          throw new StateTransferException(e);
       }
-      if ((o == null) || !(o instanceof Delimiter)) throw new StateTransferException("Expected a delimiter, recieved " + o);
+      if ((o == null) || !(o instanceof Delimiter))
+         throw new StateTransferException("Expected a delimiter, recieved " + o);
    }
 
    // used as a marker for streams.

Added: core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -0,0 +1,178 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.horizon.transaction;
+
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.commands.write.WriteCommand;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Logs transactions and writes for Non-Blocking State Transfer
+ *
+ * @author Jason T. Greene
+ */
+public class TransactionLog
+{
+   private final Map<GlobalTransaction, PrepareCommand> pendingPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
+   private final BlockingQueue<LogEntry> entries = new LinkedBlockingQueue<LogEntry>();
+   private AtomicBoolean active = new AtomicBoolean();
+
+   public static class LogEntry
+   {
+      private final GlobalTransaction transaction;
+      private final List<WriteCommand> modifications;
+
+      public LogEntry(GlobalTransaction transaction, List<WriteCommand> modifications)
+      {
+         this.transaction = transaction;
+         this.modifications = modifications;
+      }
+
+      public GlobalTransaction getTransaction()
+      {
+         return transaction;
+      }
+
+      public List<WriteCommand> getModifications()
+      {
+         return modifications;
+      }
+   }
+
+   private static Log log = LogFactory.getLog(TransactionLog.class);
+
+   public void logPrepare(PrepareCommand command)
+   {
+      pendingPrepares.put(command.getGlobalTransaction(), command);
+   }
+
+   public void logCommit(GlobalTransaction gtx)
+   {
+      PrepareCommand command = pendingPrepares.remove(gtx);
+      // it is perfectly normal for a prepare not to be logged for this gtx, for example if a transaction did not
+      // modify anything, then beforeCompletion() is not invoked and logPrepare() will not be called to register the
+      // prepare.
+      if (command != null) addEntry(new LogEntry(gtx, command.getModifications()));
+   }
+
+   private void addEntry(LogEntry entry)
+   {
+      if (! isActive())
+         return;
+
+      for (;;)
+      {
+         try
+         {
+            if (log.isTraceEnabled())
+               log.trace("Added commit entry to tx log" + entry);
+
+            entries.put(entry);
+            break;
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   public void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand> modifications)
+   {
+      // Just in case...
+      if (gtx != null) pendingPrepares.remove(gtx);
+      if (!modifications.isEmpty()) addEntry(new LogEntry(gtx, modifications));
+   }
+
+   public void logNoTxWrite(WriteCommand write)
+   {
+      if (! isActive())
+         return;
+
+      ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
+      list.add(write);
+      addEntry(new LogEntry(null, list));
+   }
+
+   public void rollback(GlobalTransaction gtx)
+   {
+      pendingPrepares.remove(gtx);
+   }
+
+   public boolean isActive()
+   {
+      return active.get();
+   }
+
+   public boolean activate()
+   {
+      return active.compareAndSet(false, true);
+   }
+
+   public void deactivate()
+   {
+      active.set(false);
+      if (entries.size() > 0)
+         log.error("Unprocessed Transaction Log Entries! = " + entries.size());
+      entries.clear();
+   }
+
+   public int size()
+   {
+      return entries.size();
+   }
+
+   public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws Exception
+   {
+     List<LogEntry> buffer = new ArrayList<LogEntry>(10);
+
+     while (entries.drainTo(buffer, 10) > 0)
+     {
+        for (LogEntry entry : buffer)
+           marshaller.objectToObjectStream(entry, out);
+
+        buffer.clear();
+     }
+   }
+
+   public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws Exception
+   {
+      for (PrepareCommand entry : pendingPrepares.values())
+         marshaller.objectToObjectStream(entry, out);
+   }
+
+   public boolean hasPendingPrepare(PrepareCommand command)
+   {
+      return pendingPrepares.containsKey(command.getGlobalTransaction());
+   }
+}

Modified: core/branches/flat/src/test/java/org/horizon/test/ReplListener.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/ReplListener.java	2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/test/java/org/horizon/test/ReplListener.java	2009-02-25 18:30:30 UTC (rev 7790)
@@ -1,16 +1,16 @@
 package org.horizon.test;
 
 import org.horizon.Cache;
+import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.tx.CommitCommand;
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.context.InvocationContext;
 import org.horizon.interceptors.base.CommandInterceptor;
-import org.horizon.commands.VisitableCommand;
-import org.horizon.commands.DataCommand;
-import org.horizon.commands.tx.PrepareCommand;
-import org.horizon.commands.tx.CommitCommand;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
-import java.util.HashSet;
-import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -114,7 +114,7 @@
          Object o = invokeNextInterceptor(ctx, cmd);
          if (!ctx.isOriginLocal()) {
             markAsVisited(cmd);
-            for (DataCommand mod : cmd.getModifications()) markAsVisited(mod);
+            for (WriteCommand mod : cmd.getModifications()) markAsVisited(mod);
          }
          return o;
       }




More information about the jbosscache-commits mailing list