[infinispan-commits] Infinispan SVN: r169 - in trunk/core/src: main/java/org/infinispan/commands/write and 10 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Apr 24 14:15:22 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-04-24 14:15:21 -0400 (Fri, 24 Apr 2009)
New Revision: 169
Added:
trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java
trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java
Modified:
trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
trunk/core/src/main/java/org/infinispan/commands/write/ClearCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/ReplaceCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/WriteCommand.java
trunk/core/src/main/java/org/infinispan/config/Configuration.java
trunk/core/src/main/java/org/infinispan/context/InvocationContextImpl.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
trunk/core/src/main/java/org/infinispan/factories/AbstractComponentRegistry.java
trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
trunk/core/src/main/java/org/infinispan/loader/cluster/ClusterCacheLoader.java
trunk/core/src/main/java/org/infinispan/remoting/responses/ClusteredGetResponseValidityFilter.java
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
trunk/core/src/test/java/org/infinispan/distribution/DistAsyncFuncTest.java
trunk/core/src/test/java/org/infinispan/distribution/DistAsyncTxFuncTest.java
trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
trunk/core/src/test/java/org/infinispan/distribution/DistSyncTxFuncTest.java
trunk/core/src/test/java/org/infinispan/marshall/jboss/JBossMarshallerTest.java
Log:
[ISPN-30] (DIST) Initial implementation of DIST
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -58,6 +58,8 @@
InvalidateCommand buildInvalidateCommand(Object... keys);
+ InvalidateCommand buildInvalidateFromL1Command(Object... keys);
+
ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue, long lifespanMillis, long maxIdleTimeMillis);
SizeCommand buildSizeCommand();
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -34,11 +34,13 @@
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.EvictCommand;
import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.commands.write.InvalidateL1Command;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.container.DataContainer;
+import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.loader.CacheLoaderManager;
@@ -62,14 +64,18 @@
// some stateless commands can be reused so that they aren't constructed again all the time.
SizeCommand cachedSizeCommand;
private InterceptorChain interceptorChain;
+ private DistributionManager distributionManager;
@Inject
- public void setupDependencies(DataContainer container, CacheNotifier notifier, Cache cache, InterceptorChain interceptorChain, CacheLoaderManager clManager) {
+ public void setupDependencies(DataContainer container, CacheNotifier notifier, Cache cache,
+ InterceptorChain interceptorChain, CacheLoaderManager clManager,
+ DistributionManager distributionManager) {
this.dataContainer = container;
this.notifier = notifier;
this.cache = cache;
this.interceptorChain = interceptorChain;
this.cacheLoaderManager = clManager;
+ this.distributionManager = distributionManager;
}
public PutKeyValueCommand buildPutKeyValueCommand(Object key, Object value, long lifespanMillis, long maxIdleTimeMillis) {
@@ -84,6 +90,10 @@
return new InvalidateCommand(notifier, keys);
}
+ public InvalidateCommand buildInvalidateFromL1Command(Object... keys) {
+ return new InvalidateL1Command(distributionManager, notifier, keys);
+ }
+
public ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue, long lifespan, long maxIdleTimeMillis) {
return new ReplaceCommand(key, oldValue, newValue, lifespan, maxIdleTimeMillis);
}
@@ -171,6 +181,10 @@
InvalidateCommand ic = (InvalidateCommand) c;
ic.init(notifier);
break;
+ case InvalidateL1Command.COMMAND_ID:
+ InvalidateL1Command ilc = (InvalidateL1Command) c;
+ ilc.init(distributionManager, notifier);
+ break;
case PrepareCommand.COMMAND_ID:
PrepareCommand pc = (PrepareCommand) c;
if (pc.getModifications() != null)
Modified: trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/RemoteCommandFactory.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -11,6 +11,7 @@
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.commands.write.InvalidateL1Command;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
@@ -29,7 +30,7 @@
@Scope(Scopes.GLOBAL)
public class RemoteCommandFactory {
RpcManager rpcManager;
-
+
@Inject
public void init(RpcManager rpcManager) {
this.rpcManager = rpcManager;
@@ -85,6 +86,9 @@
case InvalidateCommand.COMMAND_ID:
command = new InvalidateCommand();
break;
+ case InvalidateL1Command.COMMAND_ID:
+ command = new InvalidateL1Command();
+ break;
case StateTransferControlCommand.METHOD_ID:
command = new StateTransferControlCommand();
((StateTransferControlCommand) command).init(rpcManager);
Modified: trunk/core/src/main/java/org/infinispan/commands/write/ClearCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/ClearCommand.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/ClearCommand.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -22,8 +22,8 @@
package org.infinispan.commands.write;
import org.infinispan.commands.Visitor;
+import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.MVCCEntry;
-import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext;
/**
@@ -69,4 +69,8 @@
public boolean isSuccessful() {
return true;
}
+
+ public boolean isConditional() {
+ return false;
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -56,20 +56,24 @@
* @param ctx invocation context
* @return null
*/
+ @Override
public Object perform(InvocationContext ctx) throws Throwable {
- if (trace) log.trace("Invalidating keys:" + Arrays.toString(keys));
- for (Object key : keys) {
- this.key = key;
- super.perform(ctx);
- }
+ if (trace) log.trace("Invalidating keys {0}", Arrays.toString(keys));
+ for (Object k : keys) invalidate(ctx, k);
return null;
}
+ protected void invalidate(InvocationContext ctx, Object keyToInvalidate) throws Throwable {
+ this.key = keyToInvalidate; // so that the superclass can see it
+ super.perform(ctx);
+ }
+
@Override
protected void notify(InvocationContext ctx, Object value, boolean isPre) {
notifier.notifyCacheEntryInvalidated(key, isPre, ctx);
}
+ @Override
public byte getCommandId() {
return COMMAND_ID;
}
@@ -106,6 +110,7 @@
}
}
+ @Override
public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable {
return visitor.visitInvalidateCommand(ctx, this);
}
Added: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,42 @@
+package org.infinispan.commands.write;
+
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
+
+/**
+ * Invalidates an entry in a L1 cache (used with DIST mode)
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class InvalidateL1Command extends InvalidateCommand {
+ public static final int COMMAND_ID = 48;
+ private DistributionManager dm;
+
+ public InvalidateL1Command() {
+ }
+
+ public InvalidateL1Command(DistributionManager dm, CacheNotifier notifier, Object... keys) {
+ super(notifier, keys);
+ this.dm = dm;
+ }
+
+ @Override
+ public byte getCommandId() {
+ return COMMAND_ID;
+ }
+
+ public void init(DistributionManager dm, CacheNotifier n) {
+ super.init(n);
+ this.dm = dm;
+ }
+
+ @Override
+ public Object perform(InvocationContext ctx) throws Throwable {
+ for (Object k : getKeys()) {
+ if (!dm.isLocal(k)) invalidate(ctx, k);
+ }
+ return null;
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -172,4 +172,8 @@
public boolean isSuccessful() {
return successful;
}
+
+ public boolean isConditional() {
+ return putIfAbsent;
+ }
}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/PutMapCommand.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -131,6 +131,10 @@
return true;
}
+ public boolean isConditional() {
+ return false;
+ }
+
public long getLifespanMillis() {
return lifespanMillis;
}
Modified: trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/RemoveCommand.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -42,10 +42,11 @@
protected CacheNotifier notifier;
boolean successful = true;
- /** When not null, value indicates that the entry should only be removed if
- * the key is mapped to this value. By the time the RemoveCommand needs to
- * be marshalled, the condition must have been true locally already, so
- * there's no need to marshall the value. **/
+ /**
+ * When not null, value indicates that the entry should only be removed if the key is mapped to this value. By the
+ * time the RemoveCommand needs to be marshalled, the condition must have been true locally already, so there's no
+ * need to marshall the value. *
+ */
protected transient Object value;
public RemoveCommand(Object key, Object value, CacheNotifier notifier) {
@@ -125,4 +126,8 @@
public boolean isSuccessful() {
return successful;
}
+
+ public boolean isConditional() {
+ return value != null;
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/commands/write/ReplaceCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/ReplaceCommand.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/ReplaceCommand.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -40,7 +40,7 @@
long maxIdleTimeMillis = -1;
boolean successful = true;
- public ReplaceCommand() {
+ public ReplaceCommand() {
}
public ReplaceCommand(Object key, Object oldValue, Object newValue, long lifespanMillis, long maxIdleTimeMillis) {
@@ -136,6 +136,10 @@
return successful;
}
+ public boolean isConditional() {
+ return true;
+ }
+
public long getLifespanMillis() {
return lifespanMillis;
}
Modified: trunk/core/src/main/java/org/infinispan/commands/write/WriteCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/WriteCommand.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/commands/write/WriteCommand.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -17,4 +17,13 @@
* @return true if the command completed successfully, false otherwise.
*/
boolean isSuccessful();
+
+ /**
+ * Certain commands only work based on a certain condition or state of the cache. For example, {@link
+ * org.infinispan.Cache#putIfAbsent(Object, Object)} only does anything if a condition is met, i.e., the entry in
+ * question is not already present. This method tests whether the command in question is conditional or not.
+ *
+ * @return true if the command is conditional, false otherwise
+ */
+ boolean isConditional();
}
Modified: trunk/core/src/main/java/org/infinispan/config/Configuration.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/config/Configuration.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/config/Configuration.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -77,6 +77,7 @@
}
public void setUnsafeUnreliableReturnValues(boolean unsafeUnreliableReturnValues) {
+ testImmutability("unsafeUnreliableReturnValues");
this.unsafeUnreliableReturnValues = unsafeUnreliableReturnValues;
}
Modified: trunk/core/src/main/java/org/infinispan/context/InvocationContextImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/context/InvocationContextImpl.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/context/InvocationContextImpl.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -305,7 +305,6 @@
", transactionContext=" + transactionContext +
", flags=" + flags +
", contextFlags=" + contextFlags +
-// ", invocationLocks=" + locks +
", lookedUpEntries size=" + (lookedUpEntries == null ? 0 : lookedUpEntries.size()) +
'}';
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -1,5 +1,7 @@
package org.infinispan.distribution;
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.remoting.transport.Address;
@@ -40,5 +42,20 @@
* @return a list of addresses where the key may reside
*/
Map<Object, List<Address>> locateAll(Collection<Object> keys);
+
+ /**
+ * Transforms a cache entry so it is marked for L1 rather than the primary cache data structure.
+ *
+ * @param entry entry to transform
+ */
+ void transformForL1(CacheEntry entry);
+
+ /**
+ * Retrieves a cache entry from a remote source
+ *
+ * @param key key to look up
+ * @return an internal cache entry, or null if it cannot be located
+ */
+ InternalCacheEntry retrieveFromRemoteSource(Object key) throws Exception;
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -1,6 +1,10 @@
package org.infinispan.distribution;
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.config.Configuration;
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
@@ -10,7 +14,12 @@
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.ResponseFilter;
+import org.infinispan.remoting.ResponseMode;
import org.infinispan.remoting.RpcManager;
+import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.Util;
@@ -19,7 +28,7 @@
import java.util.Map;
/**
- * // TODO: Manik: Document this
+ * The default distribution manager implementation
*
* @author Manik Surtani
* @since 4.0
@@ -33,12 +42,14 @@
CacheManagerNotifier notifier;
int replCount;
ViewChangeListener listener;
+ CommandsFactory cf;
@Inject
- public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier) {
+ public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf) {
this.configuration = configuration;
this.rpcManager = rpcManager;
this.notifier = notifier;
+ this.cf = cf;
}
// needs to be AFTER the RpcManager
@@ -66,16 +77,36 @@
}
public List<Address> locate(Object key) {
- List<Address> adds = consistentHash.locate(key, replCount);
- if (trace) log.trace("Located {0} addresses for key {1}. Repl count is {2}, addresses are {3}", adds.size(),
- key, replCount, adds);
- return adds;
+ return consistentHash.locate(key, replCount);
}
public Map<Object, List<Address>> locateAll(Collection<Object> keys) {
return consistentHash.locateAll(keys, replCount);
}
+ public void transformForL1(CacheEntry entry) {
+ if (entry.getLifespan() < 0 || entry.getLifespan() > configuration.getL1Lifespan())
+ entry.setLifespan(configuration.getL1Lifespan());
+ }
+
+ public InternalCacheEntry retrieveFromRemoteSource(Object key) throws Exception {
+ ClusteredGetCommand get = cf.buildClusteredGetCommand(key);
+
+ ResponseFilter filter = new ClusteredGetResponseValidityFilter(locate(key));
+ List<Response> responses = rpcManager.invokeRemotely(locate(key), get, ResponseMode.SYNCHRONOUS,
+ configuration.getSyncReplTimeout(), false, filter, false);
+
+ if (!responses.isEmpty()) {
+ for (Response r : responses) {
+ if (r instanceof SuccessfulResponse) {
+ return (InternalCacheEntry) ((SuccessfulResponse) r).getResponseValue();
+ }
+ }
+ }
+
+ return null;
+ }
+
@Listener
public class ViewChangeListener {
@ViewChanged
Modified: trunk/core/src/main/java/org/infinispan/factories/AbstractComponentRegistry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/AbstractComponentRegistry.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/factories/AbstractComponentRegistry.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -155,6 +155,7 @@
s.add(TransportFactory.class);
s.add(MarshallerFactory.class);
s.add(ResponseGeneratorFactory.class);
+ s.add(DistributionManagerFactory.class);
return s;
}
Added: trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,16 @@
+package org.infinispan.factories;
+
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.distribution.DistributionManagerImpl;
+import org.infinispan.factories.annotations.DefaultFactoryFor;
+
+ at DefaultFactoryFor(classes = DistributionManager.class)
+public class DistributionManagerFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
+ @SuppressWarnings("unchecked")
+ public <T> T construct(Class<T> componentType) {
+ if (configuration.getCacheMode().isDistributed())
+ return (T) new DistributionManagerImpl();
+ else
+ return null;
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/factories/DistributionManagerFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -25,7 +25,6 @@
import org.infinispan.batch.BatchContainer;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.config.ConfigurationException;
-import org.infinispan.distribution.DistributionManager;
import org.infinispan.eviction.EvictionManager;
import org.infinispan.factories.annotations.DefaultFactoryFor;
import org.infinispan.factories.context.ContextFactory;
@@ -46,7 +45,7 @@
@DefaultFactoryFor(classes = {CacheNotifier.class, EntryFactory.class, CommandsFactory.class,
CacheLoaderManager.class, InvocationContextContainer.class,
TransactionTable.class, BatchContainer.class, ContextFactory.class,
- TransactionLog.class, EvictionManager.class, DistributionManager.class})
+ TransactionLog.class, EvictionManager.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
@Override
public <T> T construct(Class<T> componentType) {
Modified: trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -77,7 +77,10 @@
interceptorChain.appendIntereceptor(createInterceptor(CacheMgmtInterceptor.class));
// load the tx interceptor
- interceptorChain.appendIntereceptor(createInterceptor(TxInterceptor.class));
+ if (configuration.getCacheMode().isDistributed())
+ interceptorChain.appendIntereceptor(createInterceptor(DistTxInterceptor.class));
+ else
+ interceptorChain.appendIntereceptor(createInterceptor(TxInterceptor.class));
if (configuration.isUseLazyDeserialization())
interceptorChain.appendIntereceptor(createInterceptor(MarshalledValueInterceptor.class));
@@ -110,8 +113,12 @@
interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
}
}
- interceptorChain.appendIntereceptor(createInterceptor(LockingInterceptor.class));
+ if (configuration.getCacheMode().isDistributed())
+ interceptorChain.appendIntereceptor(createInterceptor(DistLockingInterceptor.class));
+ else
+ interceptorChain.appendIntereceptor(createInterceptor(LockingInterceptor.class));
+
CommandInterceptor callInterceptor = createInterceptor(CallInterceptor.class);
interceptorChain.appendIntereceptor(callInterceptor);
if (log.isTraceEnabled()) log.trace("Finished building default interceptor chain.");
Added: trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,28 @@
+package org.infinispan.interceptors;
+
+import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.factories.annotations.Inject;
+
+/**
+ * A subclass of the locking interceptor that is able to differentiate committing changes on a ReadCommittedEntry for
+ * storage in the main cache or in L1, used by DIST
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class DistLockingInterceptor extends LockingInterceptor {
+ DistributionManager dm;
+
+ @Inject
+ public void injectDistributionManager(DistributionManager dm) {
+ this.dm = dm;
+ }
+
+ @Override
+ protected void commitEntry(InvocationContext ctx, CacheEntry entry) {
+ if (!dm.isLocal(entry.getKey())) dm.transformForL1(entry);
+ entry.commit(dataContainer);
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/interceptors/DistLockingInterceptor.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,84 @@
+package org.infinispan.interceptors;
+
+import org.infinispan.commands.AbstractVisitor;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.DataWriteCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.PutMapCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.factories.annotations.Inject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A special form of the TxInterceptor that is aware of distribution and consistent hashing, and as such only replays
+ * methods during a remote prepare that are targeted to this specific cache instance.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class DistTxInterceptor extends TxInterceptor {
+ DistributionManager dm;
+ ReplayCommandVisitor replayCommandVisitor = new ReplayCommandVisitor();
+
+ @Inject
+ public void injectDistributionManager(DistributionManager dm) {
+ this.dm = dm;
+ }
+
+ /**
+ * Only replays modifications that are
+ */
+ @Override
+ protected VisitableCommand getCommandToReplay(VisitableCommand command) {
+ try {
+ return (VisitableCommand) command.acceptVisitor(null, replayCommandVisitor);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Throwable th) {
+ throw new RuntimeException(th);
+ }
+ }
+
+ class ReplayCommandVisitor extends AbstractVisitor {
+ @Override
+ public Object visitPutMapCommand(InvocationContext ignored, PutMapCommand command) {
+ Map newMap = new HashMap();
+ for (Map.Entry entry : command.getMap().entrySet()) {
+ if (dm.isLocal(entry.getKey())) newMap.put(entry.getKey(), entry.getValue());
+ }
+
+ if (newMap.isEmpty()) return null;
+ if (newMap.size() == command.getMap().size()) return command;
+ return commandsFactory.buildPutMapCommand(newMap, command.getLifespanMillis(), command.getMaxIdleTimeMillis());
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ignored, PutKeyValueCommand command) {
+ return visitDataWriteCommand(command);
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ignored, RemoveCommand command) {
+ return visitDataWriteCommand(command);
+ }
+
+ @Override
+ public Object visitReplaceCommand(InvocationContext ignored, ReplaceCommand command) {
+ return visitDataWriteCommand(command);
+ }
+
+ private VisitableCommand visitDataWriteCommand(DataWriteCommand command) {
+ return dm.isLocal(command.getKey()) ? command : null;
+ }
+
+ @Override
+ public Object handleDefault(InvocationContext ignored, VisitableCommand command) {
+ return command;
+ }
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/interceptors/DistTxInterceptor.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -3,7 +3,6 @@
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.DataCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
-import org.infinispan.commands.remote.ClusteredGetCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
@@ -20,8 +19,6 @@
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
-import org.infinispan.remoting.ResponseMode;
-import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.transport.Address;
import org.infinispan.transaction.GlobalTransaction;
import org.infinispan.util.Immutables;
@@ -34,7 +31,7 @@
import java.util.concurrent.ConcurrentHashMap;
/**
- * // TODO: Document this
+ * The interceptor that handles distribution of entries across a cluster, as well as transparent lookup
*
* @author manik
* @since 4.0
@@ -45,13 +42,14 @@
// TODO move this to the transaction context. Will scale better there.
private final Map<GlobalTransaction, List<Address>> txRecipients = new ConcurrentHashMap<GlobalTransaction, List<Address>>();
static final RecipientGenerator CLEAR_COMMAND_GENERATOR = new RecipientGenerator() {
+ private final Object[] EMPTY_ARRAY = {};
public List<Address> generateRecipients() {
return null;
}
public Object[] getKeys() {
- return null;
+ return EMPTY_ARRAY;
}
};
@@ -70,31 +68,36 @@
@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
Object returnValue = invokeNextInterceptor(ctx, command);
- if (returnValue == null) {
+ // need to check in the context as well since a null retval is not necessarily an indication of the entry not being
+ // available. It could just have been removed in the same tx beforehand.
+ if (returnValue == null && ctx.lookupEntry(command.getKey()) == null)
+ returnValue = remoteGetAndStoreInL1(ctx, command.getKey());
+ return returnValue;
+ }
+
+ private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key) throws Throwable {
+ if (ctx.isOriginLocal() && !dm.isLocal(key)) {
+ if (trace) log.trace("Doing a remote get for key {0}", key);
// attempt a remote lookup
- // TODO update ClusteredGetCommand (maybe a new command?) to ensure we get back ICEs.
- ClusteredGetCommand get = cf.buildClusteredGetCommand(command.getKey());
- // TODO use a RspFilter to filter responses
- List<Response> responses = rpcManager.invokeRemotely(dm.locate(command.getKey()), get, ResponseMode.SYNCHRONOUS,
- configuration.getSyncReplTimeout(), false, false);
+ InternalCacheEntry ice = dm.retrieveFromRemoteSource(key);
- // the first response is all that matters
- if (responses.isEmpty()) return returnValue;
-
- for (Object response : responses) {
- if (!(response instanceof Throwable)) {
- InternalCacheEntry ice = (InternalCacheEntry) response;
- if (configuration.isL1CacheEnabled()) {
- long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
- PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
- invokeNextInterceptor(ctx, put);
- }
- return ice.getValue();
+ if (ice != null) {
+ if (configuration.isL1CacheEnabled()) {
+ if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
+ long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
+ PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
+ invokeNextInterceptor(ctx, put);
+ } else {
+ if (trace) log.trace("Not caching remotely retrieved entry for key {0} in L1", key);
}
+ return ice.getValue();
}
- return null;
+
+ } else {
+ if (trace)
+ log.trace("Not doing a remote get for key {0} since entry is mapped to current node, or is in L1", key);
}
- return returnValue;
+ return null;
}
// ---- WRITE commands
@@ -113,6 +116,7 @@
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
+
return handleWriteCommand(ctx, command,
new SingleKeyRecipientGenerator(command.getKey()));
}
@@ -163,7 +167,7 @@
txRecipients.put(command.getGlobalTransaction(), recipients);
// this method will return immediately if we're the only member (because exclude_self=true)
- replicateCall(ctx, command, sync);
+ replicateCall(ctx, recipients, command, sync, false);
}
return retVal;
@@ -202,12 +206,21 @@
}
+ private void remoteGetBeforeWrite(InvocationContext ctx, Object... keys) throws Throwable {
+ // only do this if we are sync (OR if we dont care about return values!)
+// if (!configuration.isUnsafeUnreliableReturnValues()) {
+ for (Object k : keys) remoteGetAndStoreInL1(ctx, k);
+// }
+ }
+
/**
* If we are within one transaction we won't do any replication as replication would only be performed at commit
* time. If the operation didn't originate locally we won't do any replication either.
*/
private Object handleWriteCommand(InvocationContext ctx, WriteCommand command, RecipientGenerator recipientGenerator) throws Throwable {
boolean local = isLocalModeForced(ctx);
+ // see if we need to load values from remote srcs first
+ remoteGetBeforeWrite(ctx, recipientGenerator.getKeys());
if (local && ctx.getTransaction() == null) return invokeNextInterceptor(ctx, command);
// FIRST pass this call up the chain. Only if it succeeds (no exceptions) locally do we attempt to replicate.
@@ -215,16 +228,11 @@
if (command.isSuccessful()) {
if (ctx.getTransaction() == null && ctx.isOriginLocal()) {
- if (trace) {
- log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getTransport().getMembers() + ", mode=" +
- configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
- configuration.getSyncReplTimeout());
- }
-
List<Address> rec = recipientGenerator.generateRecipients();
+ if (trace) log.trace("Invoking command {0} on hosts {1}", command, rec);
// if L1 caching is used make sure we broadcast an invalidate message
if (configuration.isL1CacheEnabled() && rec != null) {
- InvalidateCommand ic = cf.buildInvalidateCommand(recipientGenerator.getKeys());
+ InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
replicateCall(ctx, ic, isSynchronous(ctx), false);
}
replicateCall(ctx, rec, command, isSynchronous(ctx), false);
Modified: trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -222,6 +222,7 @@
if (trace) log.trace("Nothing to do since there are no modifications in scope.");
}
} else {
+ if (trace) log.trace("Transactional. Not cleaning up locks till the transaction ends.");
if (useReadCommitted) {
Map<Object, CacheEntry> lookedUpEntries = ctx.getLookedUpEntries();
if (!lookedUpEntries.isEmpty()) {
@@ -258,8 +259,9 @@
Object key = e.getKey();
boolean needToUnlock = lockManager.possiblyLocked(entry);
// could be null with read-committed
- if (entry != null && entry.isChanged()) entry.commit(dataContainer);
- else {
+ if (entry != null && entry.isChanged()) {
+ commitEntry(ctx, entry);
+ } else {
if (trace) log.trace("Entry for key {0} is null, not calling commitUpdate", key);
}
@@ -291,6 +293,10 @@
ctx.setContainsLocks(false);
}
+ protected void commitEntry(InvocationContext ctx, CacheEntry entry) {
+ entry.commit(dataContainer);
+ }
+
@SuppressWarnings("unchecked")
private void transactionalCleanup(boolean commit, InvocationContext ctx) {
if (ctx.getTransactionContext() != null) {
Modified: trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/interceptors/TxInterceptor.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -34,8 +34,8 @@
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.context.ContextFactory;
+import org.infinispan.invocation.Flag;
import org.infinispan.invocation.InvocationContextContainer;
-import org.infinispan.invocation.Flag;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.lock.LockManager;
@@ -405,14 +405,29 @@
}
/**
+ * Designed to be overridden. Returns a VisitableCommand fit for replaying locally, based on the modification passed
+ * in. If a null value is returned, this means that the command should not be replayed.
+ *
+ * @param modification modification in a prepare call
+ * @return a VisitableCommand representing this modification, fit for replaying, or null if the command should not be
+ * replayed.
+ */
+ protected VisitableCommand getCommandToReplay(VisitableCommand modification) {
+ return modification;
+ }
+
+ /**
* Replays modifications
*/
- protected void replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable {
+ private void replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable {
try {
// replay modifications
for (VisitableCommand modification : command.getModifications()) {
- invokeNextInterceptor(ctx, modification);
- assertTxIsStillValid(ltx);
+ VisitableCommand toReplay = getCommandToReplay(modification);
+ if (toReplay != null) {
+ invokeNextInterceptor(ctx, toReplay);
+ assertTxIsStillValid(ltx);
+ }
}
}
catch (Throwable th) {
Modified: trunk/core/src/main/java/org/infinispan/loader/cluster/ClusterCacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loader/cluster/ClusterCacheLoader.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/loader/cluster/ClusterCacheLoader.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -18,8 +18,10 @@
import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
+import org.infinispan.remoting.transport.Address;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -77,7 +79,9 @@
}
private List<Response> doRemoteCall(ClusteredGetCommand clusteredGetCommand) throws CacheLoaderException {
- ResponseFilter filter = new ClusteredGetResponseValidityFilter(rpcManager.getTransport().getMembers(), rpcManager.getLocalAddress());
+ Set<Address> validMembers = new HashSet<Address>(rpcManager.getTransport().getMembers());
+ validMembers.remove(rpcManager.getLocalAddress());
+ ResponseFilter filter = new ClusteredGetResponseValidityFilter(validMembers);
try {
return rpcManager.invokeRemotely(null, clusteredGetCommand, ResponseMode.WAIT_FOR_VALID_RESPONSE, config.getRemoteCallTimeout(), false, filter, false);
} catch (Exception e) {
Modified: trunk/core/src/main/java/org/infinispan/remoting/responses/ClusteredGetResponseValidityFilter.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/responses/ClusteredGetResponseValidityFilter.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/remoting/responses/ClusteredGetResponseValidityFilter.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -3,8 +3,8 @@
import org.infinispan.remoting.ResponseFilter;
import org.infinispan.remoting.transport.Address;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashSet;
/**
* A filter that tests the validity of {@link org.infinispan.commands.remote.ClusteredGetCommand}s.
@@ -16,12 +16,10 @@
private int numValidResponses = 0;
- private List<Address> pendingResponders;
+ private Collection<Address> pendingResponders;
- public ClusteredGetResponseValidityFilter(List<Address> expected, Address localAddress) {
- this.pendingResponders = new ArrayList<Address>(expected);
- // We'll never get a response from ourself
- this.pendingResponders.remove(localAddress);
+ public ClusteredGetResponseValidityFilter(Collection<Address> pendingResponders) {
+ this.pendingResponders = new HashSet<Address>(pendingResponders);
}
public boolean isAcceptable(Response response, Address address) {
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -304,14 +304,17 @@
if (rsps == null) return Collections.emptyList();
List<Response> retval = new ArrayList<Response>(rsps.size());
+ boolean noValidResponses = true;
for (Rsp rsp : rsps.values()) {
if (rsp.wasSuspected() || !rsp.wasReceived()) {
if (rsp.wasSuspected()) {
throw new SuspectException("Suspected member: " + rsp.getSender());
} else {
- throw new TimeoutException("Replication timeout for " + rsp.getSender());
+ // if we have a response filter then we may not have waited for some nodes!
+ if (responseFilter == null) throw new TimeoutException("Replication timeout for " + rsp.getSender());
}
} else {
+ noValidResponses = false;
if (rsp.getValue() != null) {
Response value = (Response) rsp.getValue();
if (value instanceof ExceptionResponse) {
@@ -326,6 +329,8 @@
}
}
}
+
+ if (noValidResponses) throw new TimeoutException("Timed out waiting for valid responses!");
return retval;
} finally {
// release the "processing" lock so that other threads are aware of the network call having completed
Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -1,6 +1,8 @@
package org.infinispan.distribution;
import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.config.Configuration;
import static org.infinispan.config.Configuration.CacheMode.DIST_ASYNC;
import static org.infinispan.config.Configuration.CacheMode.DIST_SYNC;
@@ -16,7 +18,6 @@
import javax.transaction.TransactionManager;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
@Test(groups = "functional", testName = "distribution.BaseDistFunctionalTest")
@@ -28,12 +29,13 @@
protected void createCacheManagers() throws Throwable {
Configuration c = getDefaultClusteredConfig(sync ? DIST_SYNC : DIST_ASYNC);
+ if (!sync) c.setUnsafeUnreliableReturnValues(true);
if (tx) c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
caches = createClusteredCaches(4, "dist", c);
c1 = caches.get(0);
- c2 = caches.get(0);
- c3 = caches.get(0);
- c4 = caches.get(0);
+ c2 = caches.get(1);
+ c3 = caches.get(2);
+ c4 = caches.get(3);
}
// ----------------- HELPERS ----------------
@@ -42,30 +44,41 @@
for (Cache<Object, String> c : caches) assert c.isEmpty();
c1.put("k1", "value");
- asyncWait();
- for (Cache<Object, String> c : caches) assert c.get("k1").equals("value");
+ asyncWait("k1", PutKeyValueCommand.class, getNonOwners("k1"));
+ for (Cache<Object, String> c : caches)
+ assert "value".equals(c.get("k1")) : "Failed on cache " + c.getCacheManager().getAddress();
assertOwnershipAndNonOwnership("k1");
}
protected Cache<Object, String> getFirstNonOwner(String key) {
- return getNonOwners(key).get(0);
+ return getNonOwners(key)[0];
}
+ protected Cache<Object, String> getSecondNonOwner(String key) {
+ return getNonOwners(key)[1];
+ }
+
protected void assertOnAllCachesAndOwnership(Object key, String value) {
for (Cache<Object, String> c : caches) {
- if (value == null)
- assert c.get(key) == null;
- else
- assert value.equals(c.get(key));
+ Object realVal = c.get(key);
+ if (value == null) {
+ assert realVal == null : "Expecting [" + key + "] to equal [" + value + "] on cache ["
+ + c.getCacheManager().getAddress() + "] but was [" + realVal + "]";
+ } else {
+ assert value.equals(realVal) : "Expecting [" + key + "] to equal [" + value + "] on cache ["
+ + c.getCacheManager().getAddress() + "] but was [" + realVal + "]";
+ }
}
- assertOwnershipAndNonOwnership("k1");
+ if (value != null) assertOwnershipAndNonOwnership(key);
}
protected void assertOwnershipAndNonOwnership(Object key) {
for (Cache<Object, String> c : caches) {
DataContainer dc = c.getAdvancedCache().getDataContainer();
if (isOwner(c, key)) {
- assert dc.get(key) instanceof ImmortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
+ InternalCacheEntry ice = dc.get(key);
+ assert ice != null : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned null!";
+ assert ice instanceof ImmortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
} else {
if (dc.containsKey(key)) {
assert dc.get(key) instanceof MortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
@@ -104,18 +117,26 @@
return false;
}
- protected List<Cache<Object, String>> getOwners(Object key) {
- List<Cache<Object, String>> owners = new ArrayList<Cache<Object, String>>();
+ protected static boolean isFirstOwner(Cache<?, ?> c, Object key) {
+ DistributionManager dm = c.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
+ List<Address> ownerAddresses = dm.locate(key);
+ return c.getCacheManager().getAddress().equals(ownerAddresses.get(0));
+ }
+
+ protected Cache<Object, String>[] getOwners(Object key) {
+ Cache<Object, String>[] owners = new Cache[2];
+ int i = 0;
for (Cache<Object, String> c : caches) {
- if (isOwner(c, key)) owners.add(c);
+ if (isOwner(c, key)) owners[i++] = c;
}
return owners;
}
- protected List<Cache<Object, String>> getNonOwners(Object key) {
- List<Cache<Object, String>> nonOwners = new ArrayList<Cache<Object, String>>();
+ protected Cache<Object, String>[] getNonOwners(Object key) {
+ Cache<Object, String>[] nonOwners = new Cache[2];
+ int i = 0;
for (Cache<Object, String> c : caches) {
- if (!isOwner(c, key)) nonOwners.add(c);
+ if (!isOwner(c, key)) nonOwners[i++] = c;
}
return nonOwners;
}
@@ -125,7 +146,19 @@
return dm.locate(key);
}
- protected void asyncWait() {
+ protected DistributionManager getDistributionManager(Cache<?, ?> c) {
+ return c.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
+ }
+
+ /**
+ * Blocks and waits for a replication event on async caches
+ *
+ * @param key key that causes the replication. Used to determine which caches to listen on. If null, all caches
+ * are checked
+ * @param command command to listen for
+ * @param caches on which this key should be invalidated
+ */
+ protected void asyncWait(Object key, Class<? extends VisitableCommand> command, Cache<?, ?>... caches) {
// no op.
}
@@ -144,7 +177,7 @@
public MagicKey(Cache<?, ?> toMapTo) {
address = toMapTo.getCacheManager().getAddress().toString();
// generate a hashcode that will always map it to the specified cache.
- for (int i = 0; i < DefaultConsistentHash.HASH_SPACE; i += 100) {
+ for (int i = 1; i < DefaultConsistentHash.HASH_SPACE; i += 100) {
// create a dummy object with this hashcode
final int hc = i;
Object dummy = new Object() {
@@ -154,7 +187,7 @@
}
};
- if (BaseDistFunctionalTest.isOwner(toMapTo, dummy)) {
+ if (BaseDistFunctionalTest.isFirstOwner(toMapTo, dummy)) {
// we have found a hashcode that works!
hashcode = hc;
break;
@@ -168,6 +201,19 @@
}
@Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ MagicKey magicKey = (MagicKey) o;
+
+ if (hashcode != magicKey.hashcode) return false;
+ if (address != null ? !address.equals(magicKey.address) : magicKey.address != null) return false;
+
+ return true;
+ }
+
+ @Override
public String toString() {
return "MagicKey{" +
"hashcode=" + hashcode +
Modified: trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -11,7 +11,7 @@
import java.util.List;
import java.util.Map;
- at Test(groups = "unit", testName = "distribution.DefaultConsistentHashTest", enabled = false)
+ at Test(groups = "unit", testName = "distribution.DefaultConsistentHashTest")
public class DefaultConsistentHashTest {
List<Address> servers;
Modified: trunk/core/src/test/java/org/infinispan/distribution/DistAsyncFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistAsyncFuncTest.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistAsyncFuncTest.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -1,24 +1,57 @@
package org.infinispan.distribution;
+import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.InvalidateL1Command;
+import org.infinispan.test.ReplListener;
import org.testng.annotations.Test;
- at Test(groups = "functional", testName = "distribution.DistAsyncFuncTest", enabled = false)
+import java.util.HashMap;
+import java.util.Map;
+
+ at Test(groups = "functional", testName = "distribution.DistAsyncFuncTest")
public class DistAsyncFuncTest extends DistSyncFuncTest {
+ ReplListener r1, r2, r3, r4;
+ ReplListener[] r;
+ Map<Cache<?, ?>, ReplListener> listenerLookup;
+
public DistAsyncFuncTest() {
sync = false;
tx = false;
}
@Override
- protected void asyncWait() {
- // we need to wait for an async event to happen on *each* cache?
- // TODO figure this out properly!
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace(); // TODO: Customise this generated block
+ protected void createCacheManagers() throws Throwable {
+ super.createCacheManagers();
+ r1 = new ReplListener(c1, true, true);
+ r2 = new ReplListener(c2, true, true);
+ r3 = new ReplListener(c3, true, true);
+ r4 = new ReplListener(c4, true, true);
+ r = new ReplListener[]{r1, r2, r3, r4};
+ listenerLookup = new HashMap<Cache<?, ?>, ReplListener>();
+ for (ReplListener rl : r) listenerLookup.put(rl.getCache(), rl);
+ }
+
+
+ @Override
+ protected void asyncWait(Object key, Class<? extends VisitableCommand> command, Cache<?, ?>... cachesOnWhichKeyShouldInval) {
+ if (key == null) {
+ // test all caches.
+ for (ReplListener rl : r) rl.expect(command);
+ for (ReplListener rl : r) rl.waitForRpc();
+ } else {
+ for (Cache<?, ?> c : getOwners(key)) {
+ listenerLookup.get(c).expect(command);
+ listenerLookup.get(c).waitForRpc();
+ }
+
+ if (cachesOnWhichKeyShouldInval != null) {
+ for (Cache<?, ?> c : cachesOnWhichKeyShouldInval) {
+ listenerLookup.get(c).expect(InvalidateL1Command.class);
+ listenerLookup.get(c).waitForRpc();
+ }
+ }
}
}
-
}
\ No newline at end of file
Modified: trunk/core/src/test/java/org/infinispan/distribution/DistAsyncTxFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistAsyncTxFuncTest.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistAsyncTxFuncTest.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -1,10 +1,24 @@
package org.infinispan.distribution;
+import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.write.InvalidateCommand;
+import org.infinispan.test.ReplListener;
import org.testng.annotations.Test;
- at Test(groups = "functional", testName = "distribution.DistAsyncTxFuncTest", enabled = false)
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+ at Test(groups = "functional", testName = "distribution.DistAsyncTxFuncTest")
public class DistAsyncTxFuncTest extends DistSyncTxFuncTest {
+ ReplListener r1, r2, r3, r4;
+ ReplListener[] r;
+ Map<Cache<?, ?>, ReplListener> listenerLookup;
+
public DistAsyncTxFuncTest() {
sync = false;
tx = true;
@@ -12,13 +26,51 @@
}
@Override
- protected void asyncWait() {
- // we need to wait for an async event to happen on *each* cache?
- // TODO figure this out properly!
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace(); // TODO: Customise this generated block
+ protected void createCacheManagers() throws Throwable {
+ super.createCacheManagers();
+ r1 = new ReplListener(c1, true, true);
+ r2 = new ReplListener(c2, true, true);
+ r3 = new ReplListener(c3, true, true);
+ r4 = new ReplListener(c4, true, true);
+ r = new ReplListener[]{r1, r2, r3, r4};
+ listenerLookup = new HashMap<Cache<?, ?>, ReplListener>();
+ for (ReplListener rl : r) listenerLookup.put(rl.getCache(), rl);
+ }
+
+ @Override
+ protected void asyncWait(Object key, Class<? extends VisitableCommand> command, Cache<?, ?>... cachesOnWhichKeyShouldInval) {
+ if (key == null) {
+ // test all caches.
+ for (ReplListener rl : r) rl.expect(command);
+ for (ReplListener rl : r) rl.waitForRpc();
+ } else {
+ for (Cache<?, ?> c : getOwners(key)) {
+ listenerLookup.get(c).expect(command);
+ listenerLookup.get(c).waitForRpc();
+ }
+
+ if (cachesOnWhichKeyShouldInval != null) {
+ for (Cache<?, ?> c : cachesOnWhichKeyShouldInval) {
+ listenerLookup.get(c).expect(InvalidateCommand.class);
+ listenerLookup.get(c).waitForRpc();
+ }
+ }
}
}
+
+ @Override
+ protected void asyncTxWait(Object... keys) {
+ // Wait for a tx completion event
+ if (keys != null) {
+ Set<Cache<?, ?>> cachesInTx = new HashSet<Cache<?, ?>>();
+ for (Object k : keys) {
+ cachesInTx.addAll(Arrays.asList(getOwners(k)));
+ }
+
+ for (Cache<?, ?> c : cachesInTx) {
+ listenerLookup.get(c).expectAnyWithTx();
+ listenerLookup.get(c).waitForRpc();
+ }
+ }
+ }
}
\ No newline at end of file
Modified: trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -1,9 +1,13 @@
package org.infinispan.distribution;
import org.infinispan.Cache;
+import org.infinispan.commands.write.ClearCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
import org.testng.annotations.Test;
- at Test(groups = "functional", testName = "distribution.DistSyncFuncTest", enabled = false)
+ at Test(groups = "functional", testName = "distribution.DistSyncFuncTest")
public class DistSyncFuncTest extends BaseDistFunctionalTest {
public DistSyncFuncTest() {
@@ -14,9 +18,9 @@
public void testBasicDistribution() {
for (Cache<Object, String> c : caches) assert c.isEmpty();
- c1.put("k1", "value");
+ getOwners("k1")[0].put("k1", "value");
- asyncWait();
+ asyncWait("k1", PutKeyValueCommand.class, getNonOwners("k1"));
for (Cache<Object, String> c : caches) {
if (isOwner(c, "k1")) {
@@ -42,26 +46,30 @@
public void testPutFromNonOwner() {
initAndTest();
- Object retval = getFirstNonOwner("k1").put("k1", "value2");
- asyncWait();
- if (sync) assert "value".equals(retval);
+ Cache<Object, String> nonOwner = getFirstNonOwner("k1");
+ System.out.println("Non-owner address is " + nonOwner.getCacheManager().getAddress());
+
+ Object retval = nonOwner.put("k1", "value2");
+ asyncWait("k1", PutKeyValueCommand.class, getSecondNonOwner("k1"));
+
+ assert "value".equals(retval);
assertOnAllCachesAndOwnership("k1", "value2");
}
public void testPutIfAbsentFromNonOwner() {
initAndTest();
Object retval = getFirstNonOwner("k1").putIfAbsent("k1", "value2");
- asyncWait();
- if (sync) assert "value".equals(retval);
+ assert "value".equals(retval);
+
assertOnAllCachesAndOwnership("k1", "value");
c1.clear();
- asyncWait();
+ asyncWait(null, ClearCommand.class);
retval = getFirstNonOwner("k1").putIfAbsent("k1", "value2");
- asyncWait();
- if (sync) assert null == retval;
+ asyncWait("k1", PutKeyValueCommand.class, getSecondNonOwner("k1"));
+ assert null == retval;
assertOnAllCachesAndOwnership("k1", "value2");
}
@@ -69,7 +77,7 @@
public void testRemoveFromNonOwner() {
initAndTest();
Object retval = getFirstNonOwner("k1").remove("k1");
- asyncWait();
+ asyncWait("k1", RemoveCommand.class, getSecondNonOwner("k1"));
if (sync) assert "value".equals(retval);
assertOnAllCachesAndOwnership("k1", null);
@@ -78,14 +86,13 @@
public void testConditionalRemoveFromNonOwner() {
initAndTest();
boolean retval = getFirstNonOwner("k1").remove("k1", "value2");
- asyncWait();
- if (sync) assert !retval : "Should not have removed entry";
+ assert !retval : "Should not have removed entry";
assertOnAllCachesAndOwnership("k1", "value");
retval = getFirstNonOwner("k1").remove("k1", "value");
- asyncWait();
- if (sync) assert retval : "Should have removed entry";
+ asyncWait("k1", RemoveCommand.class, getSecondNonOwner("k1"));
+ assert retval : "Should have removed entry";
assertOnAllCachesAndOwnership("k1", null);
}
@@ -93,36 +100,34 @@
public void testReplaceFromNonOwner() {
initAndTest();
Object retval = getFirstNonOwner("k1").replace("k1", "value2");
- asyncWait();
- if (sync) assert "value".equals(retval);
+ assert "value".equals(retval);
+ asyncWait("k1", ReplaceCommand.class, getSecondNonOwner("k1"));
+
assertOnAllCachesAndOwnership("k1", "value2");
c1.clear();
- asyncWait();
+ asyncWait(null, ClearCommand.class);
retval = getFirstNonOwner("k1").replace("k1", "value2");
- asyncWait();
- if (sync) assert retval == null;
+ assert retval == null;
assertOnAllCachesAndOwnership("k1", null);
}
public void testConditionalReplaceFromNonOwner() {
initAndTest();
- boolean retval = getFirstNonOwner("k1").replace("k1", "valueX", "value2");
- asyncWait();
- if (sync) assert !retval : "Should not have replaced";
+ Cache<Object, String> nonOwner = getFirstNonOwner("k1");
+ boolean retval = nonOwner.replace("k1", "valueX", "value2");
+ assert !retval : "Should not have replaced";
assertOnAllCachesAndOwnership("k1", "value");
- c1.clear();
- asyncWait();
+ assert !nonOwner.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class).isLocal("k1");
+ retval = nonOwner.replace("k1", "value", "value2");
+ asyncWait("k1", ReplaceCommand.class, getSecondNonOwner("k1"));
+ assert retval : "Should have replaced";
- retval = getFirstNonOwner("k1").replace("k1", "value", "value2");
- asyncWait();
- if (sync) assert retval : "Should have replaced";
-
assertOnAllCachesAndOwnership("k1", "value2");
}
@@ -130,8 +135,8 @@
for (Cache<Object, String> c : caches) assert c.isEmpty();
for (int i = 0; i < 10; i++) {
- c1.put("k" + i, "value" + i);
- asyncWait();
+ getOwners("k" + i)[0].put("k" + i, "value" + i);
+ asyncWait("k" + i, PutKeyValueCommand.class, getNonOwners("k" + i));
}
// this will fill up L1 as well
@@ -140,7 +145,7 @@
for (Cache<Object, String> c : caches) assert !c.isEmpty();
c1.clear();
- asyncWait();
+ asyncWait(null, ClearCommand.class);
for (Cache<Object, String> c : caches) assert c.isEmpty();
}
Modified: trunk/core/src/test/java/org/infinispan/distribution/DistSyncTxFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncTxFuncTest.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncTxFuncTest.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -1,10 +1,11 @@
package org.infinispan.distribution;
+import org.infinispan.commands.write.PutKeyValueCommand;
import org.testng.annotations.Test;
import javax.transaction.TransactionManager;
- at Test(groups = "functional", testName = "distribution.DistSyncTxFuncTest", enabled = false)
+ at Test(groups = "functional", testName = "distribution.DistSyncTxFuncTest")
public class DistSyncTxFuncTest extends BaseDistFunctionalTest {
public DistSyncTxFuncTest() {
sync = true;
@@ -12,10 +13,17 @@
cleanup = CleanupPhase.AFTER_METHOD; // ensure any stale TXs are wiped
}
+ protected void asyncTxWait(Object... keys) {
+ // no op. Meant to be overridden
+ }
+
private void init(MagicKey k1, MagicKey k2) {
// neither key maps on to c4
c2.put(k1, "value1");
+ asyncWait(k1, PutKeyValueCommand.class);
+
c2.put(k2, "value2");
+ asyncWait(k2, PutKeyValueCommand.class);
assertIsInContainerImmortal(c1, k1);
assertIsInContainerImmortal(c2, k1);
@@ -42,7 +50,7 @@
c4.put(k2, "new_value2");
tm4.commit();
- asyncWait();
+ asyncTxWait(k1, k2);
assertIsInContainerImmortal(c1, k1);
assertIsInContainerImmortal(c2, k1);
@@ -54,9 +62,13 @@
assertIsNotInL1(c1, k2);
assertIsNotInL1(c3, k1);
- assertOnAllCachesAndOwnership(k1, "new_value1");
- assertOnAllCachesAndOwnership(k2, "new_value2");
+ checkOwnership(k1, k2, "new_value1", "new_value2");
+ }
+ private void checkOwnership(MagicKey k1, MagicKey k2, String v1, String v2) {
+ assertOnAllCachesAndOwnership(k1, v1);
+ assertOnAllCachesAndOwnership(k2, v2);
+
assertIsInL1(c4, k1);
assertIsInL1(c4, k2);
assertIsInL1(c1, k2);
@@ -77,8 +89,34 @@
c4.put(k2, "new_value2");
tm4.rollback();
- asyncWait();
+ assertIsInContainerImmortal(c1, k1);
+ assertIsInContainerImmortal(c2, k1);
+ assertIsInContainerImmortal(c2, k2);
+ assertIsInContainerImmortal(c3, k2);
+ assertIsNotInL1(c4, k1);
+ assertIsNotInL1(c4, k2);
+ assertIsNotInL1(c1, k2);
+ assertIsNotInL1(c3, k1);
+
+ checkOwnership(k1, k2, "value1", "value2");
+ }
+
+ public void testPutFromNonOwner() throws Exception {
+ // we need 2 keys that reside on different caches...
+ MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+ MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+ init(k1, k2);
+
+ TransactionManager tm4 = getTransactionManager(c4);
+ tm4.begin();
+ Object ret = c4.put(k1, "new_value");
+ assert "value1".equals(ret);
+ ret = c4.put(k2, "new_value");
+ assert "value2".equals(ret);
+ tm4.rollback();
+
assertIsInContainerImmortal(c1, k1);
assertIsInContainerImmortal(c2, k1);
assertIsInContainerImmortal(c2, k2);
@@ -89,16 +127,181 @@
assertIsNotInL1(c1, k2);
assertIsNotInL1(c3, k1);
- assertOnAllCachesAndOwnership(k1, "value1");
- assertOnAllCachesAndOwnership(k2, "value2");
+ checkOwnership(k1, k2, "value1", "value2");
+ }
- assertIsInL1(c4, k1);
- assertIsInL1(c4, k2);
- assertIsInL1(c1, k2);
- assertIsInL1(c3, k1);
+ public void testPutIfAbsentFromNonOwner() throws Exception {
+ // we need 2 keys that reside on different caches...
+ MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+ MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+ init(k1, k2);
+
+ TransactionManager tm4 = getTransactionManager(c4);
+ tm4.begin();
+ Object ret = c4.putIfAbsent(k1, "new_value");
+ assert "value1".equals(ret) : "Was expecting value1 but was " + ret;
+ ret = c4.putIfAbsent(k2, "new_value");
+ assert "value2".equals(ret) : "Was expecting value2 but was " + ret;
+
+ assert c4.get(k1).equals("value1");
+ assert c4.get(k2).equals("value2");
+
+ tm4.rollback();
+
+ assert c2.get(k1).equals("value1");
+ assert c2.get(k2).equals("value2");
+
+ assertIsInContainerImmortal(c1, k1);
+ assertIsInContainerImmortal(c2, k1);
+ assertIsInContainerImmortal(c2, k2);
+ assertIsInContainerImmortal(c3, k2);
+
+ assertIsNotInL1(c4, k1);
+ assertIsNotInL1(c4, k2);
+ assertIsNotInL1(c1, k2);
+ assertIsNotInL1(c3, k1);
+
+ checkOwnership(k1, k2, "value1", "value2");
}
- public void testReturnValuesInTx() throws Exception {
- // TODO
+ public void testRemoveFromNonOwner() throws Exception {
+ // we need 2 keys that reside on different caches...
+ MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+ MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+ init(k1, k2);
+
+ TransactionManager tm4 = getTransactionManager(c4);
+ tm4.begin();
+ Object ret = c4.remove(k1);
+ assert "value1".equals(ret);
+ ret = c4.remove(k2);
+ assert "value2".equals(ret);
+
+ assert !c4.containsKey(k1);
+ assert !c4.containsKey(k2);
+ tm4.rollback();
+
+ assertIsInContainerImmortal(c1, k1);
+ assertIsInContainerImmortal(c2, k1);
+ assertIsInContainerImmortal(c2, k2);
+ assertIsInContainerImmortal(c3, k2);
+
+ assertIsNotInL1(c4, k1);
+ assertIsNotInL1(c4, k2);
+ assertIsNotInL1(c1, k2);
+ assertIsNotInL1(c3, k1);
+
+ checkOwnership(k1, k2, "value1", "value2");
}
+
+ public void testConditionalRemoveFromNonOwner() throws Exception {
+ // we need 2 keys that reside on different caches...
+ MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+ MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+ init(k1, k2);
+
+ TransactionManager tm4 = getTransactionManager(c4);
+ tm4.begin();
+ boolean ret = c4.remove(k1, "valueX");
+ assert !ret;
+ ret = c4.remove(k2, "valueX");
+ assert !ret;
+
+ assert c4.containsKey(k1);
+ assert c4.containsKey(k2);
+
+ ret = c4.remove(k1, "value1");
+ assert ret;
+ ret = c4.remove(k2, "value2");
+ assert ret;
+
+ assert !c4.containsKey(k1);
+ assert !c4.containsKey(k2);
+ tm4.rollback();
+
+ assertIsInContainerImmortal(c1, k1);
+ assertIsInContainerImmortal(c2, k1);
+ assertIsInContainerImmortal(c2, k2);
+ assertIsInContainerImmortal(c3, k2);
+
+ assertIsNotInL1(c4, k1);
+ assertIsNotInL1(c4, k2);
+ assertIsNotInL1(c1, k2);
+ assertIsNotInL1(c3, k1);
+
+ checkOwnership(k1, k2, "value1", "value2");
+ }
+
+ public void testReplaceFromNonOwner() throws Exception {
+ // we need 2 keys that reside on different caches...
+ MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+ MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+ init(k1, k2);
+
+ TransactionManager tm4 = getTransactionManager(c4);
+ tm4.begin();
+ Object ret = c4.replace(k1, "new_value");
+ assert "value1".equals(ret);
+ ret = c4.replace(k2, "new_value");
+ assert "value2".equals(ret);
+
+ assert "new_value".equals(c4.get(k1));
+ assert "new_value".equals(c4.get(k2));
+ tm4.rollback();
+
+ assertIsInContainerImmortal(c1, k1);
+ assertIsInContainerImmortal(c2, k1);
+ assertIsInContainerImmortal(c2, k2);
+ assertIsInContainerImmortal(c3, k2);
+
+ assertIsNotInL1(c4, k1);
+ assertIsNotInL1(c4, k2);
+ assertIsNotInL1(c1, k2);
+ assertIsNotInL1(c3, k1);
+
+ checkOwnership(k1, k2, "value1", "value2");
+ }
+
+ public void testConditionalReplaceFromNonOwner() throws Exception {
+ // we need 2 keys that reside on different caches...
+ MagicKey k1 = new MagicKey(c1); // maps on to c1 and c2
+ MagicKey k2 = new MagicKey(c2); // maps on to c2 and c3
+
+ init(k1, k2);
+
+ TransactionManager tm4 = getTransactionManager(c4);
+ tm4.begin();
+ boolean ret = c4.replace(k1, "valueX", "new_value");
+ assert !ret;
+ ret = c4.replace(k2, "valueX", "new_value");
+ assert !ret;
+
+ assert "value1".equals(c4.get(k1));
+ assert "value2".equals(c4.get(k2));
+
+ ret = c4.replace(k1, "value1", "new_value");
+ assert ret;
+ ret = c4.replace(k2, "value2", "new_value");
+ assert ret;
+
+ assert "new_value".equals(c4.get(k1));
+ assert "new_value".equals(c4.get(k2));
+ tm4.rollback();
+
+ assertIsInContainerImmortal(c1, k1);
+ assertIsInContainerImmortal(c2, k1);
+ assertIsInContainerImmortal(c2, k2);
+ assertIsInContainerImmortal(c3, k2);
+
+ assertIsNotInL1(c4, k1);
+ assertIsNotInL1(c4, k2);
+ assertIsNotInL1(c1, k2);
+ assertIsNotInL1(c3, k1);
+
+ checkOwnership(k1, k2, "value1", "value2");
+ }
}
Added: trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -0,0 +1,33 @@
+package org.infinispan.distribution;
+
+import org.testng.annotations.Test;
+
+
+ at Test(groups = "unit", testName = "distribution.MagicKeyTest")
+public class MagicKeyTest extends BaseDistFunctionalTest {
+ public void testMagicKeys() {
+ BaseDistFunctionalTest.MagicKey k1 = new BaseDistFunctionalTest.MagicKey(c1);
+ assert getDistributionManager(c1).isLocal(k1);
+ assert getDistributionManager(c2).isLocal(k1);
+ assert !getDistributionManager(c3).isLocal(k1);
+ assert !getDistributionManager(c4).isLocal(k1);
+
+ BaseDistFunctionalTest.MagicKey k2 = new BaseDistFunctionalTest.MagicKey(c2);
+ assert !getDistributionManager(c1).isLocal(k2);
+ assert getDistributionManager(c2).isLocal(k2);
+ assert getDistributionManager(c3).isLocal(k2);
+ assert !getDistributionManager(c4).isLocal(k2);
+
+ BaseDistFunctionalTest.MagicKey k3 = new BaseDistFunctionalTest.MagicKey(c3);
+ assert !getDistributionManager(c1).isLocal(k3);
+ assert !getDistributionManager(c2).isLocal(k3);
+ assert getDistributionManager(c3).isLocal(k3);
+ assert getDistributionManager(c4).isLocal(k3);
+
+ BaseDistFunctionalTest.MagicKey k4 = new BaseDistFunctionalTest.MagicKey(c4);
+ assert getDistributionManager(c1).isLocal(k4);
+ assert !getDistributionManager(c2).isLocal(k4);
+ assert !getDistributionManager(c3).isLocal(k4);
+ assert getDistributionManager(c4).isLocal(k4);
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/MagicKeyTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/test/java/org/infinispan/marshall/jboss/JBossMarshallerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/marshall/jboss/JBossMarshallerTest.java 2009-04-24 16:06:56 UTC (rev 168)
+++ trunk/core/src/test/java/org/infinispan/marshall/jboss/JBossMarshallerTest.java 2009-04-24 18:15:21 UTC (rev 169)
@@ -202,7 +202,7 @@
// EvictCommand does not have an empty constructor, so doesn't look to be one that is marshallable.
- InvalidateCommand c7 = new InvalidateCommand(null, "key1", "key2");
+ InvalidateCommand c7 = new InvalidateCommand(null, null, "key1", "key2");
bytes = marshaller.objectToByteBuffer(c7);
InvalidateCommand rc7 = (InvalidateCommand) marshaller.objectFromByteBuffer(bytes);
assert rc7.getCommandId() == c7.getCommandId() : "Writen[" + c7.getCommandId() + "] and read[" + rc7.getCommandId() + "] objects should be the same";
More information about the infinispan-commits
mailing list