[infinispan-commits] Infinispan SVN: r1564 - in trunk/server: core/src/main/java/org/infinispan/server/core/transport/netty and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Mar 3 10:46:53 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-03 10:46:52 -0500 (Wed, 03 Mar 2010)
New Revision: 1564

Removed:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala
Modified:
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VInt.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VLong.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/VariableLengthTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Added read/write unsigned int/long to the ChannelBuffer interface and provided implementation that talks to ChannelBuffer (as opposed to Unsigned class that uses ObjectInput and ObjectOutput). Also sorted out an issue with TestNG and vars defined in traits.

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java	2010-03-03 15:46:52 UTC (rev 1564)
@@ -26,24 +26,24 @@
  * ChannelBuffer.
  * 
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
  */
 public interface ChannelBuffer {
    byte readByte();
    void readBytes(byte[] dst, int dstIndex, int length);
-   int readableBytes();
    short readUnsignedByte();
+   int readUnsignedInt();
+   long readUnsignedLong();
    ChannelBuffer readBytes(int length);
-   void resetReaderIndex();
    int readerIndex();
    void readBytes(byte[] dst);
 
    void writeByte(byte value);
    void writeBytes(byte[] src);
+   void writeUnsignedInt(int i);
+   void writeUnsignedLong(long l);
    int writerIndex();
 
    Object getUnderlyingChannelBuffer();
 
-   // TODO: Add read/write methods for reading and writing variable length numbers,
-   // TODO: that way abstracting VInt and VLong
 }

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java	2010-03-03 15:46:52 UTC (rev 1564)
@@ -28,7 +28,7 @@
  * NettyChannelBuffer.
  * 
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
  */
 public class NettyChannelBuffer implements ChannelBuffer /*, org.jboss.netty.nettyBuffer.ChannelBuffer*/ {
    private final org.jboss.netty.buffer.ChannelBuffer nettyBuffer;
@@ -41,8 +41,48 @@
    public org.jboss.netty.buffer.ChannelBuffer getUnderlyingChannelBuffer() {
       return nettyBuffer;
    }
-   
+
    @Override
+   public int readUnsignedInt() {
+      byte b = readByte();
+      int i = b & 0x7F;
+      for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+         b = readByte();
+         i |= (b & 0x7FL) << shift;
+      }
+      return i;
+   }
+
+   @Override
+   public void writeUnsignedInt(int i) {
+      while ((i & ~0x7F) != 0) {
+         writeByte((byte) ((i & 0x7f) | 0x80));
+         i >>>= 7;
+      }
+      writeByte((byte) i);
+   }
+
+   @Override
+   public long readUnsignedLong() {
+      byte b = readByte();
+      long l = b & 0x7F;
+      for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+         b = readByte();
+         l |= (b & 0x7FL) << shift;
+      }
+      return l;
+   }
+
+   @Override
+   public void writeUnsignedLong(long l) {
+      while ((l & ~0x7F) != 0) {
+         writeByte((byte) ((l & 0x7f) | 0x80));
+         l >>>= 7;
+      }
+      writeByte((byte) l);
+   }
+
+   @Override
    public byte readByte() {
       return nettyBuffer.readByte();
    }
@@ -305,25 +345,27 @@
 //      return nettyBuffer.readable();
 //   }
 //
-     @Override
-     public int readableBytes() {
-        return nettyBuffer.readableBytes();
-     }
-
+//     @Override
+//     public int readableBytes() {
+//        return nettyBuffer.readableBytes();
+//     }
+//
    @Override
    public int readerIndex() {
       return nettyBuffer.readerIndex();
    }
+
 //
+//
 //   @Override
 //   public void readerIndex(int readerIndex) {
 //      nettyBuffer.readerIndex(readerIndex);
 //   }
 //
