[infinispan-commits] Infinispan SVN: r1268 - in trunk/server/memcached/src: test/java/org/infinispan/server/memcached and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Dec 9 14:33:18 EST 2009
Author: galder.zamarreno at jboss.com
Date: 2009-12-09 14:33:18 -0500 (Wed, 09 Dec 2009)
New Revision: 1268
Added:
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayed.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayedEntry.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ErrorReply.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/UnknownCommandException.java
Modified:
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandFactory.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandType.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextCommandDecoder.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolPipelineFactory.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolUtil.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
Log:
[ISPN-173] (Build memcached server module) Added error handling and partially implemented delete command. Further testing for delete needed.
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandFactory.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandFactory.java 2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandFactory.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
@@ -43,9 +44,11 @@
private static final Log log = LogFactory.getLog(CommandFactory.class);
private final Cache cache;
+ private final BlockingQueue<DeleteDelayedEntry> queue;
- public CommandFactory(Cache cache) {
+ public CommandFactory(Cache cache, BlockingQueue<DeleteDelayedEntry> queue) {
this.cache = cache;
+ this.queue = queue;
}
public Command createCommand(String line) throws IOException {
@@ -65,39 +68,51 @@
case REPLACE:
case APPEND:
case PREPEND:
+ return StorageCommand.newStorageCommand(cache, type, getStorageParameters(args), null);
case CAS:
- String key = args[1]; // key
- if(key == null) throw new EOFException();
-
- tmp = args[2]; // flags
- if(tmp == null) throw new EOFException();
- int flags = Integer.parseInt(tmp);
-
- tmp = args[3]; // expiry time
- if(tmp == null) throw new EOFException();
- long expiry = Long.parseLong(tmp); // seconds
-
- tmp = args[4]; // number of bytes
- if(tmp == null) throw new EOFException();
- int bytes = Integer.parseInt(tmp);
-
- StorageParameters storage = new StorageParameters(key, flags, expiry, bytes);
-
- if (type == CommandType.CAS) {
- tmp = args[5]; // cas unique, 64-bit integer
- long cas = Long.parseLong(tmp);
- return CasCommand.newCasCommand(cache, storage, cas, null);
- }
-
- return StorageCommand.newStorageCommand(cache, type, storage, null);
+ tmp = args[5]; // cas unique, 64-bit integer
+ long cas = Long.parseLong(tmp);
+ return CasCommand.newCasCommand(cache, getStorageParameters(args), cas, null);
case GET:
case GETS:
List<String> keys = new ArrayList<String>(5);
keys.addAll(Arrays.asList(args).subList(1, args.length));
return RetrievalCommand.newRetrievalCommand(cache, type, new RetrievalParameters(keys));
+ case DELETE:
+ String key = getKey(args[1]);
+ long time = getOptionalTime(args[2]);
+ return DeleteCommand.newDeleteCommand(cache, key, time, queue);
default:
return null;
}
}
+ private StorageParameters getStorageParameters(String[] args) throws IOException {
+ return new StorageParameters(getKey(args[1]), getFlags(args[2]), getExpiry(args[3]), getBytes(args[4]));
+ }
+
+ private String getKey(String key) throws IOException {
+ if (key == null) throw new EOFException();
+ return key;
+ }
+
+ private int getFlags(String flags) throws IOException {
+ if (flags == null) throw new EOFException();
+ return Integer.parseInt(flags);
+ }
+
+ private long getExpiry(String expiry) throws IOException {
+ if (expiry == null) throw new EOFException();
+ return Long.parseLong(expiry); // seconds
+ }
+
+ private int getBytes(String bytes) throws IOException {
+ if (bytes == null) throw new EOFException();
+ return Integer.parseInt(bytes);
+ }
+
+ private long getOptionalTime(String time) {
+ if (time == null) return 0;
+ return Long.parseLong(time); // seconds
+ }
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandType.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandType.java 2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandType.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -77,7 +77,7 @@
else if(type.equals(CommandType.FLUSH_ALL.toString())) return FLUSH_ALL;
else if(type.equals(CommandType.VERSION.toString())) return VERSION;
else if(type.equals(CommandType.QUIT.toString())) return QUIT;
- else throw new StreamCorruptedException("request \"" + type + "\" not known");
+ else throw new UnknownCommandException("request \"" + type + "\" not known");
}
Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteCommand.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteCommand.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.infinispan.Cache;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * DeleteCommand.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class DeleteCommand implements Command {
+
+ final Cache cache;
+ final String key;
+ final long time;
+ final BlockingQueue<DeleteDelayedEntry> queue;
+
+ DeleteCommand(Cache cache, String key, long time, BlockingQueue<DeleteDelayedEntry> queue) {
+ this.cache = cache;
+ this.key = key;
+ this.time = time;
+ this.queue = queue;
+ }
+
+ @Override
+ public CommandType getType() {
+ return CommandType.DELETE;
+ }
+
+ @Override
+ public Object perform(Channel ch) throws Exception {
+ if (time > 0) {
+ queue.offer(new DeleteDelayedEntry(key, time));
+ } else {
+ cache.remove(key);
+ }
+ return null;
+ }
+
+ public static DeleteCommand newDeleteCommand(Cache cache, String key, long time, BlockingQueue<DeleteDelayedEntry> queue) {
+ return new DeleteCommand(cache, key, time, queue);
+ }
+}
Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayed.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayed.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayed.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.infinispan.Cache;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * DelayedDelete.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class DeleteDelayed implements Runnable {
+ private static final Log log = LogFactory.getLog(DeleteDelayed.class);
+
+ private final Cache cache;
+ private final BlockingQueue<DeleteDelayedEntry> queue;
+
+ DeleteDelayed(Cache cache, BlockingQueue<DeleteDelayedEntry> queue) {
+ this.queue = queue;
+ this.cache = cache;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ DeleteDelayedEntry entry = queue.take();
+ cache.remove(entry.key);
+ }
+ } catch (InterruptedException e) {
+ log.debug("Interrupted, so allow thread to exit"); /* Allow thread to exit */
+ }
+ }
+}
Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayedEntry.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayedEntry.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayedEntry.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * DelayedDeleteEntry.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class DeleteDelayedEntry implements Delayed {
+
+ final String key;
+ private final long time;
+ private final boolean isUnix;
+
+ DeleteDelayedEntry(String key, long time) {
+ this.time = time;
+ this.key = key;
+ this.isUnix = time > TextProtocolUtil.SECONDS_IN_A_MONTH;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ long now = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+ return unit.convert(time - now, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ if (o == this)
+ return 0;
+
+ if (o instanceof DeleteDelayedEntry) {
+ DeleteDelayedEntry x = (DeleteDelayedEntry) o;
+ long diff = time - x.time;
+ if (diff < 0) return -1;
+ else if (diff > 0) return 1;
+ else return 0;
+ } else {
+ throw new ClassCastException(o.getClass() + " is not of type " + DeleteDelayedEntry.class);
+ }
+ }
+
+}
Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ErrorReply.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ErrorReply.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ErrorReply.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+/**
+ * ErrorReply.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public enum ErrorReply {
+ ERROR, CLIENT_ERROR, SERVER_ERROR;
+}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java 2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -23,6 +23,9 @@
package org.infinispan.server.memcached;
import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.infinispan.Cache;
@@ -40,19 +43,31 @@
*/
class MemcachedTextServer {
final CacheManager manager;
+ final ExecutorService delayedExecutor;
MemcachedTextServer(CacheManager manager) {
this.manager = manager;
+ this.delayedExecutor = Executors.newSingleThreadExecutor();
}
public void start() {
// Configure Infinispan Cache instance
Cache cache = manager.getCache();
+ // Create delaye queue for delayed deletes and start thread
+ BlockingQueue<DeleteDelayedEntry> queue = new DelayQueue<DeleteDelayedEntry>();
+ DeleteDelayed runnable = new DeleteDelayed(cache, queue);
+ delayedExecutor.submit(runnable);
+
// Configure the server.
ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
ServerBootstrap bootstrap = new ServerBootstrap(factory);
- bootstrap.setPipelineFactory(new TextProtocolPipelineFactory(cache));
+ bootstrap.setPipelineFactory(new TextProtocolPipelineFactory(cache, queue));
bootstrap.bind(new InetSocketAddress(11211));
}
+
+ public void stop() {
+ manager.stop();
+ delayedExecutor.shutdown();
+ }
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java 2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -55,7 +55,7 @@
if (params.expiry == 0) {
reply = put(params.key, params.flags, data);
} else {
- if (params.expiry > 60*60*24*30) {
+ if (params.expiry > TextProtocolUtil.SECONDS_IN_A_MONTH) {
// If expiry bigger number of seconds in 30 days, then it's considered unix time
long future = TimeUnit.SECONDS.toMillis(params.expiry);
long expiry = future - System.currentTimeMillis();
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextCommandDecoder.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextCommandDecoder.java 2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextCommandDecoder.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.StreamCorruptedException;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.infinispan.Cache;
@@ -35,6 +36,7 @@
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import static org.infinispan.server.memcached.TextProtocolUtil.*;
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
/**
* TextCommandDecoder.
@@ -47,15 +49,15 @@
private final CommandFactory factory;
private volatile Command command;
- private final AtomicBoolean corrupted = new AtomicBoolean();
+// private final AtomicBoolean corrupted = new AtomicBoolean();
protected enum State {
READ_COMMAND, READ_UNSTRUCTURED_DATA;
}
- TextCommandDecoder(Cache cache) {
+ TextCommandDecoder(Cache cache, BlockingQueue<DeleteDelayedEntry> queue) {
super(State.READ_COMMAND, true);
- factory = new CommandFactory(cache);
+ factory = new CommandFactory(cache, queue);
}
@Override
@@ -63,15 +65,15 @@
switch (state) {
case READ_COMMAND:
String line = readLine(buffer);
- try {
+// try {
command = factory.createCommand(line);
- corrupted.set(false);
- } catch (IOException ioe) {
- if (corrupted.get())
- log.debug("Channel is corrupted and we're reading garbage, ignore read until we find a good command again");
- else
- throw ioe;
- }
+// corrupted.set(false);
+// } catch (IOException ioe) {
+// if (corrupted.get())
+// log.debug("Channel is corrupted and we're reading garbage, ignore read until we find a good command again");
+// else
+// throw ioe;
+// }
if (command.getType().isStorage())
checkpoint(State.READ_UNSTRUCTURED_DATA);
@@ -104,8 +106,21 @@
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- corrupted.compareAndSet(false, true);
- log.error("Unexpected exception", e.getCause());
+// corrupted.compareAndSet(false, true);
+ Throwable t = e.getCause();
+ log.error("Unexpected exception", t);
+ Channel ch = ctx.getChannel();
+ if (t instanceof UnknownCommandException) {
+ ch.write(wrappedBuffer(wrappedBuffer(ErrorReply.ERROR.toString().getBytes()), wrappedBuffer(CRLF)));
+ } else if (t instanceof IOException) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ErrorReply.CLIENT_ERROR).append(' ').append(t);
+ ch.write(wrappedBuffer(wrappedBuffer(sb.toString().getBytes()), wrappedBuffer(CRLF)));
+ } else {
+ StringBuilder sb = new StringBuilder();
+ sb.append(ErrorReply.SERVER_ERROR).append(' ').append(t);
+ ch.write(wrappedBuffer(wrappedBuffer(sb.toString().getBytes()), wrappedBuffer(CRLF)));
+ }
}
private Object reset(Command c) {
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolPipelineFactory.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolPipelineFactory.java 2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolPipelineFactory.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -24,10 +24,13 @@
import static org.jboss.netty.channel.Channels.*;
+import java.util.concurrent.BlockingQueue;
+
import org.infinispan.Cache;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
/**
* TextProtocolPipelineFactory.
@@ -43,18 +46,23 @@
// this.handler = handler;
// }
- private final Cache cache;
+// private final Cache cache;
+// private final BlockingQueue<DelayedDeleteEntry> queue;
+
+ private final ReplayingDecoder<TextCommandDecoder.State> decoder;
+ private final ChannelHandler handler;
- public TextProtocolPipelineFactory(Cache cache) {
- this.cache = cache;
+ public TextProtocolPipelineFactory(Cache cache, BlockingQueue<DeleteDelayedEntry> queue) {
+ this.decoder = new TextCommandDecoder(cache, queue);
+ this.handler = new TextCommandHandler();
}
@Override
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
- pipeline.addLast("decoder", new TextCommandDecoder(cache));
- pipeline.addLast("handler", new TextCommandHandler());
+ pipeline.addLast("decoder", decoder);
+ pipeline.addLast("handler", handler);
// pipeline.addLast("encoder", new TextResponseEncoder());
return pipeline;
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolUtil.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolUtil.java 2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolUtil.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -32,6 +32,7 @@
static final byte CR = 13;
static final byte LF = 10;
static final byte[] CRLF = new byte[] { CR, LF };
+ static final long SECONDS_IN_A_MONTH = 60*60*24*30;
public static byte[] concat(byte[] a, byte[] b) {
byte[] data = new byte[a.length + b.length];
Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/UnknownCommandException.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/UnknownCommandException.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/UnknownCommandException.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.io.StreamCorruptedException;
+
+/**
+ * UnknownCommandException.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class UnknownCommandException extends StreamCorruptedException {
+
+ /** The serialVersionUID */
+ private static final long serialVersionUID = 7677317727970191637L;
+
+ public UnknownCommandException() {
+ super();
+ }
+
+ public UnknownCommandException(String reason) {
+ super(reason);
+ }
+
+}
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2009-12-09 19:33:18 UTC (rev 1268)
@@ -36,7 +36,10 @@
import org.infinispan.manager.CacheManager;
import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.AbstractCacheTest.CleanupPhase;
import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
/**
@@ -58,8 +61,8 @@
DefaultConnectionFactory d = new DefaultConnectionFactory() {
@Override
public long getOperationTimeout() {
- return 360000;
- // return 5000;
+ // return 360000;
+ return 5000;
}
};
@@ -67,6 +70,11 @@
return cacheManager;
}
+ @AfterClass(alwaysRun=true)
+ protected void destroyAfterClass() {
+ server.stop();
+ }
+
public void testBasicSet(Method m) throws Exception {
Future<Boolean> f = client.set(k(m), 0, v(m));
assert f.get(5, TimeUnit.SECONDS);
@@ -214,7 +222,7 @@
public void testBasicCas(Method m) throws Exception {
Future<Boolean> f = client.set(k(m), 0, v(m));
- assert f.get(5, TimeUnit.MINUTES);
+ assert f.get(5, TimeUnit.SECONDS);
CASValue<Object> value = client.gets(k(m));
assert v(m).equals(value.getValue());
assert value.getCas() != 0;
@@ -225,7 +233,7 @@
public void testCasNotFound(Method m) throws Exception {
Future<Boolean> f = client.set(k(m), 0, v(m));
- assert f.get(5, TimeUnit.MINUTES);
+ assert f.get(5, TimeUnit.SECONDS);
CASValue<Object> value = client.gets(k(m));
assert v(m).equals(value.getValue());
assert value.getCas() != 0;
@@ -255,6 +263,11 @@
assert CASResponse.OK == resp;
}
+ public void testBasicDelete(Method m) throws Exception {
+ Future<Boolean> f = client.delete(k(m));
+ assert f.get(5, TimeUnit.SECONDS);
+ }
+
private String k(Method method, String prefix) {
return prefix + method.getName();
}
More information about the infinispan-commits
mailing list