[infinispan-commits] Infinispan SVN: r1561 - in trunk/server: core/src/main/java/org/infinispan/server/core/transport and 5 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Mar 3 06:39:19 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-03 06:39:18 -0500 (Wed, 03 Mar 2010)
New Revision: 1561

Added:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala
Removed:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala
Modified:
   trunk/server/core/src/main/java/org/infinispan/server/core/MessageEvent.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffers.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelHandlerContext.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyMessageEvent.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyServer.java
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Response encoding added and got a put decode/encode working. This is a bit preliminary and will get tuned as more commands are written. Also, added encoder part to main server instance which comes in handy for Hot Rod.

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/MessageEvent.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/MessageEvent.java	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/MessageEvent.java	2010-03-03 11:39:18 UTC (rev 1561)
@@ -22,6 +22,8 @@
  */
 package org.infinispan.server.core;
 
+import org.infinispan.server.core.transport.Channel;
+
 import java.net.SocketAddress;
 
 /**
@@ -33,4 +35,5 @@
 public interface MessageEvent {
    Object getMessage();
    SocketAddress getRemoteAddress();
+   Channel getChannel();
 }

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffers.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffers.java	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffers.java	2010-03-03 11:39:18 UTC (rev 1561)
@@ -31,4 +31,5 @@
 public interface ChannelBuffers {
    ChannelBuffer wrappedBuffer(ChannelBuffer... buffers);
    ChannelBuffer wrappedBuffer(byte[] array);
+   ChannelBuffer dynamicBuffer();
 }

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java	2010-03-03 11:39:18 UTC (rev 1561)
@@ -48,6 +48,11 @@
       return new NettyChannelBuffer(org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer(nettyBuffers));
    }
 
+   @Override
+   public ChannelBuffer dynamicBuffer() {
+      return new NettyChannelBuffer(org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer());
+   }
+
    public static NettyChannelBuffers getInstance() {
       return INSTANCE;
    }

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelHandlerContext.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelHandlerContext.java	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelHandlerContext.java	2010-03-03 11:39:18 UTC (rev 1561)
@@ -47,6 +47,6 @@
    }
 
    public ChannelBuffers getChannelBuffers() {
-      return NettyChannelBuffers.INSTANCE;
+      return NettyChannelBuffers.getInstance();
    }
 }

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.java	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.java	2010-03-03 11:39:18 UTC (rev 1561)
@@ -24,9 +24,11 @@
 
 import static org.jboss.netty.channel.Channels.pipeline;
 
+import org.jboss.netty.channel.ChannelDownstreamHandler;
 import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
 
 /**
  * NettyChannelPipelineFactory.
@@ -35,20 +37,30 @@
  * @since 4.0
  */
 public class NettyChannelPipelineFactory implements ChannelPipelineFactory {
-   private final ChannelHandler decoder;
+   private final ChannelUpstreamHandler decoder;
+   private final ChannelDownstreamHandler encoder;
    private final ChannelHandler handler;
 
-   public NettyChannelPipelineFactory(ChannelHandler decoder, ChannelHandler handler) {
+   public NettyChannelPipelineFactory(ChannelUpstreamHandler decoder, ChannelDownstreamHandler encoder, ChannelHandler handler) {
       this.decoder = decoder;
       this.handler = handler;
+      this.encoder = encoder;
    }
 
    @Override
    public ChannelPipeline getPipeline() throws Exception {
       // Create a default pipeline implementation.
       ChannelPipeline pipeline = pipeline();
-      pipeline.addLast("decoder", decoder);
-      pipeline.addLast("handler", handler);
+      if (decoder != null) {
+         pipeline.addLast("decoder", decoder);
+      }
+      if (encoder != null) {
+         pipeline.addLast("encoder", encoder);
+      }
+      if (handler != null) {
+         pipeline.addLast("handler", handler);
+      }
+
       return pipeline;
    }
 

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyMessageEvent.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyMessageEvent.java	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyMessageEvent.java	2010-03-03 11:39:18 UTC (rev 1561)
@@ -25,6 +25,7 @@
 import java.net.SocketAddress;
 
 import org.infinispan.server.core.MessageEvent;
+import org.infinispan.server.core.transport.Channel;
 
 /**
  * NettyMessageEvent.
@@ -49,4 +50,8 @@
       return event.getRemoteAddress();
    }
 
+   @Override
+   public Channel getChannel() {
+      return new NettyChannel(event.getChannel());
+   }
 }

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyServer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyServer.java	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyServer.java	2010-03-03 11:39:18 UTC (rev 1561)
@@ -35,9 +35,10 @@
 import org.infinispan.util.logging.LogFactory;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
 import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -45,6 +46,8 @@
 
 /**
  * NettyChannelFactory.
+ *
+ * // TODO: Make this class more generic and remove any memcached references
  * 
  * @author Galder Zamarreño
  * @since 4.0
@@ -59,8 +62,8 @@
    final ExecutorService masterExecutor;
    final ExecutorService workerExecutor;
    
-   public NettyServer(CommandHandler commandHandler, ChannelHandler decoder, SocketAddress address,
-            int masterThreads, int workerThreads, String cacheName) {
+   public NettyServer(CommandHandler commandHandler, ChannelUpstreamHandler decoder, ChannelDownstreamHandler encoder, 
+            SocketAddress address, int masterThreads, int workerThreads, String cacheName) {
       ThreadFactory tf = new MemcachedThreadFactory(cacheName, ExecutorType.MASTER);
       if (masterThreads == 0) {
          log.debug("Configured unlimited threads for master thread pool");
@@ -80,7 +83,7 @@
       }
 
       NettyChannelUpstreamHandler handler = new NettyChannelUpstreamHandler(commandHandler, acceptedChannels);
-      this.pipeline = new NettyChannelPipelineFactory(decoder, handler);
+      this.pipeline = new NettyChannelPipelineFactory(decoder, encoder, handler);
       this.address = address;
       if (workerThreads == 0) {
          factory = new NioServerSocketChannelFactory(masterExecutor, workerExecutor);

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -7,6 +7,6 @@
  */
 
 abstract class Cache {
-   def put(c: StorageCommand): Reply.Value
-   def get(c: RetrievalCommand): Reply.Value
+   def put(c: StorageCommand): Response
+   def get(c: RetrievalCommand): Response
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -11,13 +11,13 @@
 
 class CallerCache(val manager: CacheManager) extends Cache {
 
-   override def put(c: StorageCommand): Reply.Value = {
+   override def put(c: StorageCommand): Response = {
       val cache: InfinispanCache[Array[Byte], Array[Byte]] = manager.getCache(c.cacheName)
       cache.put(c.key, c.value)
-      Reply.Stored
+      new Response(OpCodes.PutResponse, c.id, Status.Success)
    }
 
-   override def get(c: RetrievalCommand): Reply.Value = {
+   override def get(c: RetrievalCommand): Response = {
       null
    }
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -8,5 +8,5 @@
 
 @Deprecated
 trait Command {
-//   def perform(op: Unit => Reply)
+//   def perform(op: Unit => Replies)
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -14,8 +14,7 @@
 
    val Put = 0x01
 
-   @Override
-   def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): StorageCommand = {
+   override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): StorageCommand = {
       val op = buffer.readUnsignedByte
       val cacheName = readString(buffer)
       val id = VLong.read(buffer)
@@ -27,7 +26,7 @@
                val lifespan = VInt.read(buffer)
                val maxIdle = VInt.read(buffer)
                val value = readByteArray(buffer)
-               new StorageCommand(cacheName, key, lifespan, maxIdle, value, flags)({
+               new StorageCommand(cacheName, id, key, lifespan, maxIdle, value, flags)({
                   (cache: Cache, command: StorageCommand) => cache.put(command)
                })
             }
@@ -36,8 +35,7 @@
       command
    }
 
-   @Override
-   def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+   override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
       error("Error", e.getCause)
    }
 

Deleted: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,35 +0,0 @@
-package org.infinispan.server.hotrod
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since 4.0
- */
-
- at Deprecated
-object EncodedData extends Enumeration {
-   type EncodedData = Value
-   val Array = Value(1)
-   val Byte = Value(1 << 1)
-   val Boolean = Value(1 << 2)
-   val Character = Value(1 << 3)
-   val String = Value(1 << 4)
-   val Date = Value(1 << 5)
-   val Double = Value(1 << 6)
-   // 1 << 7 skipped since that's the variable length marker
-   val Float = Value(1 << 8)
-   val Integer = Value(1 << 9)
-   val Long = Value(1 << 10)
-   val Map = Value(1 << 11)
-   val Primitive = Value(1 << 12)
-   val Serialized = Value(1 << 13)
-   val Short = Value(1 << 14)
-   // 1 << 15 skipped since that's the variable length marker
-   val Compressed = Value(1 << 16)
-   val StringBuilder = Value(1 << 17)
-
-}
-
- at Deprecated
-class EncodedData(dataType: EncodedData, length: Long, data: Array[Byte]) {
-}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,6 +1,7 @@
 package org.infinispan.server.hotrod
 
 import scala.collection.mutable.HashSet
+import scala.collection.immutable
 import org.infinispan.context.Flag
 
 /**
@@ -11,8 +12,6 @@
 
 object Flags extends Enumeration {
 
-   type Flags = Value
-
    private val ZeroLockAcquisitionTimeout = Value(1, Flag.ZERO_LOCK_ACQUISITION_TIMEOUT.toString)
    private val CacheModeLocal = Value(1 << 1, Flag.CACHE_MODE_LOCAL.toString)
    private val SkipLocking = Value(1 << 2, Flag.SKIP_LOCKING.toString)
@@ -28,8 +27,8 @@
 
    def extractFlags(bitFlags: Int): Set[Flag] = {
       val s = new HashSet[Flag]
-      Flags.filter(f => (bitFlags & f.id) > 0).foreach(f => s += Flag.valueOf(f.toString))
-      new scala.collection.immutable.HashSet ++ s
+      Flags.values.filter(f => (bitFlags & f.id) > 0).foreach(f => s += Flag.valueOf(f.toString))
+      new immutable.HashSet ++ s
    }
 
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -11,17 +11,21 @@
 class GlobalDecoder extends Decoder[NoState] {
    import GlobalDecoder._
 
-   val Magic = 0xA0
-   val Version410 = 41
+   private val Magic = 0xA0
+   private val Version410 = 41
 
    override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): Object = {
 //      trace("Buffer contains: {0}", buffer)
 //      state match {
 //         case NoState.VOID => {
-      val header = getHeader(buffer)
-      if (header == null) null
-      verifyMagic(header)
-      val version = getVersion(header)
+//      val header = buffer.readBytes(2)
+      val magic = buffer.readUnsignedByte()
+      if (magic != Magic) {
+         buffer.resetReaderIndex()
+         throw new StreamCorruptedException("Magic byte incorrect: " + magic)
+      }
+
+      val version = buffer.readUnsignedByte()
       val decoder =
          version match {
             case Version410 => new Decoder410
@@ -36,23 +40,18 @@
       error("Error", e.getCause)
    }
 
-   private def getHeader(buffer: ChannelBuffer): ChannelBuffer = {
-      if (buffer.readableBytes() < 2) null
-      buffer.readBytes(2)
-   }
+//   private def getHeader(buffer: ChannelBuffer): ChannelBuffer = {
+//      if (buffer.readableBytes() < 2) null
+//
+//   }
+//
+//   private def verifyMagic(buffer: ChannelBuffer) {
+//   }
+//
+//   private def getVersion(buffer: ChannelBuffer) = {
+//
+//   }
 
-   private def verifyMagic(buffer: ChannelBuffer) {
-      val magic = buffer.readUnsignedByte()
-      if (magic != Magic) {
-         buffer.resetReaderIndex()
-         throw new StreamCorruptedException("Magic byte incorrect: " + magic)
-      }
-   }
-
-   private def getVersion(buffer: ChannelBuffer) = {
-      buffer.readUnsignedByte()
-   }
-
 }
 
 object GlobalDecoder extends Logging
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -13,7 +13,7 @@
 
    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
       e.getMessage match {
-         case c: StorageCommand => println(c.op(hotCache, c))
+         case c: StorageCommand => e.getChannel.write(c.op(hotCache, c))
       }
 
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,9 +1,9 @@
 package org.infinispan.server.hotrod
 
 import org.infinispan.manager.CacheManager
-import org.infinispan.server.core.transport.netty.{NettyServer, NettyReplayingDecoder}
 import java.net.InetSocketAddress
 import org.infinispan.server.core.Server
+import org.infinispan.server.core.transport.netty.{NettyEncoder, NettyServer, NettyReplayingDecoder}
 
 /**
  * // TODO: Document this
@@ -19,13 +19,15 @@
 
    import HotRodServer._
 
-   private var server: Server = null
+   private var server: Server = _
 
    def start {
-      var decoder = new GlobalDecoder
+      val decoder = new GlobalDecoder
       val nettyDecoder = new NettyReplayingDecoder[NoState](decoder)
+      val encoder = new Encoder410
+      val nettyEncoder = new NettyEncoder(encoder)
       val commandHandler = new Handler(new CallerCache(manager))
-      server = new NettyServer(commandHandler, nettyDecoder, new InetSocketAddress(host, port),
+      server = new NettyServer(commandHandler, nettyDecoder, nettyEncoder, new InetSocketAddress(host, port),
                                masterThreads, workerThreads, "HotRod")
       server.start
       info("Started Hot Rod bound to {0}:{1}", host, port)

Copied: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala (from rev 1557, trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala)
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala	                        (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -0,0 +1,12 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+object Replies extends Enumeration {
+   type Reply = Value
+   val Stored = Value
+}
\ No newline at end of file

Deleted: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,12 +0,0 @@
-package org.infinispan.server.hotrod
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since 4.0
- */
-
-object Reply extends Enumeration {
-   type Reply = Value
-   val Stored = Value
-}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -18,12 +18,12 @@
 
 trait StatsCache extends Cache {
 
-   abstract override def put(c: StorageCommand): Reply.Value = {
+   abstract override def put(c: StorageCommand): Response = {
       // TODO: calculate stats if necessary
       super.put(c)
    }
 
-   abstract override def get(c: RetrievalCommand): Reply.Value = {
+   abstract override def get(c: RetrievalCommand): Response = {
       // TODO: calculate stats if necessary
       super.get(c)
    }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -9,12 +9,17 @@
  */
 
 // val cache: Cache[Array[Byte], Array[Byte]]
-class StorageCommand(val cacheName: String, val key: Array[Byte], val lifespan: Int,
-                     val maxIdle: Int, val value: Array[Byte], val flags: Set[Flag])
-                    (val op: (Cache, StorageCommand) => Reply.Value)
+class StorageCommand(val cacheName: String,
+                     val id: Long,
+                     val key: Array[Byte],
+                     val lifespan: Int,
+                     val maxIdle: Int,
+                     val value: Array[Byte],
+                     val flags: Set[Flag])
+                    (val op: (Cache, StorageCommand) => Response)
 //{
 //
-////   def perform(op: StorageCommand => Reply.Value) {
+////   def perform(op: StorageCommand => Replies.Value) {
 ////      op(this)
 ////   }
 ////

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -63,7 +63,7 @@
 //   }
 
    private def flag(bitFlags: Int)(size: Int)(p: Set[Flag] => Boolean) {
-      var flags = Flags.extractFlags(bitFlags)
+      val flags = Flags.extractFlags(bitFlags)
       assertEquals(flags.size, size)
       assertTrue(p(flags))
    }

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -23,7 +23,7 @@
  */
 @Test(groups = Array("functional"), testName = "server.hotrod.FunctionalTest")
 class FunctionalTest extends SingleCacheManagerTest with Utils with Client {
-   private var server: HotRodServer = null
+   private var server: HotRodServer = _
 
    override def createCacheManager: CacheManager = {
       val cacheManager = TestCacheManagerFactory.createLocalCacheManager
@@ -34,7 +34,8 @@
 
    def testPutBasic(m: Method) {
       assertTrue(connect("127.0.0.1", server.port))
-      assertTrue(put("__default", k(m) , 0, 0, v(m)))
+      val status = put("__default", k(m) , 0, 0, v(m))
+      assertTrue(status == 0, "Status should have been 0 but instead was: " + status)
    }
 
    @AfterClass(alwaysRun = true)

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,7 +1,6 @@
 package org.infinispan.server.hotrod.test
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import java.util.concurrent.Executors
 import org.jboss.netty.bootstrap.ClientBootstrap
 import java.net.InetSocketAddress
 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
@@ -9,7 +8,9 @@
 import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
 import org.infinispan.server.core.transport.netty.NettyChannelBuffer
 import org.jboss.netty.handler.codec.replay.ReplayingDecoder
-import org.infinispan.server.hotrod.{Logging, NoState, VLong, VInt}
+import org.testng.Assert._
+import java.util.concurrent.{LinkedBlockingQueue, Executors}
+import org.infinispan.server.hotrod._
 
 /**
  * // TODO: Document this
@@ -20,7 +21,7 @@
  * @since 4.1
  */
 trait Client {
-   private var channel: Channel = null
+   private var channel: Channel = _
 
    def connect(host: String, port: Int): Boolean = {
       // Set up.
@@ -33,17 +34,17 @@
       val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
       // Wait until the connection is made successfully.
       channel = connectFuture.awaitUninterruptibly.getChannel
-//      // Get the handler instance to retrieve the answer.
-//      var handler = channel.getPipeline.getLast.asInstanceOf[ClientHandler]
       connectFuture.isSuccess
    }
 
-   def put(cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Boolean = {
+   def put(cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Byte = {
       val writeFuture = channel.write(new Store(cacheName, key, lifespan, maxIdle, value))
       writeFuture.awaitUninterruptibly
-      writeFuture.isSuccess
+      assertTrue(writeFuture.isSuccess)
+      // Get the handler instance to retrieve the answer.
+      var handler = channel.getPipeline.getLast.asInstanceOf[ClientHandler]
+      handler.getResponse.status.id.byteValue
    }
-   
 }
 
 @ChannelPipelineCoverage("all")
@@ -53,7 +54,7 @@
       val pipeline = Channels.pipeline
       pipeline.addLast("decoder", Decoder)
       pipeline.addLast("encoder", Encoder)
-//      pipeline.addLast("handler", new FactorialClientHandler(count))
+      pipeline.addLast("handler", new ClientHandler)
       pipeline
    }
 
@@ -90,8 +91,13 @@
 
 private object Decoder extends ReplayingDecoder[NoState] with Logging {
 
-   override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState) = {
-      null
+   override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
+      val buf = new NettyChannelBuffer(buffer)
+      val magic = buf.readUnsignedByte
+      val opCode = buf.readUnsignedByte
+      val id = VLong.read(buf)
+      val status = buf.readUnsignedByte
+      new Response(OpCodes.apply(opCode), id, Status.apply(status))
    }
 
    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
@@ -99,22 +105,22 @@
    }
 }
 
-//private class ClientHandler(val command: Any) extends SimpleChannelUpstreamHandler {
-//
-//   override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
-//      sendCommand(e)
-//   }
-//
-//   override def channelInterestChanged(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
-//      sendCommand(e)
-//   }
-//
-//   private def sendCommand(e: ChannelStateEvent) {
-//      var channel = e.getChannel
-//      channel.write(command)
-//   }
-//}
+ at ChannelPipelineCoverage("one")
+private class ClientHandler extends SimpleChannelUpstreamHandler {
 
+   private val answer = new LinkedBlockingQueue[Response]; 
+
+   override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+      val offered = answer.offer(e.getMessage.asInstanceOf[Response])
+      assertTrue(offered)
+   }
+
+   def getResponse: Response = {
+      answer.take
+   }
+
+}
+
 private class Store(val cacheName: String, val key: Array[Byte],
                     val lifespan: Int, val maxIdle: Int,
                     val value: Array[Byte])
\ No newline at end of file

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java	2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java	2010-03-03 11:39:18 UTC (rev 1561)
@@ -90,7 +90,8 @@
 
       TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
 
-      server = new NettyServer(commandHandler, nettyDecoder, new InetSocketAddress(host, port),
+      // No common encoder used, each command encodes its response, since there's no common ground for all.
+      server = new NettyServer(commandHandler, nettyDecoder, null, new InetSocketAddress(host, port),
                masterThreads, workerThreads, cache.getName());
       server.start();
       log.info("Started Memcached text server bound to {0}:{1}", host, port);



More information about the infinispan-commits mailing list