Author: manik.surtani(a)jboss.com
Date: 2009-02-25 13:30:30 -0500 (Wed, 25 Feb 2009)
New Revision: 7790
Added:
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
Modified:
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
core/branches/flat/src/test/java/org/horizon/test/ReplListener.java
Log:
More work in progress on porting NBST (Non-Blocking State Transfer)
Modified: core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-02-25
17:39:20 UTC (rev 7789)
+++
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -21,10 +21,10 @@
*/
package org.horizon.commands.tx;
-import org.horizon.commands.DataCommand;
import org.horizon.commands.ReplicableCommand;
import org.horizon.commands.VisitableCommand;
import org.horizon.commands.Visitor;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
import org.horizon.remoting.transport.Address;
import org.horizon.transaction.GlobalTransaction;
@@ -42,11 +42,11 @@
public class PrepareCommand extends AbstractTransactionBoundaryCommand {
public static final byte METHOD_ID = 10;
- protected List<DataCommand> modifications;
+ protected List<WriteCommand> modifications;
protected Address localAddress;
protected boolean onePhaseCommit;
- public PrepareCommand(GlobalTransaction gtx, List<DataCommand> modifications,
Address localAddress, boolean onePhaseCommit) {
+ public PrepareCommand(GlobalTransaction gtx, List<WriteCommand> modifications,
Address localAddress, boolean onePhaseCommit) {
this.gtx = gtx;
this.modifications = modifications;
this.localAddress = localAddress;
@@ -64,7 +64,7 @@
return visitor.visitPrepareCommand(ctx, this);
}
- public List<DataCommand> getModifications() {
+ public List<WriteCommand> getModifications() {
return modifications;
}
@@ -97,7 +97,7 @@
@SuppressWarnings("unchecked")
public void setParameters(int commandId, Object[] args) {
gtx = (GlobalTransaction) args[0];
- modifications = (List<DataCommand>) args[1];
+ modifications = (List<WriteCommand>) args[1];
localAddress = (Address) args[2];
onePhaseCommit = (Boolean) args[3];
}
@@ -130,7 +130,7 @@
PrepareCommand copy = new PrepareCommand();
copy.gtx = gtx;
copy.localAddress = localAddress;
- copy.modifications = modifications == null ? null : new
ArrayList<DataCommand>(modifications);
+ copy.modifications = modifications == null ? null : new
ArrayList<WriteCommand>(modifications);
copy.onePhaseCommit = onePhaseCommit;
return copy;
}
@@ -146,7 +146,7 @@
}
public boolean containsModificationType(Class<? extends ReplicableCommand>
replicableCommandClass) {
- for (DataCommand mod : getModifications()) {
+ for (WriteCommand mod : getModifications()) {
if (mod.getClass().equals(replicableCommandClass)) {
return true;
}
Modified: core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/DataContainer.java 2009-02-25
17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/container/DataContainer.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -59,4 +59,6 @@
Set<Object> purgeExpiredEntries();
StoredEntry createEntryForStorage(Object key);
+
+ Set<StoredEntry> getAllEntriesForStorage();
}
Modified:
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-25
17:39:20 UTC (rev 7789)
+++
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -205,6 +205,22 @@
return new StoredEntry(key, ecv.getValue(), ecv.getCreatedTime(),
ecv.getExpiryTime());
}
+   /**
+    * Builds a set holding a {@link StoredEntry} for every entry currently in this container.
+    * <p/>
+    * Immortal entries are copied with key and value only; expirable entries also carry their created and expiry
+    * times.  Any expirable entry found to be expired while iterating is removed from the container and is not
+    * included in the result.
+    *
+    * @return a newly created set of stored entries
+    */
+   public Set<StoredEntry> getAllEntriesForStorage() {
+      int expectedSize = immortalData.size() + expirableData.size();
+      Set<StoredEntry> result = new HashSet<StoredEntry>(expectedSize);
+
+      for (Map.Entry<Object, CachedValue> immortal : immortalData.entrySet()) {
+         result.add(new StoredEntry(immortal.getKey(), immortal.getValue().getValue()));
+      }
+
+      Iterator<Map.Entry<Object, ExpirableCachedValue>> expirables = expirableData.entrySet().iterator();
+      while (expirables.hasNext()) {
+         Map.Entry<Object, ExpirableCachedValue> expirable = expirables.next();
+         ExpirableCachedValue value = expirable.getValue();
+         if (value.isExpired()) {
+            // purge expired entries as we go rather than shipping them
+            expirables.remove();
+            continue;
+         }
+         result.add(new StoredEntry(expirable.getKey(), value.getValue(), value.getCreatedTime(), value.getExpiryTime()));
+      }
+      return result;
+   }
+
private class KeySet extends AbstractSet<Object> {
Set<Object> immortalKeys;
Set<Object> expirableKeys;
Modified:
core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java 2009-02-25
17:39:20 UTC (rev 7789)
+++
core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -33,6 +33,7 @@
import org.horizon.marshall.VersionAwareMarshaller;
import org.horizon.notifications.cachelistener.CacheNotifier;
import org.horizon.transaction.TransactionTable;
+import org.horizon.transaction.TransactionLog;
/**
* Simple factory that just uses reflection and an empty constructor of the component
type.
@@ -42,7 +43,8 @@
*/
@DefaultFactoryFor(classes = {CacheNotifier.class, EntryFactory.class,
CommandsFactory.class,
CacheLoaderManager.class,
InvocationContextContainer.class,
- TransactionTable.class, BatchContainer.class,
ContextFactory.class})
+ TransactionTable.class, BatchContainer.class,
ContextFactory.class,
+ TransactionLog.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory
implements AutoInstantiableFactory {
@Override
public <T> T construct(Class<T> componentType) {
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-02-25
17:39:20 UTC (rev 7789)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -23,7 +23,6 @@
import org.horizon.commands.AbstractVisitor;
import org.horizon.commands.CommandsFactory;
-import org.horizon.commands.DataCommand;
import org.horizon.commands.VisitableCommand;
import org.horizon.commands.tx.PrepareCommand;
import org.horizon.commands.write.ClearCommand;
@@ -124,9 +123,9 @@
throw new IllegalStateException("cannot find transaction
transactionContext for " + gtx);
if (transactionContext.hasModifications()) {
- List<DataCommand> mods;
+ List<WriteCommand> mods;
if (transactionContext.hasLocalModifications()) {
- mods = new ArrayList<DataCommand>(command.getModifications());
+ mods = new ArrayList<WriteCommand>(command.getModifications());
mods.removeAll(transactionContext.getLocalModifications());
} else {
mods = command.getModifications();
@@ -162,7 +161,7 @@
return retval;
}
- private void broadcastInvalidate(List<DataCommand> modifications, Transaction
tx, InvocationContext ctx) throws Throwable {
+ private void broadcastInvalidate(List<WriteCommand> modifications, Transaction
tx, InvocationContext ctx) throws Throwable {
if (ctx.getTransaction() != null && !isLocalModeForced(ctx)) {
if (modifications == null || modifications.isEmpty()) return;
InvalidationFilterVisitor filterVisitor = new
InvalidationFilterVisitor(modifications.size());
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java 2009-02-25
17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -23,6 +23,8 @@
boolean isFetchPersistentState();
void preload();
+
+ boolean isEnabled();
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-25
17:39:20 UTC (rev 7789)
+++
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -75,15 +75,15 @@
}
public boolean isUsingPassivation() {
- return clmConfig.isPassivation();
+ // clmConfig may be null (that is what isEnabled() checks), so guard before dereferencing it.
+ return isEnabled() && clmConfig.isPassivation();
}
public boolean isShared() {
- return clmConfig.isShared();
+ // clmConfig may be null (that is what isEnabled() checks), so guard before dereferencing it.
+ return isEnabled() && clmConfig.isShared();
}
public boolean isFetchPersistentState() {
- return clmConfig.isFetchPersistentState();
+ // clmConfig may be null (that is what isEnabled() checks), so guard before dereferencing it.
+ return isEnabled() && clmConfig.isFetchPersistentState();
}
@Start(priority = 10)
@@ -100,6 +100,10 @@
}
}
+   /**
+    * @return true if this manager holds a cache loader manager configuration (clmConfig is non-null), false
+    *         otherwise
+    */
+   public boolean isEnabled() {
+      boolean hasConfig = clmConfig != null;
+      return hasConfig;
+   }
+
/**
* Performs a preload on the cache based on the cache loader preload configs used when
configuring the cache.
*/
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java 2009-02-25
17:39:20 UTC (rev 7789)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -13,14 +13,6 @@
*/
private volatile boolean isStateSet = false;
- public boolean isStateSet() {
- return isStateSet;
- }
-
- public void setStateSet(boolean stateSet) {
- isStateSet = stateSet;
- }
-
public StateTransferException getSetStateException() {
return setStateException;
}
@@ -43,6 +35,7 @@
public void notifyStateReceiptSucceeded() {
synchronized (stateLock) {
+ isStateSet = true;
// Notify wait that state has been set.
stateLock.notifyAll();
}
@@ -50,6 +43,7 @@
/**
 * Records a failed state receipt and wakes up any threads waiting on the state lock.
 *
 * @param setStateException the exception that caused state receipt to fail
 */
public void notifyStateReceiptFailed(StateTransferException setStateException) {
this.setStateException = setStateException;
- notifyStateReceiptSucceeded();
+ // Wake waiters directly instead of delegating to notifyStateReceiptSucceeded(): that method sets
+ // isStateSet to true, which would make a failed transfer look like a successful one.
+ synchronized (stateLock) {
+    isStateSet = false;
+    stateLock.notifyAll();
+ }
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-02-25
17:39:20 UTC (rev 7789)
+++
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -21,12 +21,20 @@
*/
package org.horizon.statetransfer;
-import org.horizon.Cache;
+import org.horizon.AdvancedCache;
+import org.horizon.transaction.TransactionLog;
import org.horizon.config.Configuration;
+import org.horizon.container.DataContainer;
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.Start;
+import org.horizon.invocation.Options;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
import org.horizon.remoting.RPCManager;
import org.horizon.util.Util;
@@ -36,25 +44,40 @@
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
public class StateTransferManagerImpl implements StateTransferManager {
RPCManager rpcManager;
- Cache cache;
+ AdvancedCache cache;
Configuration configuration;
+ DataContainer dataContainer;
+ CacheLoaderManager clm;
+ CacheStore cs;
+ Marshaller marshaller;
+ TransactionLog transactionLog;
private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
private static final Delimiter DELIMITER = new Delimiter();
@Inject
- public void injectDependencies(RPCManager rpcManager, Cache cache, Configuration
configuration) {
+ public void injectDependencies(RPCManager rpcManager, AdvancedCache cache,
Configuration configuration,
+ DataContainer dataContainer, CacheLoaderManager clm,
Marshaller marshaller,
+ TransactionLog transactionLog) {
this.rpcManager = rpcManager;
this.cache = cache;
this.configuration = configuration;
+ this.dataContainer = dataContainer;
+ this.clm = clm;
+ this.marshaller = marshaller;
+ this.transactionLog = transactionLog;
}
@Start(priority = 14)
// it is imperative that this starts *after* the RPCManager does.
public void start() throws StateTransferException {
+ cs = clm == null || !clm.isEnabled() || !clm.isFetchPersistentState() ? null :
clm.getCacheStore();
+
long startTime = 0;
if (log.isDebugEnabled()) {
log.debug("Initiating state transfer process");
@@ -81,6 +104,7 @@
delimit(oos);
oos.flush();
oos.close();
+ if (log.isDebugEnabled()) log.debug("State generated, closing object
stream");
// just close the object stream but do NOT close the underlying stream
} catch (StateTransferException ste) {
throw ste;
@@ -99,6 +123,7 @@
assertDelimited(ois);
applyPersistentState(ois);
assertDelimited(ois);
+ if (log.isDebugEnabled()) log.debug("State applied, closing object
stream");
ois.close();
// just close the object stream but do NOT close the underlying stream
} catch (StateTransferException ste) {
@@ -109,19 +134,49 @@
}
/**
 * Replaces the contents of the data container with the set of stored entries read from the stream.  The container
 * is cleared first, and cleared again if anything goes wrong while reading or applying the entries.
 *
 * @param i stream to read the marshalled set of entries from
 * @throws StateTransferException wrapping any exception raised while reading or applying state
 */
private void applyInMemoryState(ObjectInputStream i) throws StateTransferException {
- throw new StateTransferException("Implement me!");
+ dataContainer.clear();
+ try {
+    Set<StoredEntry> receivedEntries = (Set<StoredEntry>) marshaller.objectFromObjectStream(i);
+    for (StoredEntry received : receivedEntries) {
+       // CACHE_MODE_LOCAL is passed with every put
+       cache.put(received.getKey(), received.getValue(), received.getLifespan(), TimeUnit.MILLISECONDS, Options.CACHE_MODE_LOCAL);
+    }
+ } catch (Exception e) {
+    // do not leave partially applied state behind
+    dataContainer.clear();
+    throw new StateTransferException(e);
+ }
}
/**
 * Writes every entry held by the data container to the stream as a single marshalled set.
 *
 * @param o stream to write the marshalled set of entries to
 * @throws StateTransferException wrapping any exception raised while collecting or writing state
 */
private void generateInMemoryState(ObjectOutputStream o) throws StateTransferException {
- throw new StateTransferException("Implement me!");
+ // write all StoredEntries to the stream using the marshaller.
+ // TODO is it safe enough to get these from the data container directly?
+ try {
+    marshaller.objectToObjectStream(dataContainer.getAllEntriesForStorage(), o);
+ } catch (Exception e) {
+    throw new StateTransferException(e);
+ }
}
/**
 * Feeds the stream to the cache store, if one is in use for state transfer (cs is non-null); otherwise does
 * nothing apart from logging at debug level.
 *
 * @param i stream to read persistent state from
 * @throws StateTransferException wrapping any CacheLoaderException raised by the cache store
 */
private void applyPersistentState(ObjectInputStream i) throws StateTransferException {
- throw new StateTransferException("Implement me!");
+ if (cs == null) {
+    if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured. Skipping applying persistent state.");
+    return;
+ }
+
+ try {
+    cs.fromStream(i);
+ } catch (CacheLoaderException cle) {
+    throw new StateTransferException(cle);
+ }
}
-
+
/**
 * Asks the cache store to write its contents to the stream, if one is in use for state transfer (cs is non-null);
 * otherwise does nothing apart from logging at debug level.
 *
 * @param o stream to write persistent state to
 * @throws StateTransferException wrapping any CacheLoaderException raised by the cache store
 */
private void generatePersistentState(ObjectOutputStream o) throws StateTransferException {
- throw new StateTransferException("Implement me!");
+ if (cs == null) {
+    if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured. Skipping generating persistent state.");
+    return;
+ }
+
+ try {
+    cs.toStream(o);
+ } catch (CacheLoaderException cle) {
+    throw new StateTransferException(cle);
+ }
}
private void delimit(ObjectOutputStream o) throws IOException {
@@ -135,7 +190,8 @@
} catch (Exception e) {
throw new StateTransferException(e);
}
- if ((o == null) || !(o instanceof Delimiter)) throw new
StateTransferException("Expected a delimiter, recieved " + o);
+ if ((o == null) || !(o instanceof Delimiter))
+ throw new StateTransferException("Expected a delimiter, recieved " +
o);
}
// used as a marker for streams.
Added: core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -0,0 +1,178 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+*/
+package org.horizon.transaction;
+
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.commands.write.WriteCommand;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Logs transactions and writes for Non-Blocking State Transfer.
+ * <p/>
+ * Prepares are kept in a map keyed by their {@link GlobalTransaction} until a matching commit, one-phase commit or
+ * rollback is logged.  Committed modifications, one-phase commits and non-transactional writes are queued as
+ * {@link LogEntry} instances, but only while the log is active (see {@link #activate()} and {@link #deactivate()}).
+ *
+ * @author Jason T. Greene
+ */
+public class TransactionLog
+{
+   private static final Log log = LogFactory.getLog(TransactionLog.class);
+
+   /** Maximum number of entries drained from the queue in one go by {@link #writeCommitLog}. */
+   private static final int DRAIN_BATCH_SIZE = 10;
+
+   private final Map<GlobalTransaction, PrepareCommand> pendingPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
+   private final BlockingQueue<LogEntry> entries = new LinkedBlockingQueue<LogEntry>();
+   private final AtomicBoolean active = new AtomicBoolean();
+
+   /**
+    * A single logged unit of work: the modifications made, and the transaction they belong to (null for a
+    * non-transactional write).
+    */
+   // NOTE(review): instances are handed to Marshaller.objectToObjectStream() in writeCommitLog(), but this class
+   // declares no serialization support of its own - confirm the marshaller can handle it.
+   public static class LogEntry
+   {
+      private final GlobalTransaction transaction;
+      private final List<WriteCommand> modifications;
+
+      public LogEntry(GlobalTransaction transaction, List<WriteCommand> modifications)
+      {
+         this.transaction = transaction;
+         this.modifications = modifications;
+      }
+
+      public GlobalTransaction getTransaction()
+      {
+         return transaction;
+      }
+
+      public List<WriteCommand> getModifications()
+      {
+         return modifications;
+      }
+
+      @Override
+      public String toString()
+      {
+         // used when the entry is concatenated into trace messages
+         return "LogEntry{transaction=" + transaction + ", modifications=" + modifications + "}";
+      }
+   }
+
+   /**
+    * Remembers a prepare until the outcome of its transaction is known.
+    *
+    * @param command the prepare to remember, keyed by its global transaction
+    */
+   public void logPrepare(PrepareCommand command)
+   {
+      pendingPrepares.put(command.getGlobalTransaction(), command);
+   }
+
+   /**
+    * Logs the commit of a previously prepared transaction.
+    *
+    * @param gtx transaction being committed
+    */
+   public void logCommit(GlobalTransaction gtx)
+   {
+      PrepareCommand command = pendingPrepares.remove(gtx);
+      // it is perfectly normal for a prepare not to be logged for this gtx, for example if a transaction did not
+      // modify anything, then beforeCompletion() is not invoked and logPrepare() will not be called to register the
+      // prepare.
+      if (command != null) addEntry(new LogEntry(gtx, command.getModifications()));
+   }
+
+   /**
+    * Queues an entry if the log is active, retrying until the queue accepts it.
+    */
+   private void addEntry(LogEntry entry)
+   {
+      if (!isActive())
+         return;
+
+      boolean interrupted = false;
+      try
+      {
+         for (;;)
+         {
+            try
+            {
+               entries.put(entry);
+               break;
+            }
+            catch (InterruptedException e)
+            {
+               // Only remember the interrupt here.  Restoring the flag inside the loop lets the next put()
+               // fail with InterruptedException again straight away, turning the retry into a busy spin.
+               interrupted = true;
+            }
+         }
+      }
+      finally
+      {
+         // hand the interrupt back to the caller once the entry is safely queued
+         if (interrupted) Thread.currentThread().interrupt();
+      }
+
+      if (log.isTraceEnabled())
+         log.trace("Added commit entry to tx log" + entry);
+   }
+
+   /**
+    * Logs a transaction that commits in a single phase.
+    *
+    * @param gtx           transaction being committed; may be null
+    * @param modifications modifications made by the transaction; nothing is queued if null or empty
+    */
+   public void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand> modifications)
+   {
+      // Just in case...
+      if (gtx != null) pendingPrepares.remove(gtx);
+      // a PrepareCommand may carry a null modification list, so treat null the same as empty
+      if (modifications != null && !modifications.isEmpty()) addEntry(new LogEntry(gtx, modifications));
+   }
+
+   /**
+    * Logs a single write made outside of any transaction.
+    *
+    * @param write the write to log
+    */
+   public void logNoTxWrite(WriteCommand write)
+   {
+      // checked here as well as in addEntry() to avoid building the list when the log is inactive
+      if (!isActive())
+         return;
+
+      List<WriteCommand> list = new ArrayList<WriteCommand>();
+      list.add(write);
+      addEntry(new LogEntry(null, list));
+   }
+
+   /**
+    * Forgets the pending prepare, if any, of a transaction that has rolled back.
+    *
+    * @param gtx transaction being rolled back
+    */
+   public void rollback(GlobalTransaction gtx)
+   {
+      pendingPrepares.remove(gtx);
+   }
+
+   /**
+    * @return true if entries are currently being queued
+    */
+   public boolean isActive()
+   {
+      return active.get();
+   }
+
+   /**
+    * Switches the log on.
+    *
+    * @return true if this call activated the log, false if it was already active
+    */
+   public boolean activate()
+   {
+      return active.compareAndSet(false, true);
+   }
+
+   /**
+    * Switches the log off and discards any queued entries, logging an error if there were any.
+    */
+   public void deactivate()
+   {
+      active.set(false);
+      if (entries.size() > 0)
+         log.error("Unprocessed Transaction Log Entries! = " + entries.size());
+      entries.clear();
+   }
+
+   /**
+    * @return the number of queued entries
+    */
+   public int size()
+   {
+      return entries.size();
+   }
+
+   /**
+    * Drains the queued entries in batches and writes each one to the stream.
+    *
+    * @param marshaller marshaller used to write each entry
+    * @param out        stream to write to
+    * @throws Exception if the marshaller fails to write an entry
+    */
+   public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws Exception
+   {
+      List<LogEntry> buffer = new ArrayList<LogEntry>(DRAIN_BATCH_SIZE);
+
+      while (entries.drainTo(buffer, DRAIN_BATCH_SIZE) > 0)
+      {
+         for (LogEntry entry : buffer)
+            marshaller.objectToObjectStream(entry, out);
+
+         buffer.clear();
+      }
+   }
+
+   /**
+    * Writes every pending prepare to the stream.
+    *
+    * @param marshaller marshaller used to write each prepare
+    * @param out        stream to write to
+    * @throws Exception if the marshaller fails to write a prepare
+    */
+   public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws Exception
+   {
+      for (PrepareCommand entry : pendingPrepares.values())
+         marshaller.objectToObjectStream(entry, out);
+   }
+
+   /**
+    * @param command prepare to look up by its global transaction
+    * @return true if a prepare is pending for the command's transaction
+    */
+   public boolean hasPendingPrepare(PrepareCommand command)
+   {
+      return pendingPrepares.containsKey(command.getGlobalTransaction());
+   }
+}
Modified: core/branches/flat/src/test/java/org/horizon/test/ReplListener.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/ReplListener.java 2009-02-25
17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/test/java/org/horizon/test/ReplListener.java 2009-02-25
18:30:30 UTC (rev 7790)
@@ -1,16 +1,16 @@
package org.horizon.test;
import org.horizon.Cache;
+import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.tx.CommitCommand;
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
import org.horizon.interceptors.base.CommandInterceptor;
-import org.horizon.commands.VisitableCommand;
-import org.horizon.commands.DataCommand;
-import org.horizon.commands.tx.PrepareCommand;
-import org.horizon.commands.tx.CommitCommand;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Set;
-import java.util.HashSet;
-import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -114,7 +114,7 @@
Object o = invokeNextInterceptor(ctx, cmd);
if (!ctx.isOriginLocal()) {
markAsVisited(cmd);
- for (DataCommand mod : cmd.getModifications()) markAsVisited(mod);
+ for (WriteCommand mod : cmd.getModifications()) markAsVisited(mod);
}
return o;
}