[infinispan-commits] Infinispan SVN: r1268 - in trunk/server/memcached/src: test/java/org/infinispan/server/memcached and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Dec 9 14:33:18 EST 2009


Author: galder.zamarreno at jboss.com
Date: 2009-12-09 14:33:18 -0500 (Wed, 09 Dec 2009)
New Revision: 1268

Added:
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayed.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayedEntry.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ErrorReply.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/UnknownCommandException.java
Modified:
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandFactory.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandType.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextCommandDecoder.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolPipelineFactory.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolUtil.java
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
Log:
[ISPN-173] (Build memcached server module) Added error handling and partially implemented delete command. Further testing for delete needed.

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandFactory.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandFactory.java	2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandFactory.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.infinispan.Cache;
@@ -43,9 +44,11 @@
    private static final Log log = LogFactory.getLog(CommandFactory.class);
 
    private final Cache cache;
+   private final BlockingQueue<DeleteDelayedEntry> queue;
    
-   public CommandFactory(Cache cache) {
+   public CommandFactory(Cache cache, BlockingQueue<DeleteDelayedEntry> queue) {
       this.cache = cache;
+      this.queue = queue;
    }
 
    public Command createCommand(String line) throws IOException {
@@ -65,39 +68,51 @@
          case REPLACE:
          case APPEND:
          case PREPEND:
+            return StorageCommand.newStorageCommand(cache, type, getStorageParameters(args), null);
          case CAS:
-            String key = args[1]; // key
-            if(key == null) throw new EOFException();
-
-            tmp = args[2]; // flags
-            if(tmp == null) throw new EOFException();
-            int flags = Integer.parseInt(tmp);
-
-            tmp = args[3]; // expiry time
-            if(tmp == null) throw new EOFException();
-            long expiry = Long.parseLong(tmp); // seconds
-
-            tmp = args[4]; // number of bytes
-            if(tmp == null) throw new EOFException();
-            int bytes = Integer.parseInt(tmp);
-
-            StorageParameters storage = new StorageParameters(key, flags, expiry, bytes);
-
-            if (type == CommandType.CAS) {
-               tmp = args[5]; // cas unique, 64-bit integer
-               long cas = Long.parseLong(tmp);
-               return CasCommand.newCasCommand(cache, storage, cas, null);
-            }
-
-            return StorageCommand.newStorageCommand(cache, type, storage, null);
+            tmp = args[5]; // cas unique, 64-bit integer
+            long cas = Long.parseLong(tmp);
+            return CasCommand.newCasCommand(cache, getStorageParameters(args), cas, null);
          case GET:
          case GETS:
             List<String> keys = new ArrayList<String>(5);
             keys.addAll(Arrays.asList(args).subList(1, args.length));
             return RetrievalCommand.newRetrievalCommand(cache, type, new RetrievalParameters(keys));
+         case DELETE:
+            String key = getKey(args[1]);
+            long time = getOptionalTime(args[2]);
+            return DeleteCommand.newDeleteCommand(cache, key, time, queue);
          default:
             return null;
       }
    }
 
+   private StorageParameters getStorageParameters(String[] args) throws IOException { // reads key, flags, expiry and byte count from args[1..4]
+      return new StorageParameters(getKey(args[1]), getFlags(args[2]), getExpiry(args[3]), getBytes(args[4]));
+   }
+
+   private String getKey(String key) throws IOException { // a null key is reported as EOFException
+      if (key != null) return key;
+      throw new EOFException();
+   }
+
+   private int getFlags(String flags) throws IOException { // parses the flags field as an int; null is reported as EOFException
+      if (flags != null) return Integer.parseInt(flags);
+      throw new EOFException();
+   }
+
+   private long getExpiry(String expiry) throws IOException { // parses the expiry field (seconds); null is reported as EOFException
+      if (expiry != null) return Long.parseLong(expiry);
+      throw new EOFException();
+   }
+
+   private int getBytes(String bytes) throws IOException { // parses the byte-count field as an int; null is reported as EOFException
+      if (bytes != null) return Integer.parseInt(bytes);
+      throw new EOFException();
+   }
+
+   private long getOptionalTime(String time) {
+      // The time argument is optional: a null value yields 0.
+      return time == null ? 0 : Long.parseLong(time); // seconds
+   }
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandType.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandType.java	2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/CommandType.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -77,7 +77,7 @@
      else if(type.equals(CommandType.FLUSH_ALL.toString())) return FLUSH_ALL;
      else if(type.equals(CommandType.VERSION.toString())) return VERSION;
      else if(type.equals(CommandType.QUIT.toString())) return QUIT;
