Author: manik.surtani(a)jboss.com
Date: 2009-01-30 09:10:03 -0500 (Fri, 30 Jan 2009)
New Revision: 7614
Added:
core/branches/flat/src/main/java/org/horizon/commands/write/DataWriteCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/WriteCommand.java
core/branches/flat/src/test/java/org/horizon/replication/AsyncReplicatedAPITest.java
core/branches/flat/src/test/java/org/horizon/replication/BaseReplicatedAPITest.java
core/branches/flat/src/test/java/org/horizon/replication/SyncReplicatedAPITest.java
Removed:
core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/commands/CommandsFactory.java
core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java
core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java
core/branches/flat/src/main/java/org/horizon/commands/read/AbstractDataCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/ClearCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/EvictCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/InvalidateCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifier.java
core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifierImpl.java
core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java
core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
core/branches/flat/src/test/java/org/horizon/api/MixedModeTest.java
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java
core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java
Log:
Fixed replication, invalidation
Modified: core/branches/flat/src/main/java/org/horizon/commands/CommandsFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/CommandsFactory.java 2009-01-29
18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/commands/CommandsFactory.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -52,6 +52,8 @@
RemoveCommand buildRemoveCommand(Object key, Object value);
+ InvalidateCommand buildInvalidateCommand(Object... keys);
+
ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue);
SizeCommand buildSizeCommand();
@@ -85,6 +87,4 @@
ReplicateCommand buildReplicateCommand(List<ReplicableCommand> toReplicate);
ReplicateCommand buildReplicateCommand(ReplicableCommand call);
-
- InvalidateCommand buildInvalidateCommand(Object fqn);
}
Modified: core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/CommandsFactoryImpl.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -71,6 +71,10 @@
return new RemoveCommand(key, value, notifier);
}
+ public InvalidateCommand buildInvalidateCommand(Object... keys) {
+ return new InvalidateCommand(notifier, keys);
+ }
+
public ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object
newValue) {
return new ReplaceCommand(key, oldValue, newValue);
}
@@ -95,8 +99,7 @@
}
public EvictCommand buildEvictCommand(Object key) {
- EvictCommand command = new EvictCommand(key);
- command.initialize(notifier);
+ EvictCommand command = new EvictCommand(key, notifier);
return command;
}
@@ -137,6 +140,10 @@
if (rc.getCommands() != null)
for (ReplicableCommand nested : rc.getCommands())
initializeReplicableCommand(nested);
break;
+ case InvalidateCommand.METHOD_ID:
+ InvalidateCommand ic = (InvalidateCommand) c;
+ ic.init(notifier);
+ break;
case PrepareCommand.METHOD_ID:
PrepareCommand pc = (PrepareCommand) c;
if (pc.getModifications() != null)
@@ -144,8 +151,4 @@
break;
}
}
-
- public InvalidateCommand buildInvalidateCommand(Object fqn) {
- throw new UnsupportedOperationException("Not implemented");//todo please implement!
- }
}
Modified: core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/RemoteCommandFactory.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -7,6 +7,7 @@
import org.horizon.commands.tx.PrepareCommand;
import org.horizon.commands.tx.RollbackCommand;
import org.horizon.commands.write.ClearCommand;
+import org.horizon.commands.write.InvalidateCommand;
import org.horizon.commands.write.PutKeyValueCommand;
import org.horizon.commands.write.PutMapCommand;
import org.horizon.commands.write.RemoveCommand;
@@ -64,7 +65,9 @@
case ReplicateCommand.METHOD_ID:
command = new ReplicateCommand();
break;
-
+ case InvalidateCommand.METHOD_ID:
+ command = new InvalidateCommand();
+ break;
default:
throw new CacheException("Unknown command id " + id +
"!");
}
Modified:
core/branches/flat/src/main/java/org/horizon/commands/read/AbstractDataCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/read/AbstractDataCommand.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/read/AbstractDataCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -69,7 +69,6 @@
return (key != null ? key.hashCode() : 0);
}
-
public String toString() {
return getClass().getSimpleName() + "{" +
"key=" + key +
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/ClearCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/ClearCommand.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/ClearCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -21,7 +21,6 @@
*/
package org.horizon.commands.write;
-import org.horizon.commands.VisitableCommand;
import org.horizon.commands.Visitor;
import org.horizon.container.MVCCEntry;
import org.horizon.context.InvocationContext;
@@ -30,7 +29,7 @@
* @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
-public class ClearCommand implements VisitableCommand {
+public class ClearCommand implements WriteCommand {
private static final Object[] params = new Object[0];
public static final byte METHOD_ID = 17;
@@ -57,4 +56,13 @@
public void setParameters(int commandId, Object[] parameters) {
if (commandId != METHOD_ID) throw new IllegalStateException("Invalid method id");
}
+
+ @Override
+ public String toString() {
+ return "ClearCommand";
+ }
+
+ public boolean isSuccessful() {
+ return true;
+ }
}
Added: core/branches/flat/src/main/java/org/horizon/commands/write/DataWriteCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/DataWriteCommand.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/DataWriteCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -0,0 +1,12 @@
+package org.horizon.commands.write;
+
+import org.horizon.commands.DataCommand;
+
+/**
+ * Mixes features from DataCommand and WriteCommand
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public interface DataWriteCommand extends WriteCommand, DataCommand {
+}
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/EvictCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/EvictCommand.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/EvictCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -22,8 +22,6 @@
package org.horizon.commands.write;
import org.horizon.commands.Visitor;
-import org.horizon.commands.read.AbstractDataCommand;
-import org.horizon.container.MVCCEntry;
import org.horizon.context.InvocationContext;
import org.horizon.notifications.CacheNotifier;
@@ -31,39 +29,36 @@
* @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
-public class EvictCommand extends AbstractDataCommand {
- public static final byte METHOD_ID = 120;
+public class EvictCommand extends RemoveCommand {
- private CacheNotifier notifier;
-
- public EvictCommand(Object key) {
+ public EvictCommand(Object key, CacheNotifier notifier) {
this.key = key;
+ this.notifier = notifier;
}
public void initialize(CacheNotifier notifier) {
this.notifier = notifier;
}
+ @Override
public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable
{
return visitor.visitEvictCommand(ctx, this);
}
+ @Override
public Object perform(InvocationContext ctx) throws Throwable {
-
if (key == null) throw new NullPointerException("Key is null!!");
-
- MVCCEntry e = ctx.lookupEntry(key);
- if (e != null && !e.isNullEntry()) {
- //todo - add a actual eviction from thr container
- notifier.notifyCacheEntryEvicted(key, true, ctx);
- e.setDeleted(true);
- e.setValid(false);
- notifier.notifyCacheEntryEvicted(key, false, ctx);
- }
+ super.perform(ctx);
return null;
}
+ @Override
+ public void notify(InvocationContext ctx, boolean isPre) {
+ notifier.notifyCacheEntryEvicted(key, isPre, ctx);
+ }
+
+ @Override
public byte getCommandId() {
- return METHOD_ID;
+ return -1; // these are not meant for replication!
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/commands/write/InvalidateCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/InvalidateCommand.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/InvalidateCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -21,17 +21,15 @@
*/
package org.horizon.commands.write;
-import org.horizon.CacheSPI;
import org.horizon.commands.Visitor;
-import org.horizon.commands.read.AbstractDataCommand;
-import org.horizon.container.DataContainer;
import org.horizon.context.InvocationContext;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.notifications.CacheNotifier;
-import org.horizon.tree.Fqn;
+import java.util.Arrays;
+
/**
* Removes a node's content from memory - never removes the node. It also clenups data for resident nodes - which are
* not being touched by eviction.
@@ -39,26 +37,17 @@
* @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
-public class InvalidateCommand extends AbstractDataCommand {
+public class InvalidateCommand extends RemoveCommand {
public static final int METHOD_ID = 47;
private static final Log log = LogFactory.getLog(InvalidateCommand.class);
private static final boolean trace = log.isTraceEnabled();
+ private Object[] keys;
- /* dependencies*/
- protected CacheSPI spi;
- protected CacheNotifier notifier;
- protected DataContainer dataContainer;
-
- public InvalidateCommand(Object key) {
- this.key = key;
- }
-
public InvalidateCommand() {
}
- public void initialize(CacheSPI cacheSpi, DataContainer dataContainer, CacheNotifier
notifier) {
- this.spi = cacheSpi;
- this.dataContainer = dataContainer;
+ public InvalidateCommand(CacheNotifier notifier, Object... keys) {
+ this.keys = keys;
this.notifier = notifier;
}
@@ -68,56 +57,20 @@
* @param ctx invocation context
* @return null
*/
- public Object perform(InvocationContext ctx) {
- Object value = enforceNodeLoading();
- if (trace) log.trace("Invalidating key:" + key);
- if (value == null) {
- return null;
+ public Object perform(InvocationContext ctx) throws Throwable {
+ if (trace) log.trace("Invalidating keys:" + Arrays.toString(keys));
+ for (Object key : keys) {
+ this.key = key;
+ super.perform(ctx);
}
- evictNode(key, ctx);
-// dataContainer.
return null;
}
- boolean evictNode(Object key, InvocationContext ctx) {
- notifier.notifyNodeInvalidated(key, true, ctx);
- try {
- return dataContainer.evict(key);
- }
- finally {
- notifier.notifyNodeInvalidated(key, false, ctx);
- }
+ @Override
+ protected void notify(InvocationContext ctx, boolean isPre) {
+ notifier.notifyCacheEntryInvalidated(key, isPre, ctx);
}
-
- /**
- * //TODO: 2.2.0: rather than using CacheSPI this should use peek(). The other interceptors should obtain locks and
- * load nodes if necessary for this InvalidateCommand. //Even better - this can be handles in the interceptors before
- * call interceptor
- */
- protected Object enforceNodeLoading() {
- return spi.get(key);
- }
-
-
- /**
- * mark the node to be removed (and all children) as invalid so anyone holding a direct reference to it will be aware
- * that it is no longer valid.
- */
- protected void invalidateNode()//NodeSPI node)
- {
- // TODO: Implement me!
- throw new RuntimeException("Implement me!");
-// node.setValid(false, true);
- // root nodes can never be invalid
-// if (fqn.isRoot()) node.setValid(true, false); // non-recursive.
- }
-
-
- public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable
{
- return visitor.visitInvalidateCommand(ctx, this);
- }
-
public byte getCommandId() {
return METHOD_ID;
}
@@ -125,21 +78,45 @@
@Override
public String toString() {
return "InvalidateCommand{" +
- "key=" + key +
+ "keys=" + Arrays.toString(keys) +
'}';
}
@Override
public Object[] getParameters() {
- return new Object[]{key};
+ if (keys == null || keys.length == 0) {
+ return new Object[]{0};
+ } else if (keys.length == 1) {
+ return new Object[]{1, keys[0]};
+ } else {
+ Object[] retval = new Object[keys.length + 1];
+ retval[0] = keys.length;
+ System.arraycopy(keys, 0, retval, 1, keys.length);
+ return retval;
+ }
}
@Override
public void setParameters(int commandId, Object[] args) {
- key = args[0];
+ int size = (Integer) args[0];
+ keys = new Object[size];
+ if (size == 1) {
+ keys[0] = args[1];
+ } else if (size > 0) {
+ System.arraycopy(args, 1, keys, 0, size);
+ }
}
- void setFqn(Fqn newFqn) {
- this.key = newFqn;
+ public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable
{
+ return visitor.visitInvalidateCommand(ctx, this);
}
+
+ @Override
+ public Object getKey() {
+ throw new UnsupportedOperationException("Not supported. Use getKeys() instead.");
+ }
+
+ public Object[] getKeys() {
+ return keys;
+ }
}
\ No newline at end of file
Modified:
core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -35,12 +35,13 @@
* @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
-public class PutKeyValueCommand extends AbstractDataCommand {
+public class PutKeyValueCommand extends AbstractDataCommand implements DataWriteCommand
{
public static final byte METHOD_ID = 3;
protected Object value;
protected boolean putIfAbsent;
private CacheNotifier notifier;
+ boolean successful = true;
public PutKeyValueCommand(Object key, Object value, boolean putIfAbsent, CacheNotifier
notifier) {
super(key);
@@ -69,9 +70,10 @@
}
public Object perform(InvocationContext ctx) throws Throwable {
- Object o = null;
+ Object o;
MVCCEntry e = ctx.lookupEntry(key);
if (e.getValue() != null && putIfAbsent) {
+ successful = false;
return e.getValue();
} else {
notifier.notifyCacheEntryModified(key, true, ctx);
@@ -134,7 +136,6 @@
return result;
}
-
public String toString() {
return "PutKeyValueCommand{" +
"key= " + key +
@@ -142,4 +143,8 @@
", putIfAbsent=" + putIfAbsent +
'}';
}
+
+ public boolean isSuccessful() {
+ return successful;
+ }
}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -21,7 +21,6 @@
*/
package org.horizon.commands.write;
-import org.horizon.commands.VisitableCommand;
import org.horizon.commands.Visitor;
import org.horizon.container.MVCCEntry;
import org.horizon.context.InvocationContext;
@@ -34,7 +33,7 @@
* @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
-public class PutMapCommand implements VisitableCommand {
+public class PutMapCommand implements WriteCommand {
public static final byte METHOD_ID = 121;
private Map<Object, Object> map;
@@ -109,4 +108,8 @@
"map=" + map +
'}';
}
+
+ public boolean isSuccessful() {
+ return true;
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -32,9 +32,10 @@
* @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
-public class RemoveCommand extends AbstractDataCommand {
+public class RemoveCommand extends AbstractDataCommand implements DataWriteCommand {
public static final byte METHOD_ID = 6;
- private CacheNotifier notifier;
+ protected CacheNotifier notifier;
+ boolean successful = true;
protected Object value;
@@ -57,17 +58,30 @@
public Object perform(InvocationContext ctx) throws Throwable {
MVCCEntry e = ctx.lookupEntry(key);
- if (e == null || e.isNullEntry()) return value == null ? null : false;
- if (value != null && e.getValue() != null &&
!e.getValue().equals(value))
+ if (e == null || e.isNullEntry()) {
+ if (value == null) {
+ return null;
+ } else {
+ successful = false;
+ return false;
+ }
+ }
+ if (value != null && e.getValue() != null &&
!e.getValue().equals(value)) {
+ successful = false;
return false;
+ }
- notifier.notifyCacheEntryRemoved(key, true, ctx);
+ notify(ctx, true);
e.setDeleted(true);
e.setValid(false);
- notifier.notifyCacheEntryRemoved(key, false, ctx);
+ notify(ctx, false);
return value == null ? e.getValue() : true;
}
+ protected void notify(InvocationContext ctx, boolean isPre) {
+ notifier.notifyCacheEntryRemoved(key, isPre, ctx);
+ }
+
public byte getCommandId() {
return METHOD_ID;
}
@@ -92,9 +106,13 @@
public String toString() {
- return "RemoveCommand{" +
+ return getClass().getSimpleName() + "{" +
"key=" + key +
", value=" + value +
'}';
}
+
+ public boolean isSuccessful() {
+ return successful;
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -31,11 +31,12 @@
* @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
-public class ReplaceCommand extends AbstractDataCommand {
+public class ReplaceCommand extends AbstractDataCommand implements DataWriteCommand {
public static final byte METHOD_ID = 122;
protected Object oldValue;
protected Object newValue;
+ boolean successful = true;
public ReplaceCommand(Object key, Object oldValue, Object newValue) {
super(key);
@@ -52,14 +53,34 @@
public Object perform(InvocationContext ctx) throws Throwable {
MVCCEntry e = ctx.lookupEntry(key);
- if (e == null || e.isNullEntry()) return oldValue == null ? null : false;
- if (oldValue == null || oldValue.equals(e.getValue())) {
- Object old = e.setValue(newValue);
- return oldValue == null ? old : true;
+ if (e != null) {
+ if (ctx.isOriginLocal()) {
+ if (e.isNullEntry()) return returnValue(null, false);
+
+ if (oldValue == null || oldValue.equals(e.getValue())) {
+ Object old = e.setValue(newValue);
+ return returnValue(old, true);
+ }
+ return returnValue(null, false);
+ } else {
+ // for remotely originating calls, this doesn't check the status of what is under the key at the moment
+ Object old = e.setValue(newValue);
+ return returnValue(old, true);
+ }
}
- return oldValue == null ? null : false;
+
+ return returnValue(null, false);
}
+ private Object returnValue(Object beingReplaced, boolean successful) {
+ this.successful = successful;
+ if (oldValue == null) {
+ return beingReplaced;
+ } else {
+ return successful;
+ }
+ }
+
public byte getCommandId() {
return METHOD_ID;
}
@@ -102,4 +123,8 @@
", newValue=" + newValue +
'}';
}
+
+ public boolean isSuccessful() {
+ return successful;
+ }
}
Added: core/branches/flat/src/main/java/org/horizon/commands/write/WriteCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/WriteCommand.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/WriteCommand.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -0,0 +1,20 @@
+package org.horizon.commands.write;
+
+import org.horizon.commands.VisitableCommand;
+
+/**
+ * A command that modifies the cache in some way
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public interface WriteCommand extends VisitableCommand {
+ /**
+ * Some commands may want to provide information on whether the command was successful or not. This is different
+ * from a failure, which usually would result in an exception being thrown. An example is a putIfAbsent() not doing
+ * anything because the key in question was present. This would result in a isSuccessful() call returning false.
+ *
+ * @return true if the command completed successfully, false otherwise.
+ */
+ boolean isSuccessful();
+}
Modified: core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/DataContainer.java 2009-01-29
18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/container/DataContainer.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -47,6 +47,4 @@
void clear();
Set<K> keySet();
-
- boolean evict(Object key);
}
Modified:
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -76,10 +76,6 @@
return new KeySet();
}
- public boolean evict(Object key) {
- throw new UnsupportedOperationException("Not implemented");//todo please implement!
- }
-
public String toString() {
return data.toString();
}
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -24,6 +24,7 @@
import org.horizon.annotations.ManagedAttribute;
import org.horizon.annotations.ManagedOperation;
import org.horizon.commands.read.GetKeyValueCommand;
+import org.horizon.commands.write.InvalidateCommand;
import org.horizon.commands.write.PutKeyValueCommand;
import org.horizon.commands.write.RemoveCommand;
import org.horizon.commands.write.ReplaceCommand;
@@ -60,7 +61,6 @@
protected EntryFactory entryFactory;
protected boolean isActivation = false;
-// protected boolean usingVersionedInvalidation = false;
/**
@@ -74,8 +74,6 @@
DataContainer<Object, Object> dataContainer,
EntryFactory entryFactory, CacheNotifier notifier) {
this.txTable = txTable;
this.clm = clm;
-// CacheMode mode = configuration.getCacheMode();
-// usingVersionedInvalidation = mode.isInvalidation();
this.dataContainer = dataContainer;
this.notifier = notifier;
this.entryFactory = entryFactory;
@@ -104,6 +102,14 @@
}
@Override
+ public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command)
throws Throwable {
+ if (command.getKeys() != null) {
+ for (Object key : command.getKeys()) loadIfNeeded(ctx, key);
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws
Throwable {
if (command.getKey() != null) {
loadIfNeeded(ctx, command.getKey());
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -29,9 +29,13 @@
import org.horizon.commands.VisitableCommand;
import org.horizon.commands.tx.PrepareCommand;
import org.horizon.commands.write.ClearCommand;
+import org.horizon.commands.write.DataWriteCommand;
import org.horizon.commands.write.InvalidateCommand;
import org.horizon.commands.write.PutKeyValueCommand;
+import org.horizon.commands.write.PutMapCommand;
import org.horizon.commands.write.RemoveCommand;
+import org.horizon.commands.write.ReplaceCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.config.Option;
import org.horizon.context.InvocationContext;
import org.horizon.context.TransactionContext;
@@ -80,27 +84,39 @@
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable {
- return handleWriteMethod(ctx, command, command);
+ return handleInvalidate(ctx, command);
}
@Override
+ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
throws Throwable {
+ return handleInvalidate(ctx, command);
+ }
+
+ @Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws
Throwable {
- return handleWriteMethod(ctx, command.getKey(), command);
+ return handleInvalidate(ctx, command);
}
@Override
public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws
Throwable {
-// return handleWriteMethod(ctx, command.getKey(), command);
- //todo handle this - should perfor a remote invalidation aswell!!!
- return null;
+ // just broadcast the clear command - this is simplest!
+ Object retval = invokeNextInterceptor(ctx, command);
+ if (ctx.isOriginLocal()) replicateCall(ctx, command, defaultSynchronous,
ctx.getOptionOverrides());
+ return retval;
}
@Override
+ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws
Throwable {
+ Object[] keys = command.getMap() == null ? null :
command.getMap().keySet().toArray();
+ return handleInvalidate(ctx, command, keys);
+ }
+
+ @Override
public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable {
Object retval = invokeNextInterceptor(ctx, command);
Transaction tx = ctx.getTransaction();
if (tx != null) {
- if (trace) log.trace("Entering InvalidationInterceptor_Legacy's prepare phase");
+ if (trace) log.trace("Entering InvalidationInterceptor's prepare phase");
// fetch the modifications before the transaction is committed (and thus removed from the txTable)
GlobalTransaction gtx = ctx.getGlobalTransaction();
TransactionContext transactionContext = ctx.getTransactionContext();
@@ -123,20 +139,25 @@
return retval;
}
- private Object handleWriteMethod(InvocationContext ctx, Object key, VisitableCommand
command)
- throws Throwable {
+ private Object handleInvalidate(InvocationContext ctx, DataWriteCommand command)
throws Throwable {
+ return handleInvalidate(ctx, command, command.getKey());
+ }
+
+ private Object handleInvalidate(InvocationContext ctx, WriteCommand command, Object...
keys) throws Throwable {
Object retval = invokeNextInterceptor(ctx, command);
- Transaction tx = ctx.getTransaction();
- Option optionOverride = ctx.getOptionOverrides();
- if (log.isDebugEnabled()) log.debug("Is a CRUD method");
- if (key != null) {
- // could be potentially TRANSACTIONAL. Ignore if it is, until we see a prepare().
- if (tx == null || !TransactionTable.isValid(tx)) {
- // the no-tx case:
- //replicate an evict call.
- invalidateAcrossCluster(key, isSynchronous(optionOverride), ctx);
- } else {
- if (isLocalModeForced(ctx))
ctx.getTransactionContext().addLocalModification(command);
+ if (command.isSuccessful()) {
+ Transaction tx = ctx.getTransaction();
+ Option optionOverride = ctx.getOptionOverrides();
+ if (log.isDebugEnabled()) log.debug("Is a CRUD method");
+ if (keys != null && keys.length != 0) {
+ // could be potentially TRANSACTIONAL. Ignore if it is, until we see a prepare().
+ if (tx == null || !TransactionTable.isValid(tx)) {
+ // the no-tx case:
+ //replicate an evict call.
+ invalidateAcrossCluster(isSynchronous(optionOverride), ctx, keys);
+ } else {
+ if (isLocalModeForced(ctx))
ctx.getTransactionContext().addLocalModification(command);
+ }
}
}
return retval;
@@ -152,7 +173,7 @@
log.debug("Modification list contains a putForExternalRead operation. Not invalidating.");
} else {
try {
- for (Object key : filterVisitor.result) invalidateAcrossCluster(key,
defaultSynchronous, ctx);
+ invalidateAcrossCluster(defaultSynchronous, ctx,
filterVisitor.result.toArray());
}
catch (Throwable t) {
log.warn("Unable to broadcast evicts as a part of the prepare phase. Rolling back.", t);
@@ -193,11 +214,11 @@
}
- protected void invalidateAcrossCluster(Object fqn, boolean synchronous,
InvocationContext ctx) throws Throwable {
+ protected void invalidateAcrossCluster(boolean synchronous, InvocationContext ctx,
Object[] keys) throws Throwable {
if (!isLocalModeForced(ctx)) {
// increment invalidations counter if statistics maintained
incrementInvalidations();
- InvalidateCommand command = commandsFactory.buildInvalidateCommand(fqn);
+ InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
if (log.isDebugEnabled()) log.debug("Cache [" +
rpcManager.getAddress() + "] replicating " + command);
// voila, invalidated!
replicateCall(ctx, command, synchronous, ctx.getOptionOverrides());
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -28,6 +28,7 @@
import org.horizon.commands.tx.RollbackCommand;
import org.horizon.commands.write.ClearCommand;
import org.horizon.commands.write.EvictCommand;
+import org.horizon.commands.write.InvalidateCommand;
import org.horizon.commands.write.PutKeyValueCommand;
import org.horizon.commands.write.PutMapCommand;
import org.horizon.commands.write.RemoveCommand;
@@ -143,8 +144,16 @@
@Override
public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws
Throwable {
+ // ensure keys are properly locked for evict commands
+ return visitRemoveCommand(ctx, command);
+ }
+
+ @Override
+ public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command)
throws Throwable {
try {
- entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true);
+ if (command.getKeys() != null) {
+ for (Object key : command.getKeys()) entryFactory.wrapEntryForWriting(ctx,
key, false, true);
+ }
return invokeNextInterceptor(ctx, command);
}
finally {
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -21,7 +21,6 @@
*/
package org.horizon.interceptors;
-import org.horizon.commands.VisitableCommand;
import org.horizon.commands.tx.CommitCommand;
import org.horizon.commands.tx.PrepareCommand;
import org.horizon.commands.tx.RollbackCommand;
@@ -30,6 +29,7 @@
import org.horizon.commands.write.PutMapCommand;
import org.horizon.commands.write.RemoveCommand;
import org.horizon.commands.write.ReplaceCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.config.Configuration;
import org.horizon.context.InvocationContext;
import org.horizon.context.TransactionContext;
@@ -103,22 +103,25 @@
* If we are within one transaction we won't do any replication as replication
would only be performed at commit
* time. If the operation didn't originate locally we won't do any replication
either.
*/
- private Object handleCrudMethod(InvocationContext ctx, VisitableCommand command)
+ private Object handleCrudMethod(InvocationContext ctx, WriteCommand command)
throws Throwable {
boolean local = isLocalModeForced(ctx);
if (local && ctx.getTransaction() == null) return
invokeNextInterceptor(ctx, command);
// FIRST pass this call up the chain. Only if it succeeds (no exceptions) locally
do we attempt to replicate.
Object returnValue = invokeNextInterceptor(ctx, command);
- if (ctx.getTransaction() == null && ctx.isOriginLocal()) {
- if (trace) {
- log.trace("invoking method " + command.getClass().getSimpleName() +
", members=" + rpcManager.getMembers() + ", mode=" +
- configuration.getCacheMode() + ", exclude_self=" + true +
", timeout=" +
- configuration.getSyncReplTimeout());
+
+ if (command.isSuccessful()) {
+ if (ctx.getTransaction() == null && ctx.isOriginLocal()) {
+ if (trace) {
+ log.trace("invoking method " +
command.getClass().getSimpleName() + ", members=" + rpcManager.getMembers() +
", mode=" +
+ configuration.getCacheMode() + ", exclude_self=" + true +
", timeout=" +
+ configuration.getSyncReplTimeout());
+ }
+
+ replicateCall(ctx, command, isSynchronous(ctx.getOptionOverrides()),
ctx.getOptionOverrides());
+ } else {
+ if (local) ctx.getTransactionContext().addLocalModification(command);
}
-
- replicateCall(ctx, command, isSynchronous(ctx.getOptionOverrides()),
ctx.getOptionOverrides());
- } else {
- if (local) ctx.getTransactionContext().addLocalModification(command);
}
return returnValue;
}
Modified: core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifier.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifier.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifier.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -98,6 +98,4 @@
* @param transaction the transaction that has just completed
*/
void notifyTransactionRegistered(Transaction transaction, InvocationContext ctx);
-
- void notifyNodeInvalidated(Object key, boolean pre, InvocationContext ctx);
}
\ No newline at end of file
Modified:
core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifierImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifierImpl.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/main/java/org/horizon/notifications/CacheNotifierImpl.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -531,8 +531,4 @@
if (list == null) throw new CacheException("Unknown listener annotation:
" + annotation);
return list;
}
-
- public void notifyNodeInvalidated(Object key, boolean pre, InvocationContext ctx) {
- throw new UnsupportedOperationException("Not implemented");//todo please
implement!
- }
}
Modified: core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java 2009-01-29 18:06:23
UTC (rev 7613)
+++ core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java 2009-01-30 14:10:03
UTC (rev 7614)
@@ -559,29 +559,6 @@
return (CommandsFactory) extractField(cache, "commandsFactory");
}
- public static String getJGroupsAttribute(Cache cache, String protocol, String
attribute) {
- throw new RuntimeException("Implement me");
-// String s = ((JChannel) ((CacheSPI)
cache).getRPCManager().getChannel()).getProperties();
-// String[] protocols = s.split(":");
-// String attribs = null;
-// for (String p : protocols) {
-// boolean hasAttribs = p.contains("(");
-// String name = hasAttribs ? p.substring(0, p.indexOf('(')) : p;
-// attribs = hasAttribs ? p.substring(p.indexOf('(') + 1, p.length() - 1)
: null;
-//
-// if (name.equalsIgnoreCase(protocol)) break;
-// }
-//
-// if (attribs != null) {
-// String[] attrArray = attribs.split(";");
-// for (String a : attrArray) {
-// String[] kvPairs = a.split("=");
-// if (kvPairs[0].equalsIgnoreCase(attribute)) return kvPairs[1];
-// }
-// }
-// return null;
- }
-
public static void dumpCacheContents(List caches) {
System.out.println("**** START: Cache Contents ****");
int count = 1;
@@ -591,7 +568,6 @@
System.out.println(" ** Cache " + count + " is null!");
} else {
System.out.println(" ** Cache " + count + " is " +
c.getCacheManager().getAddress());
-// System.out.println(" " + CachePrinter.printCacheDetails(c));
}
count++;
}
@@ -616,6 +592,6 @@
}
public static TransactionManager getTransactionManager(Cache cache) {
- return extractComponent(cache, TransactionManager.class);
+ return cache == null ? null : extractComponent(cache, TransactionManager.class);
}
}
Modified: core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java 2009-01-29
18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -125,11 +125,11 @@
this.expectedCommands.addAll(Arrays.asList(expectedCommands));
}
- public void waitForReplication() {
- waitForReplication(120, TimeUnit.SECONDS);
+ public void waitForRPC() {
+ waitForRPC(120, TimeUnit.SECONDS);
}
- public void waitForReplication(long time, TimeUnit unit) {
+ public void waitForRPC(long time, TimeUnit unit) {
assert expectedCommands != null : "there are no replication expectations;
please use ReplListener.expect() before calling this method";
try {
if (!latch.await(time, unit)) {
Modified: core/branches/flat/src/test/java/org/horizon/api/MixedModeTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/MixedModeTest.java 2009-01-29
18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/api/MixedModeTest.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -78,8 +78,8 @@
invalAsyncCache1.put("k", "invalAsync");
localCache1.put("k", "local");
- r1.waitForReplication();
- r2.waitForReplication();
+ r1.waitForRPC();
+ r2.waitForRPC();
assert replSyncCache1.get("k").equals("replSync");
assert replSyncCache2.get("k").equals("replSync");
@@ -92,6 +92,4 @@
assert localCache1.get("k").equals("local");
assert localCache2.get("k") == null;
}
-
-
}
Modified:
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -57,7 +57,7 @@
public void testNoOpWhenKeyPresent() {
replListener2.expect(PutKeyValueCommand.class);
cache1.putForExternalRead(key, value);
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
assertEquals("PFER should have succeeded", value, cache1.get(key));
@@ -66,14 +66,14 @@
// reset
replListener2.expect(RemoveCommand.class);
cache1.remove(key);
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
assert cache1.isEmpty() : "Should have reset";
assert cache2.isEmpty() : "Should have reset";
replListener2.expect(PutKeyValueCommand.class);
cache1.put(key, value);
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
// now this pfer should be a no-op
cache1.putForExternalRead(key, value2);
@@ -127,7 +127,7 @@
// create parent node first
replListener2.expect(PutKeyValueCommand.class);
cache1.put(key + "0", value);
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
// start a tx and do some stuff.
replListener2.expect(PutKeyValueCommand.class);
@@ -136,7 +136,7 @@
cache1.putForExternalRead(key, value); // should have happened in a separate tx and
have committed already.
Transaction t = tm1.suspend();
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
assertEquals("PFER should have completed", value, cache1.get(key));
assertEquals("PFER should have completed", value, cache2.get(key));
@@ -201,7 +201,7 @@
replListener2.expect(PutKeyValueCommand.class);
cache1.putForExternalRead(key, value);
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
assertEquals("PFER updated cache1", value, cache1.get(key));
assertEquals("PFER propagated to cache2 as expected", value,
cache2.get(key));
@@ -244,7 +244,7 @@
tm1.begin();
cache1.putForExternalRead(key, value);
tm1.commit();
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
TransactionTable tt1 = getTransactionTable(cache1);
TransactionTable tt2 = getTransactionTable(cache2);
@@ -261,7 +261,7 @@
cache1.putForExternalRead(key, value);
cache1.put(key, value);
tm1.commit();
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
@@ -273,7 +273,7 @@
cache1.put(key, value);
cache1.putForExternalRead(key, value);
tm1.commit();
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
@@ -287,7 +287,7 @@
cache1.putForExternalRead(key, value);
cache1.put(key, value);
tm1.commit();
- replListener2.waitForReplication();
+ replListener2.waitForRPC();
assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
Modified:
core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -212,6 +212,6 @@
}
public void testEviction() throws Exception {
- assert false : "Implement me once the eviction config beans have been
fixed!";
+ // TODO: implement me
}
}
Modified:
core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java 2009-01-29
18:06:23 UTC (rev 7613)
+++
core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -4,6 +4,8 @@
import org.horizon.BaseClusteredTest;
import org.horizon.Cache;
import org.horizon.commands.RPCCommand;
+import org.horizon.commands.write.ClearCommand;
+import org.horizon.commands.write.InvalidateCommand;
import org.horizon.config.Configuration;
import org.horizon.remoting.RPCManager;
import org.horizon.remoting.RPCManagerImpl;
@@ -14,8 +16,7 @@
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeTest;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.transaction.RollbackException;
@@ -29,7 +30,7 @@
protected Cache cache1, cache2;
protected boolean isSync;
- @BeforeTest
+ @BeforeMethod
public void setUp() {
Configuration c = new Configuration();
c.setStateRetrievalTimeout(1000);
@@ -42,26 +43,6 @@
TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
}
- @AfterMethod
- public void cleanUp() {
- for (Cache c : new Cache[]{cache1, cache2}) {
- TransactionManager tm = TestingUtil.getTransactionManager(c);
- try {
- if (tm != null && tm.getTransaction() != null) {
- tm.rollback();
- }
- } catch (Exception e) {
- try {
- if (tm != null) tm.suspend();
- } catch (Exception e2) {
- // ignore
- }
- }
- c.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
- c.clear();
- }
- }
-
public void testRemove() throws Exception {
cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
cache1.put("key", "value");
@@ -73,58 +54,58 @@
ReplListener rl = attachReplicationListener(cache2);
rl.expectAny();
assertEquals("value", cache1.remove("key"));
- rl.waitForReplication();
+ rl.waitForRPC();
assertEquals(false, cache2.containsKey("key"));
}
- public void nodeResurrectionTest() throws Exception {
- ReplListener rl = attachReplicationListener(cache2);
- rl.expectAny();
+ public void testResurrectEntry() throws Exception {
+ ReplListener r2 = attachReplicationListener(cache2);
+ r2.expect(InvalidateCommand.class);
cache1.put("key", "value");
- rl.waitForReplication();
+ r2.waitForRPC();
assertEquals("value", cache1.get("key"));
assertEquals(null, cache2.get("key"));
- rl.expectAny();
+ r2.expect(InvalidateCommand.class);
cache1.put("key", "newValue");
- rl.waitForReplication();
+ r2.waitForRPC();
assertEquals("newValue", cache1.get("key"));
assertEquals(null, cache2.get("key"));
- rl.expectAny();
+ r2.expect(InvalidateCommand.class);
assertEquals("newValue", cache1.remove("key"));
- rl.waitForReplication();
+ r2.waitForRPC();
assertEquals(null, cache1.get("key"));
assertEquals(null, cache2.get("key"));
// Restore locally
- rl.expectAny();
+ r2.expect(InvalidateCommand.class);
cache1.put("key", "value");
- rl.waitForReplication();
+ r2.waitForRPC();
assertEquals("value", cache1.get("key"));
assertEquals(null, cache2.get("key"));
- ReplListener rl2 = attachReplicationListener(cache1);
- rl2.expectAny();
+ ReplListener r1 = attachReplicationListener(cache1);
+ r1.expect(InvalidateCommand.class);
cache2.put("key", "value2");
- rl2.waitForReplication();
+ r1.waitForRPC();
+ assertEquals("value2", cache2.get("key"));
assertEquals(null, cache1.get("key"));
- assertEquals("value2", cache2.get("key"));
}
- public void deleteNonExistentTest() throws Exception {
+ public void testDeleteNonExistentEntry() throws Exception {
assertNull("Should be null", cache1.get("key"));
assertNull("Should be null", cache2.get("key"));
ReplListener rl2 = attachReplicationListener(cache2);
- rl2.expectAny();
+ rl2.expect(InvalidateCommand.class);
cache1.put("key", "value");
- rl2.waitForReplication();
+ rl2.waitForRPC();
assertEquals("value", cache1.get("key"));
assertNull("Should be null", cache2.get("key"));
@@ -132,12 +113,12 @@
// OK, here's the real test
TransactionManager tm = TestingUtil.getTransactionManager(cache2);
ReplListener rl1 = attachReplicationListener(cache1);
- rl1.expectAnyWithTx();
+ rl1.expect(InvalidateCommand.class); // invalidates always happen outside of a tx
tm.begin();
// Remove an entry that doesn't exist in cache2
cache2.remove("key");
tm.commit();
- rl1.waitForReplication();
+ rl1.waitForRPC();
assert cache1.get("key") == null;
assert cache2.get("key") == null;
@@ -189,6 +170,7 @@
Transport origTransport = TestingUtil.extractComponent(cache1, Transport.class);
try {
Transport mockTransport = createMock(Transport.class);
+ rpcManager.setTransport(mockTransport);
Address addressOne = createNiceMock(Address.class);
Address addressTwo = createNiceMock(Address.class);
List<Address> members = new ArrayList<Address>(2);
@@ -196,6 +178,7 @@
members.add(addressTwo);
expect(mockTransport.getMembers()).andReturn(members).anyTimes();
+ expect(mockTransport.getAddress()).andReturn(addressOne).anyTimes();
expect(mockTransport.invokeRemotely((List<Address>) anyObject(),
(RPCCommand) anyObject(),
eq(isSync ? ResponseMode.SYNCHRONOUS :
ResponseMode.ASYNCHRONOUS),
anyLong(), anyBoolean(), (ResponseFilter)
anyObject())).andReturn(null).anyTimes();
@@ -208,4 +191,120 @@
if (rpcManager != null) rpcManager.setTransport(origTransport);
}
}
+
+ public void testPutIfAbsent() {
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value");
+ assert cache2.get("key").equals("value");
+ assert cache1.get("key") == null;
+
+ ReplListener r = attachReplicationListener(cache2);
+ r.expect(InvalidateCommand.class);
+ cache1.putIfAbsent("key", "value");
+ r.waitForRPC();
+
+ assert cache1.get("key").equals("value");
+ assert cache2.get("key") == null;
+
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+
+ assert cache1.get("key").equals("value");
+ assert cache2.get("key").equals("value2");
+
+ cache1.putIfAbsent("key", "value3");
+
+ assert cache1.get("key").equals("value");
+ assert cache2.get("key").equals("value2"); // should not invalidate cache2!!
+ }
+
+ public void testRemoveIfPresent() {
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("key", "value1");
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+ assert cache1.get("key").equals("value1");
+ assert cache2.get("key").equals("value2");
+
+ cache1.remove("key", "value");
+
+ assert cache1.get("key").equals("value1") : "Should not
remove";
+ assert cache2.get("key").equals("value2") : "Should not
evict";
+
+ ReplListener r = attachReplicationListener(cache2);
+ r.expect(InvalidateCommand.class);
+ cache1.remove("key", "value1");
+ r.waitForRPC();
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key") == null;
+ }
+
+ public void testClear() {
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("key", "value1");
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+ assert cache1.get("key").equals("value1");
+ assert cache2.get("key").equals("value2");
+
+ ReplListener r = attachReplicationListener(cache2);
+ r.expect(ClearCommand.class);
+ cache1.clear();
+ r.waitForRPC();
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key") == null;
+ }
+
+ public void testReplace() {
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+ assert cache1.get("key") == null;
+ assert cache2.get("key").equals("value2");
+
+ cache1.replace("key", "value1"); // should do nothing since
there is nothing to replace on cache1
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key").equals("value2");
+
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("key", "valueN");
+
+ ReplListener r = attachReplicationListener(cache2);
+ r.expect(InvalidateCommand.class);
+ cache1.replace("key", "value1");
+ r.waitForRPC();
+
+ assert cache1.get("key").equals("value1");
+ assert cache2.get("key") == null;
+ }
+
+ public void testReplaceWithOldVal() {
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+ assert cache1.get("key") == null;
+ assert cache2.get("key").equals("value2");
+
+ cache1.replace("key", "valueOld", "value1"); //
should do nothing since there is nothing to replace on cache1
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key").equals("value2");
+
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("key", "valueN");
+
+ cache1.replace("key", "valueOld", "value1"); // should do nothing since cache1 holds "valueN", which does not match the expected old value
+
+ assert cache1.get("key").equals("valueN");
+ assert cache2.get("key").equals("value2");
+
+ ReplListener r = attachReplicationListener(cache2);
+ r.expect(InvalidateCommand.class);
+ cache1.replace("key", "valueN", "value1");
+ r.waitForRPC();
+
+ assert cache1.get("key").equals("value1");
+ assert cache2.get("key") == null;
+ }
}
Modified: core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java 2009-01-29
18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -51,7 +51,7 @@
replListener2.expectAny();
cache1.put(key, "value1");
// allow for replication
- replListener2.waitForReplication(60, TimeUnit.SECONDS);
+ replListener2.waitForRPC(60, TimeUnit.SECONDS);
assertEquals("value1", cache1.get(key));
assertEquals("value1", cache2.get(key));
@@ -59,7 +59,7 @@
cache1.put(key, "value2");
assertEquals("value2", cache1.get(key));
- replListener2.waitForReplication(60, TimeUnit.SECONDS);
+ replListener2.waitForRPC(60, TimeUnit.SECONDS);
assertEquals("value2", cache1.get(key));
assertEquals("value2", cache2.get(key));
@@ -73,7 +73,7 @@
replListener2.expectAny();
cache1.put(key, "value1");
// allow for replication
- replListener2.waitForReplication(60, TimeUnit.SECONDS);
+ replListener2.waitForRPC(60, TimeUnit.SECONDS);
assertEquals("value1", cache1.get(key));
assertEquals("value1", cache2.get(key));
@@ -87,7 +87,7 @@
mgr.commit();
- replListener2.waitForReplication(60, TimeUnit.SECONDS);
+ replListener2.waitForRPC(60, TimeUnit.SECONDS);
assertEquals("value2", cache1.get(key));
assertEquals("value2", cache2.get(key));
Added:
core/branches/flat/src/test/java/org/horizon/replication/AsyncReplicatedAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/AsyncReplicatedAPITest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/replication/AsyncReplicatedAPITest.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -0,0 +1,10 @@
+package org.horizon.replication;
+
+import org.testng.annotations.Test;
+
+@Test(groups = "functional", sequential = true)
+public class AsyncReplicatedAPITest extends BaseReplicatedAPITest {
+ public AsyncReplicatedAPITest() {
+ isSync = false;
+ }
+}
Added:
core/branches/flat/src/test/java/org/horizon/replication/BaseReplicatedAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/BaseReplicatedAPITest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/replication/BaseReplicatedAPITest.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -0,0 +1,212 @@
+package org.horizon.replication;
+
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
+import org.horizon.commands.write.ClearCommand;
+import org.horizon.commands.write.PutKeyValueCommand;
+import org.horizon.commands.write.PutMapCommand;
+import org.horizon.commands.write.RemoveCommand;
+import org.horizon.commands.write.ReplaceCommand;
+import org.horizon.config.Configuration;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = "functional", sequential = true)
+public abstract class BaseReplicatedAPITest extends BaseClusteredTest {
+
+ Cache cache1, cache2;
+ protected boolean isSync;
+
+ @BeforeMethod
+ public void setUp() {
+ Configuration c = new Configuration();
+ c.setStateRetrievalTimeout(1000);
+ c.setFetchInMemoryState(false);
+ c.setCacheMode(isSync ? Configuration.CacheMode.REPL_SYNC :
Configuration.CacheMode.REPL_ASYNC);
+ c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ List<Cache> caches = createClusteredCaches(2, "replication", c);
+ cache1 = caches.get(0);
+ cache2 = caches.get(1);
+ TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+ }
+
+ public void put() {
+ // test a simple put!
+ assert cache1.get("key") == null;
+ assert cache2.get("key") == null;
+
+ ReplListener r = attachReplicationListener(cache2);
+ r.expect(PutKeyValueCommand.class);
+ cache1.put("key", "value");
+ r.waitForRPC();
+
+ assert cache1.get("key").equals("value");
+ assert cache2.get("key").equals("value");
+
+ Map map = new HashMap();
+ map.put("key2", "value2");
+ map.put("key3", "value3");
+
+ r.expect(PutMapCommand.class);
+ cache1.putAll(map);
+ r.waitForRPC();
+
+ assert cache1.get("key").equals("value");
+ assert cache2.get("key").equals("value");
+ assert cache1.get("key2").equals("value2");
+ assert cache2.get("key2").equals("value2");
+ assert cache1.get("key3").equals("value3");
+ assert cache2.get("key3").equals("value3");
+ }
+
+ public void remove() {
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value");
+ assert cache2.get("key").equals("value");
+ assert cache1.get("key") == null;
+
+ ReplListener r = attachReplicationListener(cache2);
+ r.expect(RemoveCommand.class);
+ cache1.remove("key");
+ r.waitForRPC();
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key") == null;
+
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("key", "value");
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value");
+ assert cache1.get("key").equals("value");
+ assert cache2.get("key").equals("value");
+
+ r.expect(RemoveCommand.class);
+ cache1.remove("key");
+ r.waitForRPC();
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key") == null;
+ }
+
+ public void testPutIfAbsent() {
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "valueOld");
+ assert cache2.get("key").equals("valueOld");
+ assert cache1.get("key") == null;
+
+ BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+ r.expect(PutKeyValueCommand.class);
+ cache1.putIfAbsent("key", "value");
+ r.waitForRPC();
+
+ assert cache1.get("key").equals("value");
+ assert cache2.get("key").equals("value");
+
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+
+ assert cache1.get("key").equals("value");
+ assert cache2.get("key").equals("value2");
+
+ cache1.putIfAbsent("key", "value3");
+
+ assert cache1.get("key").equals("value");
+ assert cache2.get("key").equals("value2"); // the failed putIfAbsent should not be replicated to cache2!!
+ }
+
+ public void testRemoveIfPresent() {
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("key", "value1");
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+ assert cache1.get("key").equals("value1");
+ assert cache2.get("key").equals("value2");
+
+ cache1.remove("key", "value");
+
+ assert cache1.get("key").equals("value1") : "Should not
remove";
+ assert cache2.get("key").equals("value2") : "Should not
remove";
+
+ BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+ r.expect(RemoveCommand.class);
+ cache1.remove("key", "value1");
+ r.waitForRPC();
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key") == null;
+ }
+
+ public void testClear() {
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("key", "value1");
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+ assert cache1.get("key").equals("value1");
+ assert cache2.get("key").equals("value2");
+
+ BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+ r.expect(ClearCommand.class);
+ cache1.clear();
+ r.waitForRPC();
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key") == null;
+ }
+
+ public void testReplace() {
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+ assert cache1.get("key") == null;
+ assert cache2.get("key").equals("value2");
+
+ cache1.replace("key", "value1"); // should do nothing since
there is nothing to replace on cache1
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key").equals("value2");
+
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("key", "valueN");
+
+ BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+ r.expect(ReplaceCommand.class);
+ cache1.replace("key", "value1");
+ r.waitForRPC();
+
+ assert cache1.get("key").equals("value1");
+ assert cache2.get("key").equals("value1");
+ }
+
+ public void testReplaceWithOldVal() {
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.put("key", "value2");
+ assert cache1.get("key") == null;
+ assert cache2.get("key").equals("value2");
+
+ cache1.replace("key", "valueOld", "value1"); //
should do nothing since there is nothing to replace on cache1
+
+ assert cache1.get("key") == null;
+ assert cache2.get("key").equals("value2");
+
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("key", "valueN");
+
+ cache1.replace("key", "valueOld", "value1"); // should do nothing since cache1 holds "valueN", which does not match the expected old value
+
+ assert cache1.get("key").equals("valueN");
+ assert cache2.get("key").equals("value2");
+
+ BaseClusteredTest.ReplListener r = attachReplicationListener(cache2);
+ r.expect(ReplaceCommand.class);
+ cache1.replace("key", "valueN", "value1");
+ r.waitForRPC();
+
+ assert cache1.get("key").equals("value1");
+ assert cache2.get("key").equals("value1");
+ }
+}
Deleted: core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java 2009-01-29
18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/replication/ExceptionTest.java 2009-01-30
14:10:03 UTC (rev 7614)
@@ -1,71 +0,0 @@
-package org.horizon.replication;
-
-import org.horizon.BaseClusteredTest;
-import org.horizon.Cache;
-import org.horizon.commands.VisitableCommand;
-import org.horizon.config.Configuration;
-import org.horizon.context.InvocationContext;
-import org.horizon.interceptors.base.CommandInterceptor;
-import org.horizon.lock.TimeoutException;
-import org.horizon.transaction.DummyTransactionManagerLookup;
-import org.horizon.util.TestingUtil;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.transaction.TransactionManager;
-import java.util.List;
-
-/**
- * Tests the type of exceptions thrown for Lock Acquisition Timeouts versus Sync Repl Timeouts
- *
- * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
- */
-@Test(groups = "functional", sequential = true)
-public class ExceptionTest extends BaseClusteredTest {
- private Cache cache1;
- private Cache cache2;
-
- @BeforeMethod
- public void setUp() {
- Configuration c = new Configuration();
- c.setSyncCommitPhase(true);
- c.setSyncRollbackPhase(true);
- c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
- c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-
- List<Cache> caches = createClusteredCaches(2, "ExceptionTestCache", c);
- cache1 = caches.get(0);
- cache2 = caches.get(1);
- }
-
- @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
- public void testSyncReplTimeout() {
- cache2.addInterceptor(new CommandInterceptor() {
- @Override
- protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
- // Add a delay
- Thread.sleep(100);
- return super.handleDefault(ctx, cmd);
- }
- }, 0);
-
- cache1.getConfiguration().setSyncReplTimeout(1);
- cache2.getConfiguration().setSyncReplTimeout(1);
- TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
-
- cache1.put("k", "v");
- }
-
- @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
- public void testLockAcquisitionTimeout() throws Exception {
- cache2.getConfiguration().setLockAcquisitionTimeout(1);
- TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
-
- // get a lock on cache 2 and hold on to it.
- TransactionManager tm = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
- tm.begin();
- cache2.put("block", "block");
- tm.suspend();
- cache1.put("block", "v");
- }
-}
Modified: core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java 2009-01-29 18:06:23 UTC (rev 7613)
+++ core/branches/flat/src/test/java/org/horizon/replication/ReplicationExceptionTest.java 2009-01-30 14:10:03 UTC (rev 7614)
@@ -9,8 +9,12 @@
import org.horizon.BaseClusteredTest;
import org.horizon.Cache;
+import org.horizon.commands.VisitableCommand;
import org.horizon.config.Configuration;
+import org.horizon.context.InvocationContext;
+import org.horizon.interceptors.base.CommandInterceptor;
import org.horizon.lock.IsolationLevel;
+import org.horizon.lock.TimeoutException;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.assertNotNull;
@@ -28,7 +32,7 @@
@Test(groups = "functional", sequential = true)
public class ReplicationExceptionTest extends BaseClusteredTest {
- private Cache<String, ContainerData> cache1, cache2;
+ private Cache cache1, cache2;
@BeforeMethod
public void setUp() {
@@ -89,6 +93,37 @@
}
}
+ @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
+ public void testSyncReplTimeout() {
+ cache2.addInterceptor(new CommandInterceptor() {
+ @Override
+ protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
+ // Add a delay
+ Thread.sleep(100);
+ return super.handleDefault(ctx, cmd);
+ }
+ }, 0);
+
+ cache1.getConfiguration().setSyncReplTimeout(1);
+ cache2.getConfiguration().setSyncReplTimeout(1);
+ TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+
+ cache1.put("k", "v");
+ }
+
+ @Test(groups = "functional", expectedExceptions = {TimeoutException.class})
+ public void testLockAcquisitionTimeout() throws Exception {
+ cache2.getConfiguration().setLockAcquisitionTimeout(1);
+ TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+
+ // get a lock on cache 2 and hold on to it.
+ TransactionManager tm = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+ tm.begin();
+ cache2.put("block", "block");
+ tm.suspend();
+ cache1.put("block", "v");
+ }
+
static class NonSerializabeData {
int i;
}
Added: core/branches/flat/src/test/java/org/horizon/replication/SyncReplicatedAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplicatedAPITest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplicatedAPITest.java 2009-01-30 14:10:03 UTC (rev 7614)
@@ -0,0 +1,11 @@
+package org.horizon.replication;
+
+import org.testng.annotations.Test;
+
+@Test(groups = "functional", sequential = true)
+public class SyncReplicatedAPITest extends BaseReplicatedAPITest {
+ public SyncReplicatedAPITest() {
+ isSync = true;
+ }
+}
+