[infinispan-commits] Infinispan SVN: r1249 - in trunk/server/memcached/src: test/java/org/infinispan/server/memcached and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Dec 3 13:57:23 EST 2009


Author: galder.zamarreno at jboss.com
Date: 2009-12-03 13:57:23 -0500 (Thu, 03 Dec 2009)
New Revision: 1249

Added:
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetsCommand.java
Removed:
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CasValue.java
Modified:
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AddCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AppendCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/PrependCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ReplaceCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/RetrievalCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/StorageCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Value.java
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
Log:
[ISPN-173] (Build memcached server module) Implemented gets and part of cas command.

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AddCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AddCommand.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AddCommand.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -34,28 +34,22 @@
  */
 public class AddCommand extends SetCommand {
 
-   AddCommand(Cache cache, StorageParameters params, byte[] data) {
-      super(cache, CommandType.ADD, params, data);
+   AddCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
+      super(cache, type, params, data);
    }
 
    @Override
-   protected StorageReply put(String key, Value value) {
-      Object prev = cache.putIfAbsent(params.key, value);
-      return reply(prev);
+   protected StorageReply put(String key, int flags, byte[] data) {
+      return put(key, flags, data, -1);
    }
 
    @Override
-   protected StorageReply putExpiry(String key, Value value, long expiry) {
-      Object prev = cache.putIfAbsent(params.key, value, expiry, TimeUnit.SECONDS);
+   protected StorageReply put(String key, int flags, byte[] data, long expiry) {
+      Value value = new Value(flags, data);
+      Object prev = cache.putIfAbsent(key, value, expiry, TimeUnit.MILLISECONDS);
       return reply(prev);
    }
 
-   @Override
-   protected StorageReply putExpiryUnixTime(String key, Value value, long expiry) {
-      Object prev = cache.putIfAbsent(params.key, value, expiry, TimeUnit.MILLISECONDS);
-      return reply(prev);
-   }
-
    private StorageReply reply(Object prev) {
       if (prev == null)
          return StorageReply.STORED;

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AppendCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AppendCommand.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AppendCommand.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -41,11 +41,12 @@
    }
 
    @Override
-   protected StorageReply put(String key, Value append) {
+   protected StorageReply put(String key, int flags, byte[] data) {
+      Value append = new Value(flags, data);
       Value current = (Value) cache.get(key);
       if (current != null) {
-         byte[] data = concat(current.getData(), append.getData());
-         Value next = new Value(current.getFlags(), data);
+         byte[] concatenated = concat(current.getData(), append.getData());
+         Value next = new Value(current.getFlags(), concatenated);
          boolean replaced = cache.replace(key, current, next);
          if (replaced)
             return StorageReply.STORED;
@@ -61,13 +62,8 @@
    }
 
    @Override
-   protected StorageReply putExpiry(String key, Value value, long expiry) {
-      return put(key, value); // ignore expiry
+   protected StorageReply put(String key, int flags, byte[] data, long expiry) {
+      return put(key, flags, data); // ignore expiry
    }
 
-   @Override
-   protected StorageReply putExpiryUnixTime(String key, Value value, long expiry) {
-      return put(key, value); // ignore expiry
-   }
-
 }

Deleted: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CasValue.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CasValue.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CasValue.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -1,39 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
- * individual contributors as indicated by the @author tags. See the
- * copyright.txt file in the distribution for a full listing of
- * individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.server.memcached;
-
-/**
- * CasValue.
- * 
- * @author Galder Zamarreño
- * @since 4.0
- */
-public class CasValue {
-   final Value value;
-   final long unique;
-   
-   CasValue(Value value, long unique) {
-      this.value = value;
-      this.unique = unique;
-   }
-}

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetCommand.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetCommand.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -43,17 +43,14 @@
       super(cache, type, params);
    }
 
+   
    @Override
    public Object perform(Channel ch) throws Exception {
       ChannelBuffer buffer;
       for (String key : params.keys) {
          Value value = (Value) cache.get(key);
          if (value != null) {
-            StringBuilder sb = new StringBuilder();
-            sb.append(VALUE).append(" ")
-               .append(key).append(" ")
-               .append(value.getFlags()).append(" ")
-               .append(value.getData().length).append(" ");
+            StringBuilder sb = constructValue(key, value);
             buffer = wrappedBuffer(wrappedBuffer(sb.toString().getBytes()), wrappedBuffer(CRLF),
                      wrappedBuffer(value.getData()), wrappedBuffer(CRLF));
             ch.write(buffer);
@@ -64,4 +61,12 @@
       return null;
    }
 
+   protected StringBuilder constructValue(String key, Value value) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(VALUE).append(" ")
+         .append(key).append(" ")
+         .append(value.getFlags()).append(" ")
+         .append(value.getData().length).append(" ");
+      return sb;
+   }
 }

Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetsCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetsCommand.java	                        (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetsCommand.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import org.infinispan.Cache;
+
+/**
+ * GetsCommand.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class GetsCommand extends GetCommand {
+
+   GetsCommand(Cache cache, CommandType type, RetrievalParameters params) {
+      super(cache, type, params);
+   }
+
+   @Override
+   protected StringBuilder constructValue(String key, Value value) {
+      return super.constructValue(key, value)
+         .append(value.getCas()).append(" ");
+   }
+
+}

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/PrependCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/PrependCommand.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/PrependCommand.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -32,8 +32,8 @@
  */
 public class PrependCommand extends AppendCommand {
 
-   PrependCommand(Cache cache, StorageParameters params, byte[] data) {
-      super(cache, CommandType.PREPEND, params, data);
+   PrependCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
+      super(cache, type, params, data);
    }
 
    @Override

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ReplaceCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ReplaceCommand.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ReplaceCommand.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -34,24 +34,18 @@
  */
 public class ReplaceCommand extends SetCommand {
 
-   ReplaceCommand(Cache cache, StorageParameters params, byte[] data) {
-      super(cache, CommandType.REPLACE, params, data);
+   ReplaceCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
+      super(cache, type, params, data);
    }
 
    @Override
-   protected StorageReply put(String key, Value value) {
-      Object prev = cache.replace(params.key, value);
-      return reply(prev);
+   protected StorageReply put(String key, int flags, byte[] data) {
+      return put(key, flags, data, -1);
    }
 
    @Override
-   protected StorageReply putExpiry(String key, Value value, long expiry) {
-      Object prev = cache.replace(params.key, value, expiry, TimeUnit.SECONDS);
-      return reply(prev);
-   }
-
-   @Override
-   protected StorageReply putExpiryUnixTime(String key, Value value, long expiry) {
+   protected StorageReply put(String key, int flags, byte[] data, long expiry) {
+      Value value = new Value(flags, data);
       Object prev = cache.replace(params.key, value, expiry, TimeUnit.MILLISECONDS);
       return reply(prev);
    }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/RetrievalCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/RetrievalCommand.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/RetrievalCommand.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -49,10 +49,8 @@
    public static Command newRetrievalCommand(Cache cache, CommandType type, RetrievalParameters params) {
       switch(type) {
          case GET: return new GetCommand(cache, type, params);
-//       case GETS: ...
+         case GETS: return new GetsCommand(cache, type, params);
          default: throw new IllegalStateException("Unable to build storage command for type: " + type);
       }
-      
-      
    }
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -44,10 +44,6 @@
 
    private static final Log log = LogFactory.getLog(SetCommand.class);
 
-   SetCommand(Cache cache, StorageParameters params, byte[] data) {
-      super(cache, CommandType.SET, params, data);
-   }
-   
    SetCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
       super(cache, type, params, data);
    }
@@ -56,16 +52,15 @@
    public Object perform(Channel ch) throws Exception {
       StorageReply reply;
       try {
-         Value value = new Value(params.flags, data);
          if (params.expiry == 0) {
-            reply = put(params.key, value);
+            reply = put(params.key, params.flags, data);
          } else {
             if (params.expiry > 60*60*24*30) {
                // If expiry bigger number of seconds in 30 days, then it's considered unix time
                long future = TimeUnit.SECONDS.toMillis(params.expiry);
                long expiry = future - System.currentTimeMillis();
                if (expiry > 0) {
-                  reply = putExpiryUnixTime(params.key, value, expiry);
+                  reply = put(params.key, params.flags, data, expiry);
                } else {
                   StringBuilder sb = new StringBuilder();
                   sb.append("Given expiry is bigger than 30 days, hence is treated as Unix time, ")
@@ -74,7 +69,9 @@
                   throw new CacheException(sb.toString());
                }
             } else {
-               reply = putExpiry(params.key, value, params.expiry);
+               // Convert seconds to milliseconds to simplify code
+               long expiry = TimeUnit.SECONDS.toMillis(params.expiry);
+               reply = put(params.key, params.flags, data, expiry);
             }
          }
          
@@ -86,21 +83,16 @@
       return null;
    }
 
-   protected StorageReply put(String key, Value value) {
-      cache.put(params.key, value);
-      return reply();
+   protected StorageReply put(String key, int flags, byte[] data) {
+      return put(key, flags, data, -1);
    }
 
-   protected StorageReply putExpiry(String key, Value value, long expiry) {
-      cache.put(params.key, value, params.expiry, TimeUnit.SECONDS);
+   protected StorageReply put(String key, int flags, byte[] data, long expiry) {
+      Value value = new Value(flags, data);
+      cache.put(key, value, expiry, TimeUnit.MILLISECONDS);
       return reply();
    }
 
-   protected StorageReply putExpiryUnixTime(String key, Value value, long expiry) {
-      cache.put(params.key, value, expiry, TimeUnit.MILLISECONDS);
-      return reply();
-   }
-
    private StorageReply reply() {
       return StorageReply.STORED;
    }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/StorageCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/StorageCommand.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/StorageCommand.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -64,11 +64,11 @@
 
    public static Command newStorageCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) throws IOException {
       switch(type) {
-         case SET: return new SetCommand(cache, params, data);
-         case ADD: return new AddCommand(cache, params, data);
-         case REPLACE: return new ReplaceCommand(cache, params, data);
-         case APPEND: return new AppendCommand(cache, params, data);
-         case PREPEND: return new PrependCommand(cache, params, data);
+         case SET: return new SetCommand(cache, type, params, data);
+         case ADD: return new AddCommand(cache, type, params, data);
+         case REPLACE: return new ReplaceCommand(cache, type, params, data);
+         case APPEND: return new AppendCommand(cache, type, params, data);
+         case PREPEND: return new PrependCommand(cache, type, params, data);
          default: throw new StreamCorruptedException("Unable to build storage command for type: " + type);
       }
    }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Value.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Value.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Value.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -37,10 +37,17 @@
 class Value implements Externalizable {
    private int flags;
    private byte[] data;
+   private long cas;
    
    Value(int flags, byte[] data) {
       this.flags = flags;
       this.data = data;
+      // Since nano time is an offset from an arbitrary time, it is very unlikely that 
+      // two threads running on different VMs will generate the same value, so even two 
+      // modifications on the same key on different VMs will generate different cas, 
+      // making it a safe option for cas unique id.
+      // Also, using nano time avoids issues after expiration and eviction.
+      cas = System.nanoTime();
    }
 
    public int getFlags() {
@@ -51,6 +58,10 @@
       return data;
    }
 
+   public long getCas() {
+      return cas;
+   }
+
    @Override
    public boolean equals(Object obj) {
       if (obj == this)
@@ -58,7 +69,9 @@
       if (!(obj instanceof Value))
          return false;
       Value other = (Value) obj;
-      return Arrays.equals(data, other.data) && flags == other.flags;
+      return Arrays.equals(data, other.data) 
+         && flags == other.flags
+         && cas == other.cas;
    }
 
    @Override
@@ -66,6 +79,7 @@
       int result = 17;
       result = 31 * result + flags;
       result = 31 * result + data.hashCode();
+      result = 31 * result + (int)(cas ^ (cas >>> 32));
       return result;
    }
 
@@ -74,6 +88,7 @@
       flags = in.read();
       data = new byte[in.read()];
       in.read(data);
+      cas = in.readLong();
    }
 
    @Override
@@ -81,5 +96,6 @@
       out.write(flags);
       out.write(data.length);
       out.write(data);
+      out.writeLong(cas);
    }
 }

Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java	2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java	2009-12-03 18:57:23 UTC (rev 1249)
@@ -29,6 +29,8 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import net.spy.memcached.CASResponse;
+import net.spy.memcached.CASValue;
 import net.spy.memcached.DefaultConnectionFactory;
 import net.spy.memcached.MemcachedClient;
 
@@ -67,7 +69,7 @@
    public void testBasicSet(Method m) throws Exception {
       Future<Boolean> f = client.set(k(m), 0, v(m));
       assert f.get(5, TimeUnit.SECONDS);
-      assert client.get(k(m)).equals(v(m));
+      assert v(m).equals(client.get(k(m)));
    }
 
    public void testSetWithExpirySeconds(Method m) throws Exception {
@@ -201,6 +203,25 @@
       assert expected.equals(client.get(k(m)));
    }
 
+   public void testBasicGets(Method m) throws Exception {
+      Future<Boolean> f = client.set(k(m), 0, v(m));
+      assert f.get(5, TimeUnit.SECONDS);
+      CASValue<Object> value = client.gets(k(m));
+      assert v(m).equals(value.getValue());
+      assert value.getCas() != 0;
+   }
+
+   public void testBasicCas(Method m) throws Exception {
+      Future<Boolean> f = client.set(k(m), 0, v(m));
+      assert f.get(5, TimeUnit.SECONDS);
+      CASValue<Object> value = client.gets(k(m));
+      assert v(m).equals(value.getValue());
+      assert value.getCas() != 0;
+
+      CASResponse resp = client.cas(k(m), value.getCas(), v(m, "k1-"));
+      assert CASResponse.OK == resp;
+   }
+
    private String k(Method method, String prefix) {
       return prefix + method.getName();
    }



More information about the infinispan-commits mailing list