[infinispan-commits] Infinispan SVN: r1340 - in trunk/server/memcached/src: main/java/org/infinispan/server/memcached/commands and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Jan 4 12:46:05 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-01-04 12:46:04 -0500 (Mon, 04 Jan 2010)
New Revision: 1340
Added:
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java
Modified:
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AddCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AppendCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CasCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandFactory.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandType.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DecrementCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DeleteCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/FlushAllCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/IncrementCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/NumericCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/PrependCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/ReplaceCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/SetCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/StorageCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java
Log:
[ISPN-173] (Build memcached server module) Implemented noreply and added clustered test to verify that internals replicate correctly.
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -27,7 +27,6 @@
import java.util.concurrent.ScheduledExecutorService;
import org.infinispan.Cache;
-import org.infinispan.manager.CacheManager;
import org.infinispan.server.core.ServerBootstrap;
import org.infinispan.server.core.netty.NettyChannelPipelineFactory;
import org.infinispan.server.core.netty.NettyChannelUpstreamHandler;
@@ -44,13 +43,13 @@
* @since 4.0
*/
public class MemcachedTextServer {
- private final CacheManager manager;
+ private final Cache cache;
private final int port;
private final ScheduledExecutorService scheduler;
private ServerBootstrap bootstrap;
- public MemcachedTextServer(CacheManager manager, int port) {
- this.manager = manager;
+ public MemcachedTextServer(Cache cache, int port) {
+ this.cache = cache;
this.port = port;
this.scheduler = Executors.newScheduledThreadPool(1);
}
@@ -60,9 +59,6 @@
}
public void start() {
- // Configure Infinispan Cache instance
- Cache cache = manager.getCache();
-
InterceptorChain chain = TextProtocolInterceptorChainFactory.getInstance(cache).buildInterceptorChain();
NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
@@ -74,7 +70,7 @@
public void stop() {
bootstrap.stop();
- manager.stop();
+ cache.stop();
scheduler.shutdown();
}
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AddCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AddCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AddCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -37,8 +37,8 @@
*/
public class AddCommand extends SetCommand {
- AddCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
- super(cache, type, params, data);
+ AddCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+ super(cache, type, params, data, noReply);
}
@Override
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AppendCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AppendCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AppendCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -36,14 +36,10 @@
*/
public class AppendCommand extends SetCommand {
- AppendCommand(Cache<String, Value> cache, StorageParameters params, byte[] data) {
- super(cache, CommandType.APPEND, params, data);
+ AppendCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+ super(cache, type, params, data, noReply);
}
- AppendCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
- super(cache, type, params, data);
- }
-
@Override
public Object acceptVisitor(ChannelHandlerContext ctx, TextProtocolVisitor next) throws Throwable {
return next.visitAppend(ctx, this);
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CasCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CasCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CasCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -39,8 +39,8 @@
public class CasCommand extends SetCommand {
final long cas;
- CasCommand(Cache cache, StorageParameters params, long cas, byte[] data) {
- super(cache, CommandType.CAS, params, data);
+ CasCommand(Cache cache, CommandType type, StorageParameters params, long cas, byte[] data, boolean noReply) {
+ super(cache, type, params, data, noReply);
this.cas = cas;
}
@@ -51,7 +51,7 @@
@Override
public Command setData(byte[] data) throws IOException {
- return newCasCommand(cache, params, cas, data);
+ return newCasCommand(cache, params, cas, data, noReply);
}
@Override
@@ -72,7 +72,7 @@
return Reply.NOT_FOUND;
}
- public static CasCommand newCasCommand(Cache cache, StorageParameters params, long cas, byte[] data) {
- return new CasCommand(cache, params, cas, data);
+ public static CasCommand newCasCommand(Cache cache, StorageParameters params, long cas, byte[] data, boolean noReply) {
+ return new CasCommand(cache, CommandType.CAS, params, cas, data, noReply);
}
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandFactory.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandFactory.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandFactory.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -24,6 +24,7 @@
import java.io.EOFException;
import java.io.IOException;
+import java.io.StreamCorruptedException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,6 +46,7 @@
*/
public class CommandFactory {
private static final Log log = LogFactory.getLog(CommandFactory.class);
+ private static final String NO_REPLY = "noreply";
private final Cache cache;
private final InterceptorChain chain;
@@ -73,11 +75,11 @@
case REPLACE:
case APPEND:
case PREPEND:
- return StorageCommand.newStorageCommand(cache, type, getStorageParameters(args), null);
+ return StorageCommand.newStorageCommand(cache, type, getStorageParameters(args), null, parseNoReply(5, args));
case CAS:
tmp = args[5]; // cas unique, 64-bit integer
long cas = Long.parseLong(tmp);
- return CasCommand.newCasCommand(cache, getStorageParameters(args), cas, null);
+ return CasCommand.newCasCommand(cache, getStorageParameters(args), cas, null, parseNoReply(6, args));
case GET:
case GETS:
List<String> keys = new ArrayList<String>(5);
@@ -85,19 +87,19 @@
return RetrievalCommand.newRetrievalCommand(cache, type, new RetrievalParameters(keys));
case DELETE:
String delKey = getKey(args[1]);
- return DeleteCommand.newDeleteCommand(cache, delKey);
+ return DeleteCommand.newDeleteCommand(cache, delKey, parseNoReply(2, args));
case INCR:
case DECR:
String key = getKey(args[1]);
// Value is defined as unsigned 64-integer (or simply unsigned long in java language)
// TODO: To simplify, could use long as long as the value was less than Long.MAX_VALUE
BigInteger value = new BigInteger(args[2]);
- return NumericCommand.newNumericCommand(cache, type, key, value);
+ return NumericCommand.newNumericCommand(cache, type, key, value, parseNoReply(3, args));
case STATS:
return StatsCommand.newStatsCommand(cache, type, chain);
case FLUSH_ALL:
long delay = args.length > 1 ? Long.parseLong(args[1]) : 0;
- return FlushAllCommand.newFlushAllCommand(cache, delay, scheduler);
+ return FlushAllCommand.newFlushAllCommand(cache, delay, scheduler, parseNoReply(2, args));
case VERSION:
return VersionCommand.newVersionCommand();
case QUIT:
@@ -131,4 +133,14 @@
return Integer.parseInt(bytes);
}
+ private boolean parseNoReply(int expectedIndex, String[] args) throws IOException {
+ if (args.length > expectedIndex) {
+ if (NO_REPLY.equals(args[expectedIndex])) {
+ return true;
+ } else {
+ throw new StreamCorruptedException("Unable to parse noreply optional argument");
+ }
+ }
+ return false;
+ }
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandType.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandType.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandType.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -42,7 +42,7 @@
VERSION,
QUIT
;
-
+
public boolean isStorage() {
switch(this) {
case SET:
@@ -57,29 +57,12 @@
}
}
- @Override
- public String toString() {
- return super.toString().toLowerCase();
- }
-
static CommandType parseType(String type) throws IOException {
- if(type.equals(CommandType.SET.toString())) return SET;
- else if(type.equals(CommandType.ADD.toString())) return ADD;
- else if(type.equals(CommandType.REPLACE.toString())) return REPLACE;
- else if(type.equals(CommandType.APPEND.toString())) return APPEND;
- else if(type.equals(CommandType.PREPEND.toString())) return PREPEND;
- else if(type.equals(CommandType.CAS.toString())) return CAS;
- else if(type.equals(CommandType.GET.toString())) return GET;
- else if(type.equals(CommandType.GETS.toString())) return GETS;
- else if(type.equals(CommandType.DELETE.toString())) return DELETE;
- else if(type.equals(CommandType.INCR.toString())) return INCR;
- else if(type.equals(CommandType.DECR.toString())) return DECR;
- else if(type.equals(CommandType.STATS.toString())) return STATS;
- else if(type.equals(CommandType.FLUSH_ALL.toString())) return FLUSH_ALL;
- else if(type.equals(CommandType.VERSION.toString())) return VERSION;
- else if(type.equals(CommandType.QUIT.toString())) return QUIT;
- else throw new UnknownCommandException("request \"" + type + "\" not known");
+ try {
+ return valueOf(type.toUpperCase());
+ } catch(IllegalStateException e) {
+ throw new UnknownCommandException("request \"" + type + "\" not known");
+ }
}
-
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DecrementCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DecrementCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DecrementCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -39,8 +39,8 @@
public class DecrementCommand extends NumericCommand {
private static final Log log = LogFactory.getLog(DecrementCommand.class);
- public DecrementCommand(Cache cache, CommandType type, String key, BigInteger value) {
- super(cache, type, key, value);
+ public DecrementCommand(Cache cache, CommandType type, String key, BigInteger value, boolean noReply) {
+ super(cache, type, key, value, noReply);
}
@Override
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DeleteCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DeleteCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DeleteCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -41,10 +41,12 @@
final Cache cache;
final String key;
+ final boolean noReply;
- DeleteCommand(Cache cache, String key, long time) {
+ DeleteCommand(Cache cache, String key, long time, boolean noReply) {
this.cache = cache;
this.key = key;
+ this.noReply = noReply;
}
@Override
@@ -63,8 +65,10 @@
Reply reply;
Object prev = cache.remove(key);
reply = reply(prev);
- ChannelBuffers buffers = ctx.getChannelBuffers();
- ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(reply.bytes()), buffers.wrappedBuffer(CRLF)));
+ if (!noReply) {
+ ChannelBuffers buffers = ctx.getChannelBuffers();
+ ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(reply.bytes()), buffers.wrappedBuffer(CRLF)));
+ }
return reply;
}
@@ -75,7 +79,7 @@
return Reply.DELETED;
}
- public static DeleteCommand newDeleteCommand(Cache cache, String key) {
- return new DeleteCommand(cache, key, 0);
+ public static DeleteCommand newDeleteCommand(Cache cache, String key, boolean noReply) {
+ return new DeleteCommand(cache, key, 0, noReply);
}
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/FlushAllCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/FlushAllCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/FlushAllCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -45,11 +45,13 @@
final Cache cache;
final long delay;
final ScheduledExecutorService scheduler;
+ final boolean noReply;
- FlushAllCommand(Cache cache, long delay, ScheduledExecutorService scheduler) {
+ FlushAllCommand(Cache cache, long delay, ScheduledExecutorService scheduler, boolean noReply) {
this.cache = cache;
this.delay = delay;
this.scheduler = scheduler;
+ this.noReply = noReply;
}
@Override
@@ -70,13 +72,15 @@
} else {
scheduler.schedule(new FlushAllDelayed(cache), delay, TimeUnit.SECONDS);
}
- ChannelBuffers buffers = ctx.getChannelBuffers();
- ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(OK.bytes()), buffers.wrappedBuffer(CRLF)));
+ if (!noReply) {
+ ChannelBuffers buffers = ctx.getChannelBuffers();
+ ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(OK.bytes()), buffers.wrappedBuffer(CRLF)));
+ }
return OK;
}
- public static FlushAllCommand newFlushAllCommand(Cache cache, long delay, ScheduledExecutorService scheduler) {
- return new FlushAllCommand(cache, delay, scheduler);
+ public static FlushAllCommand newFlushAllCommand(Cache cache, long delay, ScheduledExecutorService scheduler, boolean noReply) {
+ return new FlushAllCommand(cache, delay, scheduler, noReply);
}
private static class FlushAllDelayed implements Runnable {
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/IncrementCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/IncrementCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/IncrementCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -39,8 +39,8 @@
public class IncrementCommand extends NumericCommand {
private static final Log log = LogFactory.getLog(IncrementCommand.class);
- public IncrementCommand(Cache cache, CommandType type, String key, BigInteger value) {
- super(cache, type, key, value);
+ public IncrementCommand(Cache cache, CommandType type, String key, BigInteger value, boolean noReply) {
+ super(cache, type, key, value, noReply);
}
@Override
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/NumericCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/NumericCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/NumericCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -49,12 +49,14 @@
private final CommandType type;
final String key;
final BigInteger value;
+ final boolean noReply;
- public NumericCommand(Cache cache, CommandType type, String key, BigInteger value) {
+ public NumericCommand(Cache cache, CommandType type, String key, BigInteger value, boolean noReply) {
this.cache = cache;
this.type = type;
this.key = key;
this.value = value;
+ this.noReply = noReply;
}
public CommandType getType() {
@@ -72,24 +74,26 @@
byte[] newData = newBigInt.toByteArray();
Value curr = new Value(old.getFlags(), newData, old.getCas() + 1);
boolean replaced = cache.replace(key, old, curr);
- if (replaced) {
+ if (replaced && !noReply) {
ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(newBigInt.toString().getBytes()), buffers.wrappedBuffer(CRLF)));
- } else {
+ } else if (!replaced) {
throw new CacheException("Value modified since we retrieved from the cache, old value was " + oldBigInt);
}
return curr;
} else {
- ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(Reply.NOT_FOUND.bytes()), buffers.wrappedBuffer(CRLF)));
+ if (!noReply) {
+ ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(Reply.NOT_FOUND.bytes()), buffers.wrappedBuffer(CRLF)));
+ }
return Reply.NOT_FOUND;
}
}
protected abstract BigInteger operate(BigInteger oldValue, BigInteger newValue);
- public static TextCommand newNumericCommand(Cache cache, CommandType type, String key, BigInteger value) throws IOException {
+ public static TextCommand newNumericCommand(Cache cache, CommandType type, String key, BigInteger value, boolean noReply) throws IOException {
switch(type) {
- case INCR: return new IncrementCommand(cache, type, key, value);
- case DECR: return new DecrementCommand(cache, type, key, value);
+ case INCR: return new IncrementCommand(cache, type, key, value, noReply);
+ case DECR: return new DecrementCommand(cache, type, key, value, noReply);
default: throw new StreamCorruptedException("Unable to build storage command for type: " + type);
}
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/PrependCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/PrependCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/PrependCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -35,8 +35,8 @@
*/
public class PrependCommand extends AppendCommand {
- PrependCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
- super(cache, type, params, data);
+ PrependCommand(Cache cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+ super(cache, type, params, data, noReply);
}
@Override
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/ReplaceCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/ReplaceCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/ReplaceCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -37,8 +37,8 @@
*/
public class ReplaceCommand extends SetCommand {
- ReplaceCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
- super(cache, type, params, data);
+ ReplaceCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+ super(cache, type, params, data, noReply);
}
@Override
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/SetCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/SetCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/SetCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -47,8 +47,8 @@
public class SetCommand extends StorageCommand {
private static final Log log = LogFactory.getLog(SetCommand.class);
- SetCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
- super(cache, type, params, data);
+ SetCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+ super(cache, type, params, data, noReply);
}
@Override
@@ -88,8 +88,10 @@
log.error("Unexpected exception performing command", e);
reply = Reply.NOT_STORED;
}
- ChannelBuffers buffers = ctx.getChannelBuffers();
- ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(reply.bytes()), buffers.wrappedBuffer(CRLF)));
+ if (!noReply) {
+ ChannelBuffers buffers = ctx.getChannelBuffers();
+ ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(reply.bytes()), buffers.wrappedBuffer(CRLF)));
+ }
return reply;
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/StorageCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/StorageCommand.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/StorageCommand.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -39,12 +39,14 @@
final Cache<String, Value> cache;
final StorageParameters params;
final byte[] data;
+ final boolean noReply;
- StorageCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
+ StorageCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
this.type = type;
this.params = params;
this.cache = cache;
this.data = data;
+ this.noReply = noReply;
}
public CommandType getType() {
@@ -52,20 +54,20 @@
}
public Command setData(byte[] data) throws IOException {
- return newStorageCommand(cache, type, params, data);
+ return newStorageCommand(cache, type, params, data, noReply);
}
public StorageParameters getParams() {
return params;
}
- public static TextCommand newStorageCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) throws IOException {
+ public static TextCommand newStorageCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) throws IOException {
switch(type) {
- case SET: return new SetCommand(cache, type, params, data);
- case ADD: return new AddCommand(cache, type, params, data);
- case REPLACE: return new ReplaceCommand(cache, type, params, data);
- case APPEND: return new AppendCommand(cache, type, params, data);
- case PREPEND: return new PrependCommand(cache, type, params, data);
+ case SET: return new SetCommand(cache, type, params, data, noReply);
+ case ADD: return new AddCommand(cache, type, params, data, noReply);
+ case REPLACE: return new ReplaceCommand(cache, type, params, data, noReply);
+ case APPEND: return new AppendCommand(cache, type, params, data, noReply);
+ case PREPEND: return new PrependCommand(cache, type, params, data, noReply);
default: throw new StreamCorruptedException("Unable to build storage command for type: " + type);
}
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -81,7 +81,7 @@
@Override
public String toString() {
return getClass().getSimpleName() + "{" +
- "data=" + Arrays.toString(data) +
+ "data=" + data +
", flags=" + flags +
", cas=" + cas +
"}";
Added: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java (rev 0)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -0,0 +1,175 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import static org.infinispan.server.memcached.test.MemcachedTestingUtil.createMemcachedClient;
+import static org.infinispan.server.memcached.test.MemcachedTestingUtil.k;
+import static org.infinispan.server.memcached.test.MemcachedTestingUtil.v;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import net.spy.memcached.CASResponse;
+import net.spy.memcached.CASValue;
+import net.spy.memcached.MemcachedClient;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.server.memcached.test.MemcachedTestingUtil;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.Test;
+
+/**
+ * ClusterTest.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+ at Test(groups = "functional", testName = "server.memcached.ClusterTest")
+public class ClusterTest extends MultipleCacheManagersTest {
+ MemcachedClient client1;
+ MemcachedClient client2;
+ MemcachedTextServer server1;
+ MemcachedTextServer server2;
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ Configuration replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
+ createClusteredCaches(2, "replSync", replSync);
+ server1 = MemcachedTestingUtil.createMemcachedTextServer(cache(0, "replSync"));
+ server1.start();
+ server2 = MemcachedTestingUtil.createMemcachedTextServer(cache(1, "replSync"), server1.getPort() + 50);
+ server2.start();
+ client1 = createMemcachedClient(60000, server1.getPort());
+ client2 = createMemcachedClient(60000, server2.getPort());
+ }
+
+ public void testReplicatedSet(Method m) throws Exception {
+ Future<Boolean> f = client1.set(k(m), 0, v(m));
+ assert f.get(120, TimeUnit.SECONDS);
+ assert v(m).equals(client2.get(k(m)));
+ }
+
+ public void testReplicatedGetMultipleKeys(Method m) throws Exception {
+ Future<Boolean> f1 = client1.set(k(m, "k1-"), 0, v(m, "v1-"));
+ Future<Boolean> f2 = client1.set(k(m, "k2-"), 0, v(m, "v2-"));
+ Future<Boolean> f3 = client1.set(k(m, "k3-"), 0, v(m, "v3-"));
+ assert f1.get(5, TimeUnit.SECONDS);
+ assert f2.get(5, TimeUnit.SECONDS);
+ assert f3.get(5, TimeUnit.SECONDS);
+
+ Map<String, Object> ret = client2.getBulk(Arrays.asList(new String[]{k(m, "k1-"), k(m, "k2-"), k(m, "k3-")}));
+ assert ret.get(k(m, "k1-")).equals(v(m, "v1-"));
+ assert ret.get(k(m, "k2-")).equals(v(m, "v2-"));
+ assert ret.get(k(m, "k3-")).equals(v(m, "v3-"));
+ }
+
+ public void testReplicatedAdd(Method m) throws Exception {
+ Future<Boolean> f = client1.add(k(m), 0, v(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ assert v(m).equals(client2.get(k(m)));
+ }
+
+ public void testReplicatedReplace(Method m) throws Exception {
+ Future<Boolean> f = client1.add(k(m), 0, v(m));
+ assert(f.get(5, TimeUnit.SECONDS));
+ assert v(m).equals(client2.get(k(m)));
+
+ f = client2.replace(k(m), 0, v(m, "k1-"));
+ assert f.get(5, TimeUnit.SECONDS);
+ assert v(m, "k1-").equals(client1.get(k(m)));
+ }
+
+ public void testReplicatedAppend(Method m) throws Exception {
+ Future<Boolean> f = client1.add(k(m), 0, v(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ assert v(m).equals(client2.get(k(m)));
+
+ f = client2.append(0, k(m), v(m, "v1-"));
+ assert f.get(5, TimeUnit.SECONDS);
+ String expected = v(m).toString() + v(m, "v1-").toString();
+ assert expected.equals(client1.get(k(m)));
+ }
+
+ public void testReplicatedPrepend(Method m) throws Exception {
+ Future<Boolean> f = client1.add(k(m), 0, v(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ assert v(m).equals(client2.get(k(m)));
+
+ f = client2.prepend(0, k(m), v(m, "v1-"));
+ assert f.get(5, TimeUnit.SECONDS);
+ String expected = v(m, "v1-").toString() + v(m).toString();
+ assert expected.equals(client1.get(k(m)));
+ }
+
+ public void testReplicatedGets(Method m) throws Exception {
+ Future<Boolean> f = client1.set(k(m), 0, v(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ CASValue<Object> value = client2.gets(k(m));
+ assert v(m).equals(value.getValue());
+ assert value.getCas() != 0;
+ }
+
+ public void testReplicatedCasExists(Method m) throws Exception {
+ Future<Boolean> f = client1.set(k(m), 0, v(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ CASValue<Object> value = client2.gets(k(m));
+ assert v(m).equals(value.getValue());
+ assert value.getCas() != 0;
+
+ long old = value.getCas();
+ CASResponse resp = client2.cas(k(m), value.getCas(), v(m, "v1-"));
+ value = client1.gets(k(m));
+ assert v(m, "v1-").equals(value.getValue());
+ assert value.getCas() != 0;
+ assert value.getCas() != old;
+
+ resp = client1.cas(k(m), old, v(m, "v2-"));
+ assert CASResponse.EXISTS == resp;
+
+ resp = client2.cas(k(m), value.getCas(), v(m, "v2-"));
+ assert CASResponse.OK == resp;
+ }
+
+ public void testReplicatedDelete(Method m) throws Exception {
+ Future<Boolean> f = client1.set(k(m), 0, v(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ f = client2.delete(k(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ }
+
+ public void testReplicatedIncrement(Method m) throws Exception {
+ Future<Boolean> f = client1.set(k(m), 0, 1);
+ assert f.get(5, TimeUnit.SECONDS);
+ assert 2 == client2.incr(k(m), 1);
+ }
+
+ public void testReplicatedDecrement(Method m) throws Exception {
+ Future<Boolean> f = client1.set(k(m), 0, 1);
+ assert f.get(5, TimeUnit.SECONDS);
+ assert 0 == client2.decr(k(m), 1);
+ }
+
+}
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -58,7 +58,7 @@
@Override
protected CacheManager createCacheManager() throws Exception {
cacheManager = TestCacheManagerFactory.createLocalCacheManager();
- server = MemcachedTestingUtil.createMemcachedTextServer(cacheManager);
+ server = MemcachedTestingUtil.createMemcachedTextServer(cacheManager.getCache());
server.start();
client = createMemcachedClient(60000, server.getPort());
return cacheManager;
@@ -68,7 +68,7 @@
protected void destroyAfterClass() {
server.stop();
}
-
+
public void testSetBasic(Method m) throws Exception {
Future<Boolean> f = client.set(k(m), 0, v(m));
assert f.get(5, TimeUnit.SECONDS);
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -57,7 +57,7 @@
@Override
protected CacheManager createCacheManager() throws Exception {
cacheManager = TestCacheManagerFactory.createJmxEnabledCacheManager(JMX_DOMAIN);
- server = MemcachedTestingUtil.createMemcachedTextServer(cacheManager);
+ server = MemcachedTestingUtil.createMemcachedTextServer(cacheManager.getCache());
server.start();
client = createMemcachedClient(60000, server.getPort());
return cacheManager;
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java 2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java 2010-01-04 17:46:04 UTC (rev 1340)
@@ -28,7 +28,7 @@
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
-import org.infinispan.manager.CacheManager;
+import org.infinispan.Cache;
import org.infinispan.server.memcached.MemcachedTextServer;
import net.spy.memcached.DefaultConnectionFactory;
@@ -77,7 +77,11 @@
return new MemcachedClient(d, Arrays.asList(new InetSocketAddress[]{new InetSocketAddress(port)}));
}
- public static MemcachedTextServer createMemcachedTextServer(CacheManager cacheManager) {
- return new MemcachedTextServer(cacheManager, threadMemcachedPort.get());
+ public static MemcachedTextServer createMemcachedTextServer(Cache cache) {
+ return new MemcachedTextServer(cache, threadMemcachedPort.get());
}
+
+ public static MemcachedTextServer createMemcachedTextServer(Cache cache, int port) {
+ return new MemcachedTextServer(cache, port);
+ }
}
More information about the infinispan-commits
mailing list