[infinispan-commits] Infinispan SVN: r1340 - in trunk/server/memcached/src: main/java/org/infinispan/server/memcached/commands and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Jan 4 12:46:05 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-01-04 12:46:04 -0500 (Mon, 04 Jan 2010)
New Revision: 1340

Added:
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java
Modified:
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AddCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AppendCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CasCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandFactory.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandType.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DecrementCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DeleteCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/FlushAllCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/IncrementCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/NumericCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/PrependCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/ReplaceCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/SetCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/StorageCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java
Log:
[ISPN-173] (Build memcached server module) Implemented noreply and added clustered test to verify that internals replicate correctly.

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -27,7 +27,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.infinispan.Cache;
-import org.infinispan.manager.CacheManager;
 import org.infinispan.server.core.ServerBootstrap;
 import org.infinispan.server.core.netty.NettyChannelPipelineFactory;
 import org.infinispan.server.core.netty.NettyChannelUpstreamHandler;
@@ -44,13 +43,13 @@
  * @since 4.0
  */
 public class MemcachedTextServer {
-   private final CacheManager manager;
+   private final Cache cache;
    private final int port;
    private final ScheduledExecutorService scheduler;
    private ServerBootstrap bootstrap;
    
-   public MemcachedTextServer(CacheManager manager, int port) {
-      this.manager = manager;
+   public MemcachedTextServer(Cache cache, int port) {
+      this.cache = cache;
       this.port = port;
       this.scheduler = Executors.newScheduledThreadPool(1);
    }
@@ -60,9 +59,6 @@
    }
 
    public void start() {
-      // Configure Infinispan Cache instance
-      Cache cache = manager.getCache();
-
       InterceptorChain chain = TextProtocolInterceptorChainFactory.getInstance(cache).buildInterceptorChain();
       NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
       TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
@@ -74,7 +70,7 @@
 
    public void stop() {
       bootstrap.stop();
-      manager.stop();
+      cache.stop();
       scheduler.shutdown();
    }
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AddCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AddCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AddCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -37,8 +37,8 @@
  */
 public class AddCommand extends SetCommand {
 
-   AddCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
-      super(cache, type, params, data);
+   AddCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+      super(cache, type, params, data, noReply);
    }
 
    @Override

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AppendCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AppendCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/AppendCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -36,14 +36,10 @@
  */
 public class AppendCommand extends SetCommand {
 
-   AppendCommand(Cache<String, Value> cache, StorageParameters params, byte[] data) {
-      super(cache, CommandType.APPEND, params, data);
+   AppendCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+      super(cache, type, params, data, noReply);
    }
 
-   AppendCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
-      super(cache, type, params, data);
-   }
-
    @Override
    public Object acceptVisitor(ChannelHandlerContext ctx, TextProtocolVisitor next) throws Throwable {
       return next.visitAppend(ctx, this);

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CasCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CasCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CasCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -39,8 +39,8 @@
 public class CasCommand extends SetCommand {
    final long cas;
 
-   CasCommand(Cache cache, StorageParameters params, long cas, byte[] data) {
-      super(cache, CommandType.CAS, params, data);
+   CasCommand(Cache cache, CommandType type, StorageParameters params, long cas, byte[] data, boolean noReply) {
+      super(cache, type, params, data, noReply);
       this.cas = cas;
    }
 
@@ -51,7 +51,7 @@
 
    @Override
    public Command setData(byte[] data) throws IOException {
-      return newCasCommand(cache, params, cas, data);
+      return newCasCommand(cache, params, cas, data, noReply);
    }
 
    @Override
@@ -72,7 +72,7 @@
       return Reply.NOT_FOUND;
    }
 
-   public static CasCommand newCasCommand(Cache cache, StorageParameters params, long cas, byte[] data) {
-      return new CasCommand(cache, params, cas, data);
+   public static CasCommand newCasCommand(Cache cache, StorageParameters params, long cas, byte[] data, boolean noReply) {
+      return new CasCommand(cache, CommandType.CAS, params, cas, data, noReply);
    }
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandFactory.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandFactory.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandFactory.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -24,6 +24,7 @@
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.StreamCorruptedException;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,6 +46,7 @@
  */
 public class CommandFactory {
    private static final Log log = LogFactory.getLog(CommandFactory.class);
+   private static final String NO_REPLY = "noreply";
 
    private final Cache cache;
    private final InterceptorChain chain;
@@ -73,11 +75,11 @@
          case REPLACE:
          case APPEND:
          case PREPEND:
-            return StorageCommand.newStorageCommand(cache, type, getStorageParameters(args), null);
+            return StorageCommand.newStorageCommand(cache, type, getStorageParameters(args), null, parseNoReply(5, args));
          case CAS:
             tmp = args[5]; // cas unique, 64-bit integer
             long cas = Long.parseLong(tmp);
-            return CasCommand.newCasCommand(cache, getStorageParameters(args), cas, null);
+            return CasCommand.newCasCommand(cache, getStorageParameters(args), cas, null, parseNoReply(6, args));
          case GET:
          case GETS:
             List<String> keys = new ArrayList<String>(5);
@@ -85,19 +87,19 @@
             return RetrievalCommand.newRetrievalCommand(cache, type, new RetrievalParameters(keys));
          case DELETE:
             String delKey = getKey(args[1]);
-            return DeleteCommand.newDeleteCommand(cache, delKey);
+            return DeleteCommand.newDeleteCommand(cache, delKey, parseNoReply(2, args));
          case INCR:
          case DECR:
             String key = getKey(args[1]);
             // Value is defined as unsigned 64-integer (or simply unsigned long in java language)
             // TODO: To simplify, could use long as long as the value was less than Long.MAX_VALUE
             BigInteger value = new BigInteger(args[2]);
-            return NumericCommand.newNumericCommand(cache, type, key, value);
+            return NumericCommand.newNumericCommand(cache, type, key, value, parseNoReply(3, args));
          case STATS:
             return StatsCommand.newStatsCommand(cache, type, chain);
          case FLUSH_ALL:
             long delay = args.length > 1 ? Long.parseLong(args[1]) : 0;
-            return FlushAllCommand.newFlushAllCommand(cache, delay, scheduler);
+            return FlushAllCommand.newFlushAllCommand(cache, delay, scheduler, parseNoReply(2, args));
          case VERSION:
             return VersionCommand.newVersionCommand();
          case QUIT:
@@ -131,4 +133,14 @@
       return Integer.parseInt(bytes);
    }
 
+   private boolean parseNoReply(int expectedIndex, String[] args) throws IOException {
+      if (args.length > expectedIndex) {
+         if (NO_REPLY.equals(args[expectedIndex])) {
+            return true;
+         } else {
+            throw new StreamCorruptedException("Unable to parse noreply optional argument");
+         }
+      }
+      return false;
+   }
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandType.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandType.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/CommandType.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -42,7 +42,7 @@
    VERSION,
    QUIT
    ;
-   
+
    public boolean isStorage() {
       switch(this) {
          case SET:
@@ -57,29 +57,12 @@
       }
    }
 
-   @Override
-   public String toString() {
-      return super.toString().toLowerCase();
-   }
-
    static CommandType parseType(String type) throws IOException {
-     if(type.equals(CommandType.SET.toString())) return SET;
-     else if(type.equals(CommandType.ADD.toString())) return ADD;
-     else if(type.equals(CommandType.REPLACE.toString())) return REPLACE;
-     else if(type.equals(CommandType.APPEND.toString())) return APPEND;
-     else if(type.equals(CommandType.PREPEND.toString())) return PREPEND;
-     else if(type.equals(CommandType.CAS.toString())) return CAS;
-     else if(type.equals(CommandType.GET.toString())) return GET;
-     else if(type.equals(CommandType.GETS.toString())) return GETS;
-     else if(type.equals(CommandType.DELETE.toString())) return DELETE;
-     else if(type.equals(CommandType.INCR.toString())) return INCR;
-     else if(type.equals(CommandType.DECR.toString())) return DECR;
-     else if(type.equals(CommandType.STATS.toString())) return STATS;
-     else if(type.equals(CommandType.FLUSH_ALL.toString())) return FLUSH_ALL;
-     else if(type.equals(CommandType.VERSION.toString())) return VERSION;
-     else if(type.equals(CommandType.QUIT.toString())) return QUIT;
-     else throw new UnknownCommandException("request \"" + type + "\" not known");
+      try {
+         return valueOf(type.toUpperCase());
+      } catch(IllegalStateException e) {
+         throw new UnknownCommandException("request \"" + type + "\" not known");
+      }
    }
 
-
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DecrementCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DecrementCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DecrementCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -39,8 +39,8 @@
 public class DecrementCommand extends NumericCommand {
    private static final Log log = LogFactory.getLog(DecrementCommand.class);
 
-   public DecrementCommand(Cache cache, CommandType type, String key, BigInteger value) {
-      super(cache, type, key, value);
+   public DecrementCommand(Cache cache, CommandType type, String key, BigInteger value, boolean noReply) {
+      super(cache, type, key, value, noReply);
    }
 
    @Override

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DeleteCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DeleteCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/DeleteCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -41,10 +41,12 @@
 
    final Cache cache;
    final String key;
+   final boolean noReply;
 
-   DeleteCommand(Cache cache, String key, long time) {
+   DeleteCommand(Cache cache, String key, long time, boolean noReply) {
       this.cache = cache;
       this.key = key;
+      this.noReply = noReply;
    }
 
    @Override
@@ -63,8 +65,10 @@
       Reply reply;
       Object prev = cache.remove(key);
       reply = reply(prev);
-      ChannelBuffers buffers = ctx.getChannelBuffers();
-      ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(reply.bytes()), buffers.wrappedBuffer(CRLF)));
+      if (!noReply) {
+         ChannelBuffers buffers = ctx.getChannelBuffers();
+         ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(reply.bytes()), buffers.wrappedBuffer(CRLF)));
+      }
       return reply;
    }
 
@@ -75,7 +79,7 @@
          return Reply.DELETED;
    }
 
-   public static DeleteCommand newDeleteCommand(Cache cache, String key) {
-      return new DeleteCommand(cache, key, 0);
+   public static DeleteCommand newDeleteCommand(Cache cache, String key, boolean noReply) {
+      return new DeleteCommand(cache, key, 0, noReply);
    }
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/FlushAllCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/FlushAllCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/FlushAllCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -45,11 +45,13 @@
    final Cache cache;
    final long delay;
    final ScheduledExecutorService scheduler;
+   final boolean noReply;
 
-   FlushAllCommand(Cache cache, long delay, ScheduledExecutorService scheduler) {
+   FlushAllCommand(Cache cache, long delay, ScheduledExecutorService scheduler, boolean noReply) {
       this.cache = cache;
       this.delay = delay;
       this.scheduler = scheduler;
+      this.noReply = noReply;
    }
 
    @Override
@@ -70,13 +72,15 @@
       } else {
          scheduler.schedule(new FlushAllDelayed(cache), delay, TimeUnit.SECONDS);
       }
-      ChannelBuffers buffers = ctx.getChannelBuffers();
-      ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(OK.bytes()), buffers.wrappedBuffer(CRLF)));
+      if (!noReply) {
+         ChannelBuffers buffers = ctx.getChannelBuffers();
+         ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(OK.bytes()), buffers.wrappedBuffer(CRLF)));
+      }
       return OK;
    }
 
