[infinispan-commits] Infinispan SVN: r1249 - in trunk/server/memcached/src: test/java/org/infinispan/server/memcached and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Dec 3 13:57:23 EST 2009
Author: galder.zamarreno at jboss.com
Date: 2009-12-03 13:57:23 -0500 (Thu, 03 Dec 2009)
New Revision: 1249
Added:
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetsCommand.java
Removed:
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CasValue.java
Modified:
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AddCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AppendCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/PrependCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ReplaceCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/RetrievalCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/StorageCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Value.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
Log:
[ISPN-173] (Build memcached server module) Implemented gets and part of cas command.
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AddCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AddCommand.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AddCommand.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -34,28 +34,22 @@
*/
public class AddCommand extends SetCommand {
- AddCommand(Cache cache, StorageParameters params, byte[] data) {
- super(cache, CommandType.ADD, params, data);
+ AddCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
+ super(cache, type, params, data);
}
@Override
- protected StorageReply put(String key, Value value) {
- Object prev = cache.putIfAbsent(params.key, value);
- return reply(prev);
+ protected StorageReply put(String key, int flags, byte[] data) {
+ return put(key, flags, data, -1);
}
@Override
- protected StorageReply putExpiry(String key, Value value, long expiry) {
- Object prev = cache.putIfAbsent(params.key, value, expiry, TimeUnit.SECONDS);
+ protected StorageReply put(String key, int flags, byte[] data, long expiry) {
+ Value value = new Value(flags, data);
+ Object prev = cache.putIfAbsent(key, value, expiry, TimeUnit.MILLISECONDS);
return reply(prev);
}
- @Override
- protected StorageReply putExpiryUnixTime(String key, Value value, long expiry) {
- Object prev = cache.putIfAbsent(params.key, value, expiry, TimeUnit.MILLISECONDS);
- return reply(prev);
- }
-
private StorageReply reply(Object prev) {
if (prev == null)
return StorageReply.STORED;
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AppendCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AppendCommand.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/AppendCommand.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -41,11 +41,12 @@
}
@Override
- protected StorageReply put(String key, Value append) {
+ protected StorageReply put(String key, int flags, byte[] data) {
+ Value append = new Value(flags, data);
Value current = (Value) cache.get(key);
if (current != null) {
- byte[] data = concat(current.getData(), append.getData());
- Value next = new Value(current.getFlags(), data);
+ byte[] concatenated = concat(current.getData(), append.getData());
+ Value next = new Value(current.getFlags(), concatenated);
boolean replaced = cache.replace(key, current, next);
if (replaced)
return StorageReply.STORED;
@@ -61,13 +62,8 @@
}
@Override
- protected StorageReply putExpiry(String key, Value value, long expiry) {
- return put(key, value); // ignore expiry
+ protected StorageReply put(String key, int flags, byte[] data, long expiry) {
+ return put(key, flags, data); // ignore expiry
}
- @Override
- protected StorageReply putExpiryUnixTime(String key, Value value, long expiry) {
- return put(key, value); // ignore expiry
- }
-
}
Deleted: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CasValue.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CasValue.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CasValue.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -1,39 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
- * individual contributors as indicated by the @author tags. See the
- * copyright.txt file in the distribution for a full listing of
- * individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.server.memcached;
-
-/**
- * CasValue.
- *
- * @author Galder Zamarreño
- * @since 4.0
- */
-public class CasValue {
- final Value value;
- final long unique;
-
- CasValue(Value value, long unique) {
- this.value = value;
- this.unique = unique;
- }
-}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetCommand.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetCommand.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -43,17 +43,14 @@
super(cache, type, params);
}
+
@Override
public Object perform(Channel ch) throws Exception {
ChannelBuffer buffer;
for (String key : params.keys) {
Value value = (Value) cache.get(key);
if (value != null) {
- StringBuilder sb = new StringBuilder();
- sb.append(VALUE).append(" ")
- .append(key).append(" ")
- .append(value.getFlags()).append(" ")
- .append(value.getData().length).append(" ");
+ StringBuilder sb = constructValue(key, value);
buffer = wrappedBuffer(wrappedBuffer(sb.toString().getBytes()), wrappedBuffer(CRLF),
wrappedBuffer(value.getData()), wrappedBuffer(CRLF));
ch.write(buffer);
@@ -64,4 +61,12 @@
return null;
}
+ protected StringBuilder constructValue(String key, Value value) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(VALUE).append(" ")
+ .append(key).append(" ")
+ .append(value.getFlags()).append(" ")
+ .append(value.getData().length).append(" ");
+ return sb;
+ }
}
Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetsCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetsCommand.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/GetsCommand.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import org.infinispan.Cache;
+
+/**
+ * GetsCommand.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class GetsCommand extends GetCommand {
+
+ GetsCommand(Cache cache, CommandType type, RetrievalParameters params) {
+ super(cache, type, params);
+ }
+
+ @Override
+ protected StringBuilder constructValue(String key, Value value) {
+ return super.constructValue(key, value)
+ .append(value.getCas()).append(" ");
+ }
+
+}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/PrependCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/PrependCommand.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/PrependCommand.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -32,8 +32,8 @@
*/
public class PrependCommand extends AppendCommand {
- PrependCommand(Cache cache, StorageParameters params, byte[] data) {
- super(cache, CommandType.PREPEND, params, data);
+ PrependCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
+ super(cache, type, params, data);
}
@Override
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ReplaceCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ReplaceCommand.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ReplaceCommand.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -34,24 +34,18 @@
*/
public class ReplaceCommand extends SetCommand {
- ReplaceCommand(Cache cache, StorageParameters params, byte[] data) {
- super(cache, CommandType.REPLACE, params, data);
+ ReplaceCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
+ super(cache, type, params, data);
}
@Override
- protected StorageReply put(String key, Value value) {
- Object prev = cache.replace(params.key, value);
- return reply(prev);
+ protected StorageReply put(String key, int flags, byte[] data) {
+ return put(key, flags, data, -1);
}
@Override
- protected StorageReply putExpiry(String key, Value value, long expiry) {
- Object prev = cache.replace(params.key, value, expiry, TimeUnit.SECONDS);
- return reply(prev);
- }
-
- @Override
- protected StorageReply putExpiryUnixTime(String key, Value value, long expiry) {
+ protected StorageReply put(String key, int flags, byte[] data, long expiry) {
+ Value value = new Value(flags, data);
Object prev = cache.replace(params.key, value, expiry, TimeUnit.MILLISECONDS);
return reply(prev);
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/RetrievalCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/RetrievalCommand.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/RetrievalCommand.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -49,10 +49,8 @@
public static Command newRetrievalCommand(Cache cache, CommandType type, RetrievalParameters params) {
switch(type) {
case GET: return new GetCommand(cache, type, params);
-// case GETS: ...
+ case GETS: return new GetsCommand(cache, type, params);
default: throw new IllegalStateException("Unable to build storage command for type: " + type);
}
-
-
}
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -44,10 +44,6 @@
private static final Log log = LogFactory.getLog(SetCommand.class);
- SetCommand(Cache cache, StorageParameters params, byte[] data) {
- super(cache, CommandType.SET, params, data);
- }
-
SetCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) {
super(cache, type, params, data);
}
@@ -56,16 +52,15 @@
public Object perform(Channel ch) throws Exception {
StorageReply reply;
try {
- Value value = new Value(params.flags, data);
if (params.expiry == 0) {
- reply = put(params.key, value);
+ reply = put(params.key, params.flags, data);
} else {
if (params.expiry > 60*60*24*30) {
// If expiry bigger number of seconds in 30 days, then it's considered unix time
long future = TimeUnit.SECONDS.toMillis(params.expiry);
long expiry = future - System.currentTimeMillis();
if (expiry > 0) {
- reply = putExpiryUnixTime(params.key, value, expiry);
+ reply = put(params.key, params.flags, data, expiry);
} else {
StringBuilder sb = new StringBuilder();
sb.append("Given expiry is bigger than 30 days, hence is treated as Unix time, ")
@@ -74,7 +69,9 @@
throw new CacheException(sb.toString());
}
} else {
- reply = putExpiry(params.key, value, params.expiry);
+ // Convert seconds to milliseconds to simplify code
+ long expiry = TimeUnit.SECONDS.toMillis(params.expiry);
+ reply = put(params.key, params.flags, data, expiry);
}
}
@@ -86,21 +83,16 @@
return null;
}
- protected StorageReply put(String key, Value value) {
- cache.put(params.key, value);
- return reply();
+ protected StorageReply put(String key, int flags, byte[] data) {
+ return put(key, flags, data, -1);
}
- protected StorageReply putExpiry(String key, Value value, long expiry) {
- cache.put(params.key, value, params.expiry, TimeUnit.SECONDS);
+ protected StorageReply put(String key, int flags, byte[] data, long expiry) {
+ Value value = new Value(flags, data);
+ cache.put(key, value, expiry, TimeUnit.MILLISECONDS);
return reply();
}
- protected StorageReply putExpiryUnixTime(String key, Value value, long expiry) {
- cache.put(params.key, value, expiry, TimeUnit.MILLISECONDS);
- return reply();
- }
-
private StorageReply reply() {
return StorageReply.STORED;
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/StorageCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/StorageCommand.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/StorageCommand.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -64,11 +64,11 @@
public static Command newStorageCommand(Cache cache, CommandType type, StorageParameters params, byte[] data) throws IOException {
switch(type) {
- case SET: return new SetCommand(cache, params, data);
- case ADD: return new AddCommand(cache, params, data);
- case REPLACE: return new ReplaceCommand(cache, params, data);
- case APPEND: return new AppendCommand(cache, params, data);
- case PREPEND: return new PrependCommand(cache, params, data);
+ case SET: return new SetCommand(cache, type, params, data);
+ case ADD: return new AddCommand(cache, type, params, data);
+ case REPLACE: return new ReplaceCommand(cache, type, params, data);
+ case APPEND: return new AppendCommand(cache, type, params, data);
+ case PREPEND: return new PrependCommand(cache, type, params, data);
default: throw new StreamCorruptedException("Unable to build storage command for type: " + type);
}
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Value.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Value.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Value.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -37,10 +37,17 @@
class Value implements Externalizable {
private int flags;
private byte[] data;
+ private long cas;
Value(int flags, byte[] data) {
this.flags = flags;
this.data = data;
+ // Since nano time is an offset from an arbitrary time, it is very unlikely that
+ // two threads running on different VMs will generate the same value, so even two
+ // modifications on the same key on different VMs will generate different cas,
+ // making it a safe option for cas unique id.
+ // Also, using nano time avoid issues after expiration and eviction.
+ cas = System.nanoTime();
}
public int getFlags() {
@@ -51,6 +58,10 @@
return data;
}
+ public long getCas() {
+ return cas;
+ }
+
@Override
public boolean equals(Object obj) {
if (obj == this)
@@ -58,7 +69,9 @@
if (!(obj instanceof Value))
return false;
Value other = (Value) obj;
- return Arrays.equals(data, other.data) && flags == other.flags;
+ return Arrays.equals(data, other.data)
+ && flags == other.flags
+ && cas == other.cas;
}
@Override
@@ -66,6 +79,7 @@
int result = 17;
result = 31 * result + flags;
result = 31 * result + data.hashCode();
+ result = 31 * result + (int)(cas ^ (cas >>> 32));
return result;
}
@@ -74,6 +88,7 @@
flags = in.read();
data = new byte[in.read()];
in.read(data);
+ cas = in.readLong();
}
@Override
@@ -81,5 +96,6 @@
out.write(flags);
out.write(data.length);
out.write(data);
+ out.writeLong(cas);
}
}
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2009-12-02 20:18:03 UTC (rev 1248)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2009-12-03 18:57:23 UTC (rev 1249)
@@ -29,6 +29,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import net.spy.memcached.CASResponse;
+import net.spy.memcached.CASValue;
import net.spy.memcached.DefaultConnectionFactory;
import net.spy.memcached.MemcachedClient;
@@ -67,7 +69,7 @@
public void testBasicSet(Method m) throws Exception {
Future<Boolean> f = client.set(k(m), 0, v(m));
assert f.get(5, TimeUnit.SECONDS);
- assert client.get(k(m)).equals(v(m));
+ assert v(m).equals(client.get(k(m)));
}
public void testSetWithExpirySeconds(Method m) throws Exception {
@@ -201,6 +203,25 @@
assert expected.equals(client.get(k(m)));
}
+ public void testBasicGets(Method m) throws Exception {
+ Future<Boolean> f = client.set(k(m), 0, v(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ CASValue<Object> value = client.gets(k(m));
+ assert v(m).equals(value.getValue());
+ assert value.getCas() != 0;
+ }
+
+ public void testBasicCas(Method m) throws Exception {
+ Future<Boolean> f = client.set(k(m), 0, v(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ CASValue<Object> value = client.gets(k(m));
+ assert v(m).equals(value.getValue());
+ assert value.getCas() != 0;
+
+ CASResponse resp = client.cas(k(m), value.getCas(), v(m, "k1-"));
+ assert CASResponse.OK == resp;
+ }
+
private String k(Method method, String prefix) {
return prefix + method.getName();
}
More information about the infinispan-commits
mailing list