-     else throw new StreamCorruptedException("request \"" + type + "\" not known");
+     else throw new UnknownCommandException("request \"" + type + "\" not known");
    }
 
 

Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteCommand.java	                        (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteCommand.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.infinispan.Cache;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * DeleteCommand.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class DeleteCommand implements Command {
+
+   final Cache cache;
+   final String key;
+   final long time;
+   final BlockingQueue<DeleteDelayedEntry> queue;
+
+   DeleteCommand(Cache cache, String key, long time, BlockingQueue<DeleteDelayedEntry> queue) {
+      this.queue = queue;
+      this.time = time;
+      this.key = key;
+      this.cache = cache;
+   }
+
+   /** Static factory; simply delegates to the package-private constructor. */
+   public static DeleteCommand newDeleteCommand(Cache cache, String key, long time, BlockingQueue<DeleteDelayedEntry> queue) {
+      return new DeleteCommand(cache, key, time, queue);
+   }
+
+   @Override
+   public CommandType getType() {
+      return CommandType.DELETE;
+   }
+
+   /** Deletes the key, either immediately or through the delayed-delete queue; always returns {@code null}. */
+   @Override
+   public Object perform(Channel ch) throws Exception {
+      // No positive time requested: remove right away; otherwise hand the key to the delayed-delete queue.
+      if (time <= 0) cache.remove(key);
+      else queue.offer(new DeleteDelayedEntry(key, time));
+      return null;
+   }
+}

Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayed.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayed.java	                        (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayed.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.util.concurrent.BlockingQueue;
+
+import org.infinispan.Cache;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * Worker that removes keys from the cache as their queued {@link DeleteDelayedEntry} is taken; runs until interrupted.
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class DeleteDelayed implements Runnable {
+   private static final Log log = LogFactory.getLog(DeleteDelayed.class);
+   private final Cache cache;
+   private final BlockingQueue<DeleteDelayedEntry> queue;
+
+   DeleteDelayed(Cache cache, BlockingQueue<DeleteDelayedEntry> queue) {
+      this.queue = queue;
+      this.cache = cache;
+   }
+
+   @Override
+   public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+         try {
+            cache.remove(queue.take().key);
+         } catch (InterruptedException e) {
+            log.debug("Interrupted, so allow thread to exit");
+            Thread.currentThread().interrupt(); // take() cleared the flag; restore it so the loop ends
+         } catch (RuntimeException e) {
+            log.error("Unexpected exception", e); // one failed removal must not kill the only worker
+         }
+      }
+   }
+}

Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayedEntry.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayedEntry.java	                        (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/DeleteDelayedEntry.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * DeleteDelayedEntry.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class DeleteDelayedEntry implements Delayed {
+
+   /** Key to remove once the delay has elapsed. */
+   final String key;
+   /** Absolute deadline in milliseconds since the epoch. */
+   private final long time;
+   /** True when the client sent an absolute unix timestamp rather than an offset in seconds. */
+   private final boolean isUnix;
+
+   /**
+    * @param key  key to delete
+    * @param time seconds; values above {@code SECONDS_IN_A_MONTH} are unix timestamps, others offsets from now
+    */
+   DeleteDelayedEntry(String key, long time) {
+      this.key = key;
+      this.isUnix = time > TextProtocolUtil.SECONDS_IN_A_MONTH;
+      long millis = TimeUnit.SECONDS.toMillis(time);
+      // An offset must be anchored to "now"; used as-is it lies in 1970 and the entry is due at once.
+      this.time = isUnix ? millis : System.currentTimeMillis() + millis;
+   }
+
+   @Override
+   public long getDelay(TimeUnit unit) {
+      return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+   }
+
+   @Override
+   public int compareTo(Delayed o) {
+      if (o == this) return 0;
+      // Same type: compare deadlines directly. Any other Delayed: compare remaining delays, don't throw.
+      long diff = (o instanceof DeleteDelayedEntry) ? time - ((DeleteDelayedEntry) o).time
+            : getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
+      return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
+   }
+}

Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ErrorReply.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ErrorReply.java	                        (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/ErrorReply.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+/**
+ * Keywords that start an error reply written back to the client.
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public enum ErrorReply {
+   ERROR,                      // sent alone (unknown command)
+   CLIENT_ERROR, SERVER_ERROR  // sent followed by a description of the failure
+}

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java	2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -23,6 +23,9 @@
 package org.infinispan.server.memcached;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.infinispan.Cache;
@@ -40,19 +43,31 @@
  */
 class MemcachedTextServer {
    final CacheManager manager;
+   final ExecutorService delayedExecutor;
    
    MemcachedTextServer(CacheManager manager) {
       this.manager = manager;
+      this.delayedExecutor = Executors.newSingleThreadExecutor();
    }
 
    public void start() {
       // Configure Infinispan Cache instance
       Cache cache = manager.getCache();
 
+      // Create delay queue for delayed deletes and start thread
+      BlockingQueue<DeleteDelayedEntry> queue = new DelayQueue<DeleteDelayedEntry>();
+      DeleteDelayed runnable = new DeleteDelayed(cache, queue);
+      delayedExecutor.submit(runnable);
+
       // Configure the server.
       ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
       ServerBootstrap bootstrap = new ServerBootstrap(factory);
-      bootstrap.setPipelineFactory(new TextProtocolPipelineFactory(cache));
+      bootstrap.setPipelineFactory(new TextProtocolPipelineFactory(cache, queue));
       bootstrap.bind(new InetSocketAddress(11211));
    }
+
+   public void stop() { // stops the delayed-delete worker first so it cannot touch a stopped cache, then the cache manager
+      delayedExecutor.shutdownNow(); // shutdown() would not interrupt the worker blocked in queue.take()
+      manager.stop();
+   }
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java	2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/SetCommand.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -55,7 +55,7 @@
          if (params.expiry == 0) {
             reply = put(params.key, params.flags, data);
          } else {
-            if (params.expiry > 60*60*24*30) {
+            if (params.expiry > TextProtocolUtil.SECONDS_IN_A_MONTH) {
                // If expiry bigger number of seconds in 30 days, then it's considered unix time
                long future = TimeUnit.SECONDS.toMillis(params.expiry);
                long expiry = future - System.currentTimeMillis();

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextCommandDecoder.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextCommandDecoder.java	2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextCommandDecoder.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -24,6 +24,7 @@
 
 import java.io.IOException;
 import java.io.StreamCorruptedException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.infinispan.Cache;
@@ -35,6 +36,7 @@
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
 import static org.infinispan.server.memcached.TextProtocolUtil.*;
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 
 /**
  * TextCommandDecoder.
@@ -47,15 +49,15 @@
    
    private final CommandFactory factory;
    private volatile Command command;
-   private final AtomicBoolean corrupted = new AtomicBoolean();
+//   private final AtomicBoolean corrupted = new AtomicBoolean();
 
    protected enum State {
       READ_COMMAND, READ_UNSTRUCTURED_DATA;
    }
 
-   TextCommandDecoder(Cache cache) {
+   TextCommandDecoder(Cache cache, BlockingQueue<DeleteDelayedEntry> queue) {
       super(State.READ_COMMAND, true);
-      factory = new CommandFactory(cache);
+      factory = new CommandFactory(cache, queue);
    }
 
    @Override
@@ -63,15 +65,15 @@
       switch (state) {
          case READ_COMMAND:
             String line = readLine(buffer);
-            try {
+//            try {
                command = factory.createCommand(line);
-               corrupted.set(false);
-            } catch (IOException ioe) {
-               if (corrupted.get())
-                  log.debug("Channel is corrupted and we're reading garbage, ignore read until we find a good command again");
-               else
-                  throw ioe;
-            }
+//               corrupted.set(false);
+//            } catch (IOException ioe) {
+//               if (corrupted.get())
+//                  log.debug("Channel is corrupted and we're reading garbage, ignore read until we find a good command again");
+//               else
+//                  throw ioe;
+//            }
             
             if (command.getType().isStorage())
                checkpoint(State.READ_UNSTRUCTURED_DATA);
@@ -104,8 +106,21 @@
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
-      corrupted.compareAndSet(false, true);
-      log.error("Unexpected exception", e.getCause());
+//      corrupted.compareAndSet(false, true);
+      Throwable t = e.getCause();
+      log.error("Unexpected exception", t);
+      Channel ch = ctx.getChannel();
+      if (t instanceof UnknownCommandException) {
+         ch.write(wrappedBuffer(wrappedBuffer(ErrorReply.ERROR.toString().getBytes()), wrappedBuffer(CRLF)));
+      } else if (t instanceof IOException) {
+         StringBuilder sb = new StringBuilder();
+         sb.append(ErrorReply.CLIENT_ERROR).append(' ').append(t);
+         ch.write(wrappedBuffer(wrappedBuffer(sb.toString().getBytes()), wrappedBuffer(CRLF)));
+      } else {
+         StringBuilder sb = new StringBuilder();
+         sb.append(ErrorReply.SERVER_ERROR).append(' ').append(t);
+         ch.write(wrappedBuffer(wrappedBuffer(sb.toString().getBytes()), wrappedBuffer(CRLF)));
+      }
    }
 
    private Object reset(Command c) {

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolPipelineFactory.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolPipelineFactory.java	2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolPipelineFactory.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -24,10 +24,13 @@
 
 import static org.jboss.netty.channel.Channels.*;
 
+import java.util.concurrent.BlockingQueue;
+
 import org.infinispan.Cache;
 import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
 
 /**
  * TextProtocolPipelineFactory.
@@ -43,18 +46,23 @@
 //      this.handler = handler;
 //   }
 
-   private final Cache cache;
+//   private final Cache cache;
+//   private final BlockingQueue<DelayedDeleteEntry> queue;
+   
+   private final ReplayingDecoder<TextCommandDecoder.State> decoder;
+   private final ChannelHandler handler;
 
-   public TextProtocolPipelineFactory(Cache cache) {
-      this.cache = cache;
+   public TextProtocolPipelineFactory(Cache cache, BlockingQueue<DeleteDelayedEntry> queue) {
+      this.decoder = new TextCommandDecoder(cache, queue);
+      this.handler = new TextCommandHandler();
    }
 
    @Override
    public ChannelPipeline getPipeline() throws Exception {
       // Create a default pipeline implementation.
       ChannelPipeline pipeline = pipeline();
-      pipeline.addLast("decoder", new TextCommandDecoder(cache));
-      pipeline.addLast("handler", new TextCommandHandler());
+      pipeline.addLast("decoder", decoder);
+      pipeline.addLast("handler", handler);
 //      pipeline.addLast("encoder", new TextResponseEncoder());
 
       return pipeline;

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolUtil.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolUtil.java	2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextProtocolUtil.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -32,6 +32,7 @@
    static final byte CR = 13;
    static final byte LF = 10;
    static final byte[] CRLF = new byte[] { CR, LF };
+   static final long SECONDS_IN_A_MONTH = 60*60*24*30;
 
    public static byte[] concat(byte[] a, byte[] b) {
       byte[] data = new byte[a.length + b.length];

Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/UnknownCommandException.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/UnknownCommandException.java	                        (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/UnknownCommandException.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.io.StreamCorruptedException;
+
+/**
+ * UnknownCommandException.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class UnknownCommandException extends StreamCorruptedException {
+
+   private static final long serialVersionUID = 7677317727970191637L;
+
+   /** Creates the exception with the given reason as its detail message. */
+   public UnknownCommandException(String reason) {
+      super(reason);
+   }
+
+   /** Creates the exception without a detail message. */
+   public UnknownCommandException() {
+      super();
+   }
+}

Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java	2009-12-09 18:40:58 UTC (rev 1267)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java	2009-12-09 19:33:18 UTC (rev 1268)
@@ -36,7 +36,10 @@
 
 import org.infinispan.manager.CacheManager;
 import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.AbstractCacheTest.CleanupPhase;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
 /**
@@ -58,8 +61,8 @@
       DefaultConnectionFactory d = new DefaultConnectionFactory() {
          @Override
          public long getOperationTimeout() {
-            return 360000;
-            // return 5000;
+            // return 360000;
+            return 5000;
          }
       };
       
@@ -67,6 +70,11 @@
       return cacheManager;
    }
 
+   @AfterClass(alwaysRun=true)
+   protected void destroyAfterClass() { // class-level teardown; alwaysRun keeps it running even when earlier methods failed
+      server.stop();
+   }
+   
    public void testBasicSet(Method m) throws Exception {
       Future<Boolean> f = client.set(k(m), 0, v(m));
       assert f.get(5, TimeUnit.SECONDS);
@@ -214,7 +222,7 @@
 
    public void testBasicCas(Method m) throws Exception {
       Future<Boolean> f = client.set(k(m), 0, v(m));
-      assert f.get(5, TimeUnit.MINUTES);
+      assert f.get(5, TimeUnit.SECONDS);
       CASValue<Object> value = client.gets(k(m));
       assert v(m).equals(value.getValue());
       assert value.getCas() != 0;
@@ -225,7 +233,7 @@
 
    public void testCasNotFound(Method m) throws Exception {
       Future<Boolean> f = client.set(k(m), 0, v(m));
-      assert f.get(5, TimeUnit.MINUTES);
+      assert f.get(5, TimeUnit.SECONDS);
       CASValue<Object> value = client.gets(k(m));
       assert v(m).equals(value.getValue());
       assert value.getCas() != 0;
@@ -255,6 +263,11 @@
       assert CASResponse.OK == resp;
    }
 
+   public void testBasicDelete(Method m) throws Exception {
+      // Issue a delete for this test's key and expect a positive reply within five seconds.
+      assert client.delete(k(m)).get(5, TimeUnit.SECONDS);
+   }
+
    private String k(Method method, String prefix) {
       return prefix + method.getName();
    }



More information about the infinispan-commits mailing list