-   public static FlushAllCommand newFlushAllCommand(Cache cache, long delay, ScheduledExecutorService scheduler) {
-      return new FlushAllCommand(cache, delay, scheduler);
+   public static FlushAllCommand newFlushAllCommand(Cache cache, long delay, ScheduledExecutorService scheduler, boolean noReply) {
+      return new FlushAllCommand(cache, delay, scheduler, noReply);
    }
 
    private static class FlushAllDelayed implements Runnable {

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/IncrementCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/IncrementCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/IncrementCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -39,8 +39,8 @@
 public class IncrementCommand extends NumericCommand {
    private static final Log log = LogFactory.getLog(IncrementCommand.class);
 
-   public IncrementCommand(Cache cache, CommandType type, String key, BigInteger value) {
-      super(cache, type, key, value);
+   public IncrementCommand(Cache cache, CommandType type, String key, BigInteger value, boolean noReply) {
+      super(cache, type, key, value, noReply);
    }
 
    @Override

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/NumericCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/NumericCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/NumericCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -49,12 +49,14 @@
    private final CommandType type;
    final String key;
    final BigInteger value;
+   final boolean noReply;
 
-   public NumericCommand(Cache cache, CommandType type, String key, BigInteger value) {
+   public NumericCommand(Cache cache, CommandType type, String key, BigInteger value, boolean noReply) {
       this.cache = cache;
       this.type = type;
       this.key = key;
       this.value = value;
+      this.noReply = noReply;
    }
 
    public CommandType getType() {
@@ -72,24 +74,26 @@
          byte[] newData = newBigInt.toByteArray();
          Value curr = new Value(old.getFlags(), newData, old.getCas() + 1);
          boolean replaced = cache.replace(key, old, curr);
-         if (replaced) {
+         if (replaced && !noReply) {
             ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(newBigInt.toString().getBytes()), buffers.wrappedBuffer(CRLF)));
-         } else {
+         } else if (!replaced) {
             throw new CacheException("Value modified since we retrieved from the cache, old value was " + oldBigInt);
          }
          return curr;
       } else {
-         ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(Reply.NOT_FOUND.bytes()), buffers.wrappedBuffer(CRLF)));
+         if (!noReply) {
+            ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(Reply.NOT_FOUND.bytes()), buffers.wrappedBuffer(CRLF)));
+         }
          return Reply.NOT_FOUND;
       }
    }
 
    protected abstract BigInteger operate(BigInteger oldValue, BigInteger newValue);
 