-     @Override
-     public void resetReaderIndex() {
-        nettyBuffer.resetReaderIndex();
-     }
+//     @Override
+//     public void resetReaderIndex() {
+//        nettyBuffer.resetReaderIndex();
+//     }
 //
 //   @Override
 //   public void resetWriterIndex() {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -6,9 +6,8 @@
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
  */
-
 class Decoder410 extends Decoder[NoState] {
    import Decoder410._
 
@@ -17,14 +16,14 @@
    override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): StorageCommand = {
       val op = buffer.readUnsignedByte
       val cacheName = readString(buffer)
-      val id = VLong.read(buffer)
-      val flags = Flags.extractFlags(VInt.read(buffer))
+      val id = buffer.readUnsignedLong
+      val flags = Flags.extractFlags(buffer.readUnsignedInt)
       val command: StorageCommand =
          op match {
             case Put => {
                val key = readByteArray(buffer)
-               val lifespan = VInt.read(buffer)
-               val maxIdle = VInt.read(buffer)
+               val lifespan = buffer.readUnsignedInt
+               val maxIdle = buffer.readUnsignedInt
                val value = readByteArray(buffer)
                new StorageCommand(cacheName, id, key, lifespan, maxIdle, value, flags)({
                   (cache: Cache, command: StorageCommand) => cache.put(command)
@@ -40,13 +39,13 @@
    }
 
    private def readString(buffer: ChannelBuffer): String = {
-      val array = new Array[Byte](VInt.read(buffer))
+      val array = new Array[Byte](buffer.readUnsignedInt)
       buffer.readBytes(array)
       new String(array, "UTF8")
    }
 
    private def readByteArray(buffer: ChannelBuffer): Array[Byte] = {
-      val array = new Array[Byte](VInt.read(buffer))
+      val array = new Array[Byte](buffer.readUnsignedInt)
       buffer.readBytes(array)
       array
    }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -17,12 +17,12 @@
       val buffer: ChannelBuffer =
          msg match {
             case r: Response => {
-               val buff = ctx.getChannelBuffers.dynamicBuffer
-               buff.writeByte(Magic.byteValue)
-               buff.writeByte(r.opCode.id.byteValue)
-               VLong.write(buff, r.id)
-               buff.writeByte(r.status.id.byteValue)
-               buff
+               val buffer = ctx.getChannelBuffers.dynamicBuffer
+               buffer.writeByte(Magic.byteValue)
+               buffer.writeByte(r.opCode.id.byteValue)
+               buffer.writeUnsignedLong(r.id)
+               buffer.writeByte(r.status.id.byteValue)
+               buffer
             }
       }
       buffer

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -6,7 +6,7 @@
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
  */
 class GlobalDecoder extends Decoder[NoState] {
    import GlobalDecoder._
@@ -15,13 +15,8 @@
    private val Version410 = 41
 
    override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): Object = {
-//      trace("Buffer contains: {0}", buffer)
-//      state match {
-//         case NoState.VOID => {
-//      val header = buffer.readBytes(2)
       val magic = buffer.readUnsignedByte()
       if (magic != Magic) {
-         buffer.resetReaderIndex()
          throw new StreamCorruptedException("Magic byte incorrect: " + magic)
       }
 
@@ -32,26 +27,12 @@
             case _ => throw new StreamCorruptedException("Unknown version:" + version)
          }
       decoder.decode(ctx, buffer, state)
-//         }
-//      }
    }
 
    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
       error("Error", e.getCause)
    }
 
-//   private def getHeader(buffer: ChannelBuffer): ChannelBuffer = {
-//      if (buffer.readableBytes() < 2) null
-//
-//   }
-//
-//   private def verifyMagic(buffer: ChannelBuffer) {
-//   }
-//
-//   private def getVersion(buffer: ChannelBuffer) = {
-//
-//   }
-
 }
 
 object GlobalDecoder extends Logging
\ No newline at end of file

Deleted: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -1,12 +0,0 @@
-package org.infinispan.server.hotrod
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since 4.0
- */
-
-object Replies extends Enumeration {
-   type Reply = Value
-   val Stored = Value
-}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VInt.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VInt.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VInt.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -3,11 +3,15 @@
 import org.infinispan.server.core.transport.ChannelBuffer
 
 /**
- * // TODO: Document this
+ * Reads and writes unsigned variable length integer values. Even though it's deprecated, do not
+ * remove from source code for the moment because it's a good scala example and could be used
+ * as reference. 
+ *
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
+ * @deprecated Instead use ChannelBuffer.writeUnsignedInt and ChannelBuffer.readUnsignedInt
  */
-
+ at deprecated("Instead use ChannelBuffer.writeUnsignedInt and ChannelBuffer.readUnsignedInt")
 object VInt {
 
    def write(out: ChannelBuffer, i: Int) {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VLong.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VLong.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VLong.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -3,11 +3,15 @@
 import org.infinispan.server.core.transport.ChannelBuffer
 
 /**
- * // TODO: Document this
+ * Reads and writes unsigned variable length long values. Even though it's deprecated, do not
+ * remove from source code for the moment because it's a good scala example and could be used
+ * as reference.
+ *
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
+ * @deprecated Instead use ChannelBuffer.writeUnsignedLong and ChannelBuffer.readUnsignedLong
  */
-
+ at deprecated("Instead use ChannelBuffer.writeUnsignedLong and ChannelBuffer.readUnsignedLong")
 object VLong {
 
    def write(out: ChannelBuffer, i: Long) {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -11,9 +11,9 @@
  * Keep an eye on that for @Test and @AfterClass annotations
  * 
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
  */
- at Test
+ at Test(groups = Array("functional"), testName = "server.hotrod.FlagsTest")
 class FlagsTest {
 
    def testSingleFlag {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -33,13 +33,16 @@
    }
 
    def testPutBasic(m: Method) {
-      assertTrue(connect("127.0.0.1", server.port))
-      val status = put("__default", k(m) , 0, 0, v(m))
+      val result = connect("127.0.0.1", server.port)
+      assertTrue(result._1)
+      val ch = result._2
+      val status = put(ch, "__default", k(m) , 0, 0, v(m))
       assertTrue(status == 0, "Status should have been 0 but instead was: " + status)
    }
 
    @AfterClass(alwaysRun = true)
    override def destroyAfterClass {
+      super.destroyAfterClass
       log.debug("Test finished, close memcached server", null)
       server.stop
    }

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/VariableLengthTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/VariableLengthTest.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/VariableLengthTest.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -12,10 +12,10 @@
  * Keep an eye on that for @Test and @AfterClass annotations
  *  
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
  */
 
- at Test
+ at Test(groups = Array("functional"), testName = "server.hotrod.VariableLengthTest")
 class VariableLengthTest {
 
    def test2pow7minus1 {
@@ -93,17 +93,21 @@
    private def writeReadInt(num: Int, expected: Int) {
       val buffer = new NettyChannelBuffer(ChannelBuffers.directBuffer(1024))
       assert(buffer.writerIndex == 0)
-      VInt.write(buffer, num)
+//      VInt.write(buffer, num)
+      buffer.writeUnsignedInt(num)
       assertEquals(buffer.writerIndex, expected)
-      assertEquals(VInt.read(buffer), num)
+//      assertEquals(VInt.read(buffer), num)
+      assertEquals(buffer.readUnsignedInt, num)
    }
 
    private def writeReadLong(num: Long, expected: Int) {
       val buffer = new NettyChannelBuffer(ChannelBuffers.directBuffer(1024))
       assert(buffer.writerIndex == 0)
-      VLong.write(buffer, num)
+//      VLong.write(buffer, num)
+      buffer.writeUnsignedLong(num)
       assertEquals(buffer.writerIndex, expected)
-      assertEquals(VLong.read(buffer), num)
+//      assertEquals(VLong.read(buffer), num)
+      assertEquals(buffer.readUnsignedLong, num)
    }
 
 //   def testEquals128Old() {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-03 12:03:26 UTC (rev 1563)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-03 15:46:52 UTC (rev 1564)
@@ -21,9 +21,8 @@
  * @since 4.1
  */
 trait Client {
-   private var channel: Channel = _
 
-   def connect(host: String, port: Int): Boolean = {
+   def connect(host: String, port: Int) = {
       // Set up.
       val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
       val bootstrap: ClientBootstrap = new ClientBootstrap(factory)
@@ -33,16 +32,18 @@
       // Make a new connection.
       val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
       // Wait until the connection is made successfully.
-      channel = connectFuture.awaitUninterruptibly.getChannel
-      connectFuture.isSuccess
+      val ch = connectFuture.awaitUninterruptibly.getChannel
+      // Ideally, I'd store channel as a var in this trait. However, this causes issues with TestNG, see:
+      // http://thread.gmane.org/gmane.comp.lang.scala.user/24317
+      (connectFuture.isSuccess, ch)
    }
 
-   def put(cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Byte = {
-      val writeFuture = channel.write(new Store(cacheName, key, lifespan, maxIdle, value))
+   def put(ch: Channel, cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Byte = {
+      val writeFuture = ch.write(new Store(cacheName, key, lifespan, maxIdle, value))
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
-      var handler = channel.getPipeline.getLast.asInstanceOf[ClientHandler]
+      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
       handler.getResponse.status.id.byteValue
    }
 }
@@ -67,21 +68,21 @@
       val ret =
          msg match {
             case s: Store => {
-               val buf = new NettyChannelBuffer(ChannelBuffers.dynamicBuffer)
-               buf.writeByte(0xA0.asInstanceOf[Byte]) // magic
-               buf.writeByte(41) // version
-               buf.writeByte(0x01) // opcode - put
-               VInt.write(buf, s.cacheName.length) // cache name length
-               buf.writeBytes(s.cacheName.getBytes()) // cache name
-               VLong.write(buf, 1) // message id
-               VInt.write(buf, 0) // flags
-               VInt.write(buf, s.key.length) // key length
-               buf.writeBytes(s.key) // key
-               VInt.write(buf, s.lifespan) // lifespan
-               VInt.write(buf, s.maxIdle) // maxIdle
-               VInt.write(buf, s.value.length) // value length
-               buf.writeBytes(s.value) // value
-               buf.getUnderlyingChannelBuffer
+               val buffer = new NettyChannelBuffer(ChannelBuffers.dynamicBuffer)
+               buffer.writeByte(0xA0.asInstanceOf[Byte]) // magic
+               buffer.writeByte(41) // version
+               buffer.writeByte(0x01) // opcode - put
+               buffer.writeUnsignedInt(s.cacheName.length) // cache name length
+               buffer.writeBytes(s.cacheName.getBytes()) // cache name
+               buffer.writeUnsignedLong(1) // message id
+               buffer.writeUnsignedInt(0) // flags
+               buffer.writeUnsignedInt(s.key.length) // key length
+               buffer.writeBytes(s.key) // key
+               buffer.writeUnsignedInt(s.lifespan) // lifespan
+               buffer.writeUnsignedInt(s.maxIdle) // maxIdle
+               buffer.writeUnsignedInt(s.value.length) // value length
+               buffer.writeBytes(s.value) // value
+               buffer.getUnderlyingChannelBuffer
             }
       }
       ret
@@ -95,7 +96,7 @@
       val buf = new NettyChannelBuffer(buffer)
       val magic = buf.readUnsignedByte
       val opCode = buf.readUnsignedByte
-      val id = VLong.read(buf)
+      val id = buf.readUnsignedLong
       val status = buf.readUnsignedByte
       new Response(OpCodes.apply(opCode), id, Status.apply(status))
    }



More information about the infinispan-commits mailing list