[infinispan-commits] Infinispan SVN: r1500 - in trunk/server: core/src/main/java/org/infinispan/server/core/transport and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Feb 11 13:14:12 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-02-11 13:14:11 -0500 (Thu, 11 Feb 2010)
New Revision: 1500

Added:
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/Decoder.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/ExceptionEvent.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyExceptionEvent.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/MemcachedDecoder.java
Modified:
   trunk/server/core/src/main/java/org/infinispan/server/core/CommandDecoder.java
   trunk/server/core/src/main/java/org/infinispan/server/core/CommandFactory.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java
Log:
[ISPN-173] (Build memcached server module) Abstracted the replaying decoder code so that the memcached decoder does not directly depend on Netty any more. Even though hot rod will use a different decoder, this adds an abstraction layer between the decoder used and the real one used underneath.

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/CommandDecoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/CommandDecoder.java	2010-02-11 12:43:29 UTC (rev 1499)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/CommandDecoder.java	2010-02-11 18:14:11 UTC (rev 1500)
@@ -28,6 +28,7 @@
  * @author Galder Zamarreño
  * @since 4.0
  */
+@Deprecated
 public interface CommandDecoder {
 
 }

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/CommandFactory.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/CommandFactory.java	2010-02-11 12:43:29 UTC (rev 1499)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/CommandFactory.java	2010-02-11 18:14:11 UTC (rev 1500)
@@ -25,7 +25,7 @@
 import java.io.IOException;
 
 /**
- * CommandFactory.
+ * TODO: This only deals with text based protocols, needs further thought. Will be looked into when implementing Hot Rod
  * 
  * @author Galder Zamarreño
  * @since 4.0

Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/Decoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/Decoder.java	                        (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/Decoder.java	2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport;
+
+/**
+ * Transport-independent decoder contract, parameterised by the enum type that describes the decoding state.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public interface Decoder<T extends Enum<T>> {
+   Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer, T state) throws Exception;
+   void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception;
+
+   interface Checkpointer<T> {
+      void checkpoint(T state);
+   }
+}

Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/ExceptionEvent.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/ExceptionEvent.java	                        (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/ExceptionEvent.java	2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport;
+
+/**
+ * Transport-independent view of an exception event passed to a decoder; it exposes only the cause.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public interface ExceptionEvent {
+   Throwable getCause();
+}

Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyExceptionEvent.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyExceptionEvent.java	                        (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyExceptionEvent.java	2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport.netty;
+
+import org.infinispan.server.core.transport.ExceptionEvent;
+
+/**
+ * {@link ExceptionEvent} implementation that wraps a Netty exception event and delegates getCause() to it.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class NettyExceptionEvent implements ExceptionEvent {
+   final org.jboss.netty.channel.ExceptionEvent event;
+
+   NettyExceptionEvent(org.jboss.netty.channel.ExceptionEvent event) {
+      this.event = event;
+   }
+
+   @Override
+   public Throwable getCause() {
+      return event.getCause();
+   }
+}

Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java	                        (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java	2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport.netty;
+
+import org.infinispan.server.core.transport.ChannelBuffer;
+import org.infinispan.server.core.transport.ChannelHandlerContext;
+import org.infinispan.server.core.transport.Decoder;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
+
+/**
+ * Netty ReplayingDecoder that wraps Netty's context, buffer and exception event and delegates to a {@link Decoder}.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class NettyReplayingDecoder<T extends Enum<T>> extends ReplayingDecoder<T> implements Decoder.Checkpointer<T> {
+   final Decoder<T> decoder;
+
+   public NettyReplayingDecoder(Decoder<T> decoder, T initialState) {
+      super(initialState, true);
+      this.decoder = decoder;
+   }
+
+   @Override
+   protected Object decode(org.jboss.netty.channel.ChannelHandlerContext nCtx, org.jboss.netty.channel.Channel channel,
+                           org.jboss.netty.buffer.ChannelBuffer nBuffer, T state) throws Exception {
+      ChannelHandlerContext ctx = new NettyChannelHandlerContext(nCtx);
+      ChannelBuffer buffer = new NettyChannelBuffer(nBuffer);
+      return decoder.decode(ctx, buffer, state);
+   }
+
+   @Override
+   public void checkpoint(T state) {
+      super.checkpoint(state);
+   }
+
+   @Override
+   public void exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+      decoder.exceptionCaught(new NettyChannelHandlerContext(ctx), new NettyExceptionEvent(e));
+   }
+
+   //   ReplayingDecoder<T> decoder;
+//
+//   public NettyReplayingDecoder(ReplayingDecoder<T> decoder) {
+//      this.decoder = decoder;
+//   }
+//
+//   @Override
+//   public Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer, T state) throws Exception {
+//      return decode(((NettyChannelHandlerContext) ctx).ctx, ((NettyChannelHandlerContext) ctx).ctx.getChannel(),
+//                    ((NettyChannelBuffer) buffer).buffer, state);
+//   }
+//
+//   @Override
+//   protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel,
+//                           org.jboss.netty.buffer.ChannelBuffer buffer, T state) throws Exception {
+//      return null;
+//   }
+//
+//   class AccessorReplayingDecoder<T> extends ReplayingDecoder<T> {
+//      ReplayingDecoder<T> decoder;
+//
+//      @Override
+//      protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel, org.jboss.netty.buffer.ChannelBuffer buffer, T state) throws Exception {
+//         return null;  // TODO: Customise this generated block
+//      }
+//   }
+
+}

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java	2010-02-11 12:43:29 UTC (rev 1499)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java	2010-02-11 18:14:11 UTC (rev 1500)
@@ -30,8 +30,9 @@
 import org.infinispan.Cache;
 import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.server.core.Server;
+import org.infinispan.server.core.transport.netty.NettyReplayingDecoder;
 import org.infinispan.server.core.transport.netty.NettyServer;
-import org.infinispan.server.memcached.transport.netty.NettyMemcachedDecoder;
+import org.infinispan.server.memcached.transport.MemcachedDecoder;
 import org.infinispan.server.core.InterceptorChain;
 import org.infinispan.server.memcached.commands.TextCommandHandler;
 import org.infinispan.server.memcached.commands.Value;
@@ -82,10 +83,14 @@
 
    public void start() throws Exception {
       InterceptorChain chain = TextProtocolInterceptorChainFactory.getInstance(cache).buildInterceptorChain();
-      NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
+      MemcachedDecoder decoder = new MemcachedDecoder(cache, chain, scheduler);
+      NettyReplayingDecoder nettyDecoder = new NettyReplayingDecoder<MemcachedDecoder.State>(decoder,
+              MemcachedDecoder.State.READ_COMMAND);
+      decoder.setCheckpointer(nettyDecoder);
+
       TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
 
-      server = new NettyServer(commandHandler, decoder, new InetSocketAddress(host, port),
+      server = new NettyServer(commandHandler, nettyDecoder, new InetSocketAddress(host, port),
                masterThreads, workerThreads, cache.getName());
       server.start();
       log.info("Started Memcached text server bound to {0}:{1}", host, port);

Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/MemcachedDecoder.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/MemcachedDecoder.java	                        (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/MemcachedDecoder.java	2010-02-11 18:14:11 UTC (rev 1500)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.memcached.transport;
+
+import org.infinispan.Cache;
+import org.infinispan.server.core.Command;
+import org.infinispan.server.core.InterceptorChain;
+import org.infinispan.server.core.transport.Channel;
+import org.infinispan.server.core.transport.ChannelBuffer;
+import org.infinispan.server.core.transport.ChannelBuffers;
+import org.infinispan.server.core.transport.ChannelHandlerContext;
+import org.infinispan.server.core.transport.Decoder;
+import org.infinispan.server.core.transport.ExceptionEvent;
+import org.infinispan.server.memcached.Reply;
+import org.infinispan.server.memcached.UnknownCommandException;
+import org.infinispan.server.memcached.commands.CommandFactory;
+import org.infinispan.server.memcached.commands.StorageCommand;
+import org.infinispan.server.memcached.commands.TextCommand;
+import org.infinispan.server.memcached.commands.Value;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.StreamCorruptedException;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.infinispan.server.memcached.TextProtocolUtil.CR;
+import static org.infinispan.server.memcached.TextProtocolUtil.CRLF;
+import static org.infinispan.server.memcached.TextProtocolUtil.LF;
+
+/**
+ * Two-state memcached text decoder: reads a command line and, for storage commands, the CRLF-terminated data block.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class MemcachedDecoder implements Decoder<MemcachedDecoder.State> {
+   private static final Log log = LogFactory.getLog(MemcachedDecoder.class);
+   private final CommandFactory factory;
+   private volatile TextCommand command;
+   private Decoder.Checkpointer checkpointer;
+
+   public enum State {
+      READ_COMMAND, READ_UNSTRUCTURED_DATA
+   }
+
+   public MemcachedDecoder(Cache<String, Value> cache, InterceptorChain chain, ScheduledExecutorService scheduler) {
+      this.factory = new CommandFactory(cache, chain, scheduler);
+   }
+
+   public void setCheckpointer(Checkpointer checkpointer) {
+      this.checkpointer = checkpointer;
+   }
+
+   @Override
+   public Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer, State state) throws Exception {
+      switch (state) {
+         case READ_COMMAND:
+            command = factory.createCommand(readLine(buffer));
+            if (command.getType().isStorage())
+               checkpointer.checkpoint(State.READ_UNSTRUCTURED_DATA);
+            else
+               return command;
+            break;
+         case READ_UNSTRUCTURED_DATA:
+            StorageCommand storageCmd = (StorageCommand) command;
+            byte[] data= new byte[storageCmd.getParams().getBytes()];
+            buffer.readBytes(data, 0, data.length);
+            byte next = buffer.readByte();
+            if (next == CR) {
+               next = buffer.readByte();
+               if (next == LF) {
+                  try {
+                     return reset(storageCmd.setData(data));
+                  } catch (IOException ioe) {
+                     checkpointer.checkpoint(State.READ_COMMAND);
+                     throw ioe;
+                  }
+               } else {
+                  throw new StreamCorruptedException("Expecting \r\n after data block");
+               }
+            } else {
+               throw new StreamCorruptedException("Expecting \r\n after data block");
+            }
+      }
+      return null;
+   }
+
+   @Override
+   public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+      Throwable t = e.getCause();
+      log.error("Unexpected exception", t);
+      Channel ch = ctx.getChannel();
+      ChannelBuffers buffers = ctx.getChannelBuffers();
+      if (t instanceof UnknownCommandException) {
+         ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(Reply.ERROR.bytes()), buffers.wrappedBuffer(CRLF)));
+      } else if (t instanceof IOException) {
+         StringBuilder sb = new StringBuilder();
+         sb.append(Reply.CLIENT_ERROR).append(' ').append(t);
+         ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(sb.toString().getBytes()), buffers.wrappedBuffer(CRLF)));
+      } else {
+         StringBuilder sb = new StringBuilder();
+         sb.append(Reply.SERVER_ERROR).append(' ').append(t);
+         ch.write(buffers.wrappedBuffer(buffers.wrappedBuffer(sb.toString().getBytes()), buffers.wrappedBuffer(CRLF)));
+      }
+   }
+
+   private Object reset(Command c) {
+      this.command = null;
+      checkpointer.checkpoint(State.READ_COMMAND);
+      return c;
+  }
+
+   private String readLine(ChannelBuffer buffer) {
+      StringBuilder sb = new StringBuilder(64);
+      int lineLength = 0;
+      while (true) {
+         byte next = buffer.readByte();
+         if (next == CR) {
+            next = buffer.readByte();
+            if (next == LF) {
+               return sb.toString();
+            }
+         } else if (next == LF) {
+            return sb.toString();
+         } else {
+            lineLength++;
+            sb.append((char) next);
+         }
+      }
+   }
+}

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java	2010-02-11 12:43:29 UTC (rev 1499)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/transport/netty/NettyMemcachedDecoder.java	2010-02-11 18:14:11 UTC (rev 1500)
@@ -54,6 +54,7 @@
  * @author Galder Zamarreño
  * @since 4.0
  */
+@Deprecated
 public class NettyMemcachedDecoder extends ReplayingDecoder<NettyMemcachedDecoder.State> {
    private static final Log log = LogFactory.getLog(NettyMemcachedDecoder.class);
    



More information about the infinispan-commits mailing list