-   public static TextCommand newNumericCommand(Cache cache, CommandType type, String key, BigInteger value) throws IOException {
+   public static TextCommand newNumericCommand(Cache cache, CommandType type, String key, BigInteger value, boolean noReply) throws IOException {
       switch(type) {
-         case INCR: return new IncrementCommand(cache, type, key, value);
-         case DECR: return new DecrementCommand(cache, type, key, value);
+         case INCR: return new IncrementCommand(cache, type, key, value, noReply);
+         case DECR: return new DecrementCommand(cache, type, key, value, noReply);
          default: throw new StreamCorruptedException("Unable to build storage command for type: " + type);
       }
    }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/PrependCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/PrependCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/PrependCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -35,8 +35,8 @@
  */
 public class PrependCommand extends AppendCommand {
 
-   PrependCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
-      super(cache, type, params, data);
+   PrependCommand(Cache cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+      super(cache, type, params, data, noReply);
    }
 
    @Override

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/ReplaceCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/ReplaceCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/ReplaceCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -37,8 +37,8 @@
  */
 public class ReplaceCommand extends SetCommand {
 
-   ReplaceCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
-      super(cache, type, params, data);
+   ReplaceCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+      super(cache, type, params, data, noReply);
    }
 
    @Override

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/SetCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/SetCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/SetCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -47,8 +47,8 @@
 public class SetCommand extends StorageCommand {
    private static final Log log = LogFactory.getLog(SetCommand.class);
 
-   SetCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
-      super(cache, type, params, data);
+   SetCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
+      super(cache, type, params, data, noReply);
    }
 
    @Override
@@ -88,8 +88,10 @@
          log.error("Unexpected exception performing command", e);
          reply = Reply.NOT_STORED;
       }
-      ChannelBuffers buffers = ctx.getChannelBuffers();
-      ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(reply.bytes()), buffers.wrappedBuffer(CRLF)));
+      if (!noReply) {
+         ChannelBuffers buffers = ctx.getChannelBuffers();
+         ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(reply.bytes()), buffers.wrappedBuffer(CRLF)));
+      }
       return reply;
    }
 

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/StorageCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/StorageCommand.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/StorageCommand.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -39,12 +39,14 @@
    final Cache<String, Value> cache;
    final StorageParameters params;
    final byte[] data;
+   final boolean noReply;
 
-   StorageCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) {
+   StorageCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) {
       this.type = type;
       this.params = params;
       this.cache = cache;
       this.data = data;
+      this.noReply = noReply;
    }
 
    public CommandType getType() {
@@ -52,20 +54,20 @@
    }
 
    public Command setData(byte[] data) throws IOException {
-      return newStorageCommand(cache, type, params, data);
+      return newStorageCommand(cache, type, params, data, noReply);
    }
 
    public StorageParameters getParams() {
       return params;
    }
 
-   public static TextCommand newStorageCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data) throws IOException {
+   public static TextCommand newStorageCommand(Cache<String, Value> cache, CommandType type, StorageParameters params, byte[] data, boolean noReply) throws IOException {
       switch(type) {
-         case SET: return new SetCommand(cache, type, params, data);
-         case ADD: return new AddCommand(cache, type, params, data);
-         case REPLACE: return new ReplaceCommand(cache, type, params, data);
-         case APPEND: return new AppendCommand(cache, type, params, data);
-         case PREPEND: return new PrependCommand(cache, type, params, data);
+         case SET: return new SetCommand(cache, type, params, data, noReply);
+         case ADD: return new AddCommand(cache, type, params, data, noReply);
+         case REPLACE: return new ReplaceCommand(cache, type, params, data, noReply);
+         case APPEND: return new AppendCommand(cache, type, params, data, noReply);
+         case PREPEND: return new PrependCommand(cache, type, params, data, noReply);
          default: throw new StreamCorruptedException("Unable to build storage command for type: " + type);
       }
    }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -81,7 +81,7 @@
    @Override
    public String toString() {
       return getClass().getSimpleName() + "{" +
-         "data=" + Arrays.toString(data) +
+         "data=" + data +
          ", flags=" + flags +
          ", cas=" + cas +
          "}";

Added: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java	                        (rev 0)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -0,0 +1,175 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import static org.infinispan.server.memcached.test.MemcachedTestingUtil.createMemcachedClient;
+import static org.infinispan.server.memcached.test.MemcachedTestingUtil.k;
+import static org.infinispan.server.memcached.test.MemcachedTestingUtil.v;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import net.spy.memcached.CASResponse;
+import net.spy.memcached.CASValue;
+import net.spy.memcached.MemcachedClient;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.server.memcached.test.MemcachedTestingUtil;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.Test;
+
+/**
+ * ClusterTest.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+ at Test(groups = "functional", testName = "server.memcached.ClusterTest")
+public class ClusterTest extends MultipleCacheManagersTest {
+   MemcachedClient client1;
+   MemcachedClient client2;
+   MemcachedTextServer server1;
+   MemcachedTextServer server2;
+   
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
+      createClusteredCaches(2, "replSync", replSync);
+      server1 = MemcachedTestingUtil.createMemcachedTextServer(cache(0, "replSync"));
+      server1.start();
+      server2 = MemcachedTestingUtil.createMemcachedTextServer(cache(1, "replSync"), server1.getPort() + 50);
+      server2.start();
+      client1 = createMemcachedClient(60000, server1.getPort());
+      client2 = createMemcachedClient(60000, server2.getPort());
+   }
+
+   public void testReplicatedSet(Method m) throws Exception {
+      Future<Boolean> f = client1.set(k(m), 0, v(m));
+      assert f.get(120, TimeUnit.SECONDS);
+      assert v(m).equals(client2.get(k(m)));
+   }
+
+   public void testReplicatedGetMultipleKeys(Method m) throws Exception {
+      Future<Boolean> f1 = client1.set(k(m, "k1-"), 0, v(m, "v1-"));
+      Future<Boolean> f2 = client1.set(k(m, "k2-"), 0, v(m, "v2-"));
+      Future<Boolean> f3 = client1.set(k(m, "k3-"), 0, v(m, "v3-"));
+      assert f1.get(5, TimeUnit.SECONDS);
+      assert f2.get(5, TimeUnit.SECONDS);
+      assert f3.get(5, TimeUnit.SECONDS);
+      
+      Map<String, Object> ret = client2.getBulk(Arrays.asList(new String[]{k(m, "k1-"), k(m, "k2-"), k(m, "k3-")}));
+      assert ret.get(k(m, "k1-")).equals(v(m, "v1-"));
+      assert ret.get(k(m, "k2-")).equals(v(m, "v2-"));
+      assert ret.get(k(m, "k3-")).equals(v(m, "v3-"));
+   }
+
+   public void testReplicatedAdd(Method m) throws Exception {
+      Future<Boolean> f = client1.add(k(m), 0, v(m));
+      assert f.get(5, TimeUnit.SECONDS);
+      assert v(m).equals(client2.get(k(m)));
+   }
+
+   public void testReplicatedReplace(Method m) throws Exception {
+      Future<Boolean> f = client1.add(k(m), 0, v(m));
+      assert(f.get(5, TimeUnit.SECONDS));
+      assert v(m).equals(client2.get(k(m)));
+      
+      f = client2.replace(k(m), 0, v(m, "k1-"));
+      assert f.get(5, TimeUnit.SECONDS);
+      assert v(m, "k1-").equals(client1.get(k(m)));
+   }
+
+   public void testReplicatedAppend(Method m) throws Exception {
+      Future<Boolean> f = client1.add(k(m), 0, v(m));
+      assert f.get(5, TimeUnit.SECONDS);
+      assert v(m).equals(client2.get(k(m)));
+
+      f = client2.append(0, k(m), v(m, "v1-"));
+      assert f.get(5, TimeUnit.SECONDS);
+      String expected = v(m).toString() + v(m, "v1-").toString();
+      assert expected.equals(client1.get(k(m)));
+   }
+
+   public void testReplicatedPrepend(Method m) throws Exception {
+      Future<Boolean> f = client1.add(k(m), 0, v(m));
+      assert f.get(5, TimeUnit.SECONDS);
+      assert v(m).equals(client2.get(k(m)));
+
+      f = client2.prepend(0, k(m), v(m, "v1-"));
+      assert f.get(5, TimeUnit.SECONDS);
+      String expected = v(m, "v1-").toString() + v(m).toString();
+      assert expected.equals(client1.get(k(m)));
+   }
+
+   public void testReplicatedGets(Method m) throws Exception {
+      Future<Boolean> f = client1.set(k(m), 0, v(m));
+      assert f.get(5, TimeUnit.SECONDS);
+      CASValue<Object> value = client2.gets(k(m));
+      assert v(m).equals(value.getValue());
+      assert value.getCas() != 0;
+   }
+
+   public void testReplicatedCasExists(Method m) throws Exception {
+      Future<Boolean> f = client1.set(k(m), 0, v(m));
+      assert f.get(5, TimeUnit.SECONDS);
+      CASValue<Object> value = client2.gets(k(m));
+      assert v(m).equals(value.getValue());
+      assert value.getCas() != 0;
+
+      long old = value.getCas();
+      CASResponse resp = client2.cas(k(m), value.getCas(), v(m, "v1-"));
+      value = client1.gets(k(m));
+      assert v(m, "v1-").equals(value.getValue());
+      assert value.getCas() != 0;
+      assert value.getCas() != old;
+
+      resp = client1.cas(k(m), old, v(m, "v2-"));
+      assert CASResponse.EXISTS == resp;
+
+      resp = client2.cas(k(m), value.getCas(), v(m, "v2-"));
+      assert CASResponse.OK == resp;
+   }
+
+   public void testReplicatedDelete(Method m) throws Exception {
+      Future<Boolean> f = client1.set(k(m), 0, v(m));
+      assert f.get(5, TimeUnit.SECONDS);
+      f = client2.delete(k(m));
+      assert f.get(5, TimeUnit.SECONDS);
+   }
+
+   public void testReplicatedIncrement(Method m) throws Exception {
+      Future<Boolean> f = client1.set(k(m), 0, 1);
+      assert f.get(5, TimeUnit.SECONDS);
+      assert 2 == client2.incr(k(m), 1);
+   }
+
+   public void testReplicatedDecrement(Method m) throws Exception {
+      Future<Boolean> f = client1.set(k(m), 0, 1);
+      assert f.get(5, TimeUnit.SECONDS);
+      assert 0 == client2.decr(k(m), 1);
+   }
+
+}

Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -58,7 +58,7 @@
    @Override
    protected CacheManager createCacheManager() throws Exception {
       cacheManager = TestCacheManagerFactory.createLocalCacheManager();
-      server = MemcachedTestingUtil.createMemcachedTextServer(cacheManager);
+      server = MemcachedTestingUtil.createMemcachedTextServer(cacheManager.getCache());
       server.start();
       client = createMemcachedClient(60000, server.getPort());
       return cacheManager;
@@ -68,7 +68,7 @@
    protected void destroyAfterClass() {
       server.stop();
    }
-   
+
    public void testSetBasic(Method m) throws Exception {
       Future<Boolean> f = client.set(k(m), 0, v(m));
       assert f.get(5, TimeUnit.SECONDS);

Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -57,7 +57,7 @@
    @Override
    protected CacheManager createCacheManager() throws Exception {
       cacheManager = TestCacheManagerFactory.createJmxEnabledCacheManager(JMX_DOMAIN);
-      server = MemcachedTestingUtil.createMemcachedTextServer(cacheManager);
+      server = MemcachedTestingUtil.createMemcachedTextServer(cacheManager.getCache());
       server.start();
       client = createMemcachedClient(60000, server.getPort());
       return cacheManager;

Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java	2009-12-30 19:53:57 UTC (rev 1339)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java	2010-01-04 17:46:04 UTC (rev 1340)
@@ -28,7 +28,7 @@
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.infinispan.manager.CacheManager;
+import org.infinispan.Cache;
 import org.infinispan.server.memcached.MemcachedTextServer;
 
 import net.spy.memcached.DefaultConnectionFactory;
@@ -77,7 +77,11 @@
       return new MemcachedClient(d, Arrays.asList(new InetSocketAddress[]{new InetSocketAddress(port)}));
    }
 
-   public static MemcachedTextServer createMemcachedTextServer(CacheManager cacheManager) {
-      return new MemcachedTextServer(cacheManager, threadMemcachedPort.get());
+   public static MemcachedTextServer createMemcachedTextServer(Cache cache) {
+      return new MemcachedTextServer(cache, threadMemcachedPort.get());
    }
+
+   public static MemcachedTextServer createMemcachedTextServer(Cache cache, int port) {
+      return new MemcachedTextServer(cache, port);
+   }
 }



More information about the infinispan-commits mailing list