[infinispan-commits] Infinispan SVN: r1639 - in trunk: core/src/main/java/org/infinispan/marshall/jboss and 6 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Mar 30 03:19:43 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-30 03:19:41 -0400 (Tue, 30 Mar 2010)
New Revision: 1639

Added:
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
Removed:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodHeader.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ProtocolFlag.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala
Modified:
   trunk/core/src/main/java/org/infinispan/marshall/Ids.java
   trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
   trunk/server/pom.xml
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Added corresponding Externalizers for Memcached and Hot Rod modules. Added replication test for Hot Rod module. Implemented more meaningful toString methods for java beans.

Modified: trunk/core/src/main/java/org/infinispan/marshall/Ids.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/Ids.java	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/core/src/main/java/org/infinispan/marshall/Ids.java	2010-03-30 07:19:41 UTC (rev 1639)
@@ -108,4 +108,11 @@
    static final byte DEFAULT_CONSISTENT_HASH = 51;
    static final byte UNION_CONSISTENT_HASH = 52;
    static final byte JOIN_COMPLETE_COMMAND = 53;
+
+   /*
+    * ids for server modules
+    */
+   static final byte SERVER_CACHE_VALUE = 55;
+   static final byte MEMCACHED_CACHE_VALUE = 56;
+   static final byte HOTROD_CACHE_KEY = 57;
 }

Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java	2010-03-30 07:19:41 UTC (rev 1639)
@@ -168,6 +168,10 @@
       MARSHALLABLES.add(ClearOperation.class.getName());
       MARSHALLABLES.add(DefaultConsistentHash.class.getName());
       MARSHALLABLES.add(UnionConsistentHash.class.getName());
+
+      MARSHALLABLES.add("org.infinispan.server.core.CacheValue");
+      MARSHALLABLES.add("org.infinispan.server.memcached.MemcachedValue");
+      MARSHALLABLES.add("org.infinispan.server.hotrod.CacheKey");
    }
 
    /**

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -12,6 +12,7 @@
 import java.io.StreamCorruptedException
 import transport._
 import transport.ChannelBuffers._
+import org.infinispan.util.Util
 
 /**
  * // TODO: Document this
@@ -220,10 +221,24 @@
    private val DefaultTimeUnit = TimeUnit.MILLISECONDS 
 }
 
-class RequestHeader(val op: Enumeration#Value)
+class RequestHeader(val op: Enumeration#Value) {
+   override def toString = {
+      new StringBuilder().append("RequestHeader").append("{")
+         .append("op=").append(op)
+         .append("}").toString
+   }
+}
 
-// TODO: NoReply could possibly be passed to subclass specific to memcached and make create* implementations use it
-class RequestParameters(val data: Array[Byte], val lifespan: Int, val maxIdle: Int, val streamVersion: Long)
+class RequestParameters(val data: Array[Byte], val lifespan: Int, val maxIdle: Int, val streamVersion: Long) {
+   override def toString = {
+      new StringBuilder().append("RequestParameters").append("{")
+         .append("data=").append(Util.printArray(data, true))
+         .append(", lifespan=").append(lifespan)
+         .append(", maxIdle=").append(maxIdle)
+         .append(", streamVersion=").append(streamVersion)
+         .append("}").toString
+   }
+}
 
 class UnknownOperationException(reason: String) extends StreamCorruptedException(reason)
 

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -2,15 +2,16 @@
 
 import org.infinispan.util.Util
 import java.io.{Serializable, ObjectOutput, ObjectInput, Externalizable}
+import org.infinispan.marshall.{Externalizer, Ids, Marshallable}
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since
  */
-// TODO: Make it a hardcoded Externalizer
-// TODO: maybe convert to scala case class -> whenever u create java beans, maybe create case classes
-class CacheValue(val data: Array[Byte], val version: Long) extends Serializable {
+// TODO: putting Ids.SERVER_CACHE_VALUE fails compilation in 2.8
+ at Marshallable(externalizer = classOf[CacheValueExternalizer], id = 55)
+class CacheValue(val data: Array[Byte], val version: Long) {
 
    override def toString = {
       new StringBuilder().append("CacheValue").append("{")
@@ -19,15 +20,20 @@
          .append("}").toString
    }
 
-//   override def readExternal(in: ObjectInput) {
-//      data = new Array[Byte](in.read())
-//      in.read(data)
-//      version = in.readLong
-//   }
-//
-//   override def writeExternal(out: ObjectOutput) {
-//      out.write(data.length)
-//      out.write(data)
-//      out.writeLong(version)
-//   }
-}
\ No newline at end of file
+}
+
+private class CacheValueExternalizer extends Externalizer {
+   override def writeObject(output: ObjectOutput, obj: AnyRef) {
+      val cacheValue = obj.asInstanceOf[CacheValue]
+      output.write(cacheValue.data.length)
+      output.write(cacheValue.data)
+      output.writeLong(cacheValue.version)
+   }
+
+   override def readObject(input: ObjectInput): AnyRef = {
+      val data = new Array[Byte](input.read())
+      input.read(data)
+      val version = input.readLong
+      new CacheValue(data, version)
+   }
+}

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -1,31 +1,48 @@
 package org.infinispan.server.hotrod
 
-import java.io.Serializable
 import org.infinispan.util.Util
 import java.util.Arrays
+import org.infinispan.marshall.{Externalizer, Marshallable}
+import java.io.{ObjectInput, ObjectOutput}
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since
  */
+// TODO: putting Ids.HOTROD_CACHE_KEY fails compilation in 2.8
+ at Marshallable(externalizer = classOf[CacheKeyExternalizer], id = 57)
+final class CacheKey(val data: Array[Byte]) {
 
-final class CacheKey(val k: Array[Byte]) extends Serializable {
-
    override def equals(obj: Any) = {
       obj match {
-         // TODO: find out the right way to compare arrays in Scala
-         case k: CacheKey => Arrays.equals(k.k, this.k)
+         // Apparenlty this is the way arrays should be compared for equality of contents, see:
+         // http://old.nabble.com/-scala--Array-equality-td23149094.html
+         case k: CacheKey => Arrays.equals(k.data, this.data)
          case _ => false
       }
    }
 
-   override def hashCode: Int = 41 + Arrays.hashCode(k)
+   override def hashCode: Int = 41 + Arrays.hashCode(data)
 
    override def toString = {
-      new StringBuilder().append("HotRodKey").append("{")
-         .append("k=").append(Util.printArray(k, true))
+      new StringBuilder().append("CacheKey").append("{")
+         .append("data=").append(Util.printArray(data, true))
          .append("}").toString
    }
 
+}
+
+private class CacheKeyExternalizer extends Externalizer {
+   override def writeObject(output: ObjectOutput, obj: AnyRef) {
+      val cacheKey = obj.asInstanceOf[CacheKey]
+      output.write(cacheKey.data.length)
+      output.write(cacheKey.data)
+   }
+
+   override def readObject(input: ObjectInput): AnyRef = {
+      val data = new Array[Byte](input.read())
+      input.read(data)
+      new CacheKey(data)
+   }
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -202,4 +202,10 @@
    def toResponse(request: Enumeration#Value): OperationResponse = {
       responses.get(request).get
    }
+}
+
+object ProtocolFlag extends Enumeration {
+   type ProtocolFlag = Enumeration#Value
+   val NoFlag = Value(0)
+   val ForceReturnPreviousValue = Value(1)
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -8,6 +8,8 @@
 import org.infinispan.manager.{DefaultCacheManager, CacheManager}
 import java.io.{IOException, StreamCorruptedException}
 import org.infinispan.util.concurrent.TimeoutException
+import org.infinispan.server.hotrod.ProtocolFlag._
+import org.infinispan.server.hotrod.OperationResponse._
 
 /**
  * // TODO: Document this
@@ -128,4 +130,27 @@
 
 class UnknownVersionException(reason: String) extends StreamCorruptedException(reason)
 
-class InvalidMagicIdException(reason: String) extends StreamCorruptedException(reason)
\ No newline at end of file
+class InvalidMagicIdException(reason: String) extends StreamCorruptedException(reason)
+
+class HotRodHeader(override val op: Enumeration#Value, val messageId: Long, val cacheName: String,
+                   val flag: ProtocolFlag, val clientIntelligence: Short, val topologyId: Int,
+                   val decoder: AbstractVersionedDecoder) extends RequestHeader(op) {
+   override def toString = {
+      new StringBuilder().append("HotRodHeader").append("{")
+         .append("op=").append(op)
+         .append(", messageId=").append(messageId)
+         .append(", cacheName=").append(cacheName)
+         .append(", flag=").append(flag)
+         .append(", clientIntelligence=").append(clientIntelligence)
+         .append(", topologyId=").append(topologyId)
+         .append("}").toString
+   }
+}
+
+class ErrorHeader(override val messageId: Long) extends HotRodHeader(ErrorResponse, messageId, "", NoFlag, 0, 0, null) {
+   override def toString = {
+      new StringBuilder().append("ErrorHeader").append("{")
+         .append("messageId=").append(messageId)
+         .append("}").toString
+   }
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -17,20 +17,16 @@
    override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
       trace("Encode msg {0}", msg)
       val buffer: ChannelBuffer = msg match {
-         // TODO: move stats response down
+         case r: Response => writeHeader(r)
+      }
+      msg match {
          case s: StatsResponse => {
-            val buffer = dynamicBuffer
-            writeHeader(buffer, s)
             buffer.writeUnsignedInt(s.stats.size)
             for ((key, value) <- s.stats) {
                buffer.writeString(key)
                buffer.writeString(value)
             }
-            buffer
          }
-         case r: Response => writeHeader(dynamicBuffer, r)
-      }
-      msg match {
          case g: GetWithVersionResponse => {
             if (g.status == Success) {
                buffer.writeLong(g.version)
@@ -44,7 +40,8 @@
       buffer
    }
 
-   private def writeHeader(buffer: ChannelBuffer, r: Response): ChannelBuffer = {
+   private def writeHeader(r: Response): ChannelBuffer = {
+      val buffer = dynamicBuffer
       buffer.writeByte(Magic.byteValue)
       buffer.writeUnsignedLong(r.messageId)
       buffer.writeByte(r.operation.id.byteValue)

Deleted: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodHeader.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodHeader.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodHeader.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -1,22 +0,0 @@
-package org.infinispan.server.hotrod
-
-import org.infinispan.server.core.RequestHeader
-import org.infinispan.server.hotrod.ProtocolFlag._
-import org.infinispan.server.hotrod.OperationResponse._
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since
- */
-
-class HotRodHeader(override val op: Enumeration#Value, val messageId: Long, val cacheName: String,
-                   val flag: ProtocolFlag, val clientIntelligence: Short, val topologyId: Int,
-                   val decoder: AbstractVersionedDecoder) extends RequestHeader(op) {
-
-   // TODO: add meaningfull toString()
-}
-
-class ErrorHeader(override val messageId: Long) extends HotRodHeader(ErrorResponse, messageId, "", NoFlag, 0, 0, null) {
-   // TODO: add meaningfull toString()   
-}
\ No newline at end of file

Deleted: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ProtocolFlag.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ProtocolFlag.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ProtocolFlag.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -1,13 +0,0 @@
-package org.infinispan.server.hotrod
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since
- */
-
-object ProtocolFlag extends Enumeration {
-   type ProtocolFlag = Value
-   val NoFlag = Value(0)
-   val ForceReturnValue = Value(1)
-}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -2,15 +2,14 @@
 
 import OperationStatus._
 import OperationResponse._
+import org.infinispan.util.Util
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since 4.1
  */
-
 class Response(val messageId: Long, val operation: OperationResponse, val status: OperationStatus) {
-
    override def toString = {
       new StringBuilder().append("Response").append("{")
          .append("messageId=").append(messageId)
@@ -18,27 +17,38 @@
          .append(", status=").append(status)
          .append("}").toString
    }
-
 }
 
 class GetResponse(override val messageId: Long, override val operation: OperationResponse,
                   override val status: OperationStatus, val data: Option[Array[Byte]])
       extends Response(messageId, operation, status) {
-
-   // TODO add meaningful toString()
+   override def toString = {
+      new StringBuilder().append("GetResponse").append("{")
+         .append("messageId=").append(messageId)
+         .append(", operation=").append(operation)
+         .append(", status=").append(status)
+         .append(", data=").append(if (data == None) "null" else Util.printArray(data.get, true))
+         .append("}").toString
+   }
 }
 
 class GetWithVersionResponse(override val messageId: Long, override val operation: OperationResponse,
                              override val status: OperationStatus, override val data: Option[Array[Byte]],
                              val version: Long)
       extends GetResponse(messageId, operation, status, data) {
-
-   // TODO add meaningful toString()
+   override def toString = {
+      new StringBuilder().append("GetWithVersionResponse").append("{")
+         .append("messageId=").append(messageId)
+         .append(", operation=").append(operation)
+         .append(", status=").append(status)
+         .append(", data=").append(if (data == None) "null" else Util.printArray(data.get, true))
+         .append(", version=").append(version)
+         .append("}").toString
+   }
 }
 
 class ErrorResponse(override val messageId: Long, override val status: OperationStatus,
                     val msg: String) extends Response(messageId, ErrorResponse, status) {
-
    override def toString = {
       new StringBuilder().append("ErrorResponse").append("{")
          .append("messageId=").append(messageId)
@@ -47,7 +57,13 @@
          .append(", msg=").append(msg)
          .append("}").toString
    }
-
 }
 
-class StatsResponse(override val messageId: Long, val stats: Map[String, String]) extends Response(messageId, StatsResponse, Success)
\ No newline at end of file
+class StatsResponse(override val messageId: Long, val stats: Map[String, String]) extends Response(messageId, StatsResponse, Success) {
+   override def toString = {
+      new StringBuilder().append("StatsResponse").append("{")
+         .append("messageId=").append(messageId)
+         .append(", stats=").append(stats)
+         .append("}").toString
+   }
+}
\ No newline at end of file

Deleted: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -1,60 +0,0 @@
-//package org.infinispan.server.hotrod
-//
-//import org.infinispan.test.MultipleCacheManagersTest
-//import org.infinispan.config.Configuration
-//import org.jboss.netty.channel.Channel
-//import test.{Utils, Client}
-//import java.lang.reflect.Method
-//import org.infinispan.manager.CacheManager
-//import org.testng.annotations.{BeforeClass, AfterClass, Test}
-//import org.infinispan.config.Configuration.CacheMode
-//import org.infinispan.context.Flag
-//import org.infinispan.server.hotrod.Status._
-//
-///**
-// * // TODO: Document this
-// * @author Galder Zamarreño
-// * @since
-// */
-//
-//@Test(groups = Array("functional"), testName = "server.hotrod.ClusterTest")
-//class ClusterTest extends MultipleCacheManagersTest with Utils with Client {
-//   private val cacheName = "hotRodReplSync"
-//   private[this] var servers: List[HotRodServer] = List()
-//   private[this] var channels: List[Channel] = List()
-//
-//   @Test(enabled=false) // Disable explicitly to avoid TestNG thinking this is a test!!
-//   override def createCacheManagers {
-//      var replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
-//      createClusteredCaches(2, cacheName, replSync)
-//      servers = startHotRodServer(cacheManagers.get(0)) :: servers
-//      servers = startHotRodServer(cacheManagers.get(1), servers.head.port + 50) :: servers
-//      servers.foreach {
-//         s => s.start
-//         channels = connect("127.0.0.1", s.port) :: channels
-//      }
-//   }
-//
-//   @AfterClass(alwaysRun = true)
-//   override def destroy {
-//      super.destroy
-//      log.debug("Test finished, close Hot Rod server", null)
-//      servers.foreach(_.stop)
-//   }
-//
-//   def tesReplicatedPut(m: Method) {
-//      val putSt = put(channels.head, cacheName, k(m) , 0, 0, v(m))
-//      assertStatus(putSt, Success)
-//      val (getSt, actual) = get(channels.tail.head, cacheName, k(m), null)
-//      assertSuccess(getSt, v(m), actual)
-//   }
-//
-//   def tesLocalOnlyPut(m: Method) {
-//      val putSt = put(channels.head, cacheName, k(m) , 0, 0, v(m), Set(Flag.CACHE_MODE_LOCAL))
-//       assertStatus(putSt, Success)
-//      val (getSt, actual) = get(channels.tail.head, cacheName, k(m), null)
-//      assertKeyDoesNotExist(getSt, actual)
-//   }
-//
-//   // todo: test multiple flags
-//}
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -57,7 +57,7 @@
 
    def testUnknownMagic(m: Method) {
       client.assertPut(m) // Do a put to make sure decoder gets back to reading properly
-      val status = client.execute(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0, 0)
+      val status = client.executeWithBadMagic(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0, 0)
       assertTrue(status == InvalidMagicOrMsgId,
          "Status should have been 'InvalidMagicOrMsgId' but instead was: " + status)
    }

Copied: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala (from rev 1625, trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala)
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	                        (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -0,0 +1,78 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.test.MultipleCacheManagersTest
+import org.infinispan.config.Configuration
+import java.lang.reflect.Method
+import org.testng.annotations.{AfterClass, Test}
+import test.HotRodClient
+import test.HotRodTestingUtil._
+import org.infinispan.server.hotrod.OperationStatus._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+ at Test(groups = Array("functional"), testName = "server.hotrod.ClusterTest")
+class HotRodReplicationTest extends MultipleCacheManagersTest {
+   private val cacheName = "hotRodReplSync"
+   private[this] var servers: List[HotRodServer] = List()
+   private[this] var clients: List[HotRodClient] = List()
+
+   @Test(enabled=false) // Disable explicitly to avoid TestNG thinking this is a test!!
+   override def createCacheManagers {
+      var replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
+      createClusteredCaches(2, cacheName, replSync)
+      servers = startHotRodServer(cacheManagers.get(0)) :: servers
+      servers = startHotRodServer(cacheManagers.get(1), servers.head.getPort + 50) :: servers
+      servers.foreach {s =>
+         clients = new HotRodClient("127.0.0.1", s.getPort, cacheName) :: clients
+      }
+   }
+
+   @AfterClass(alwaysRun = true)
+   override def destroy {
+      super.destroy
+      log.debug("Test finished, close Hot Rod server", null)
+      clients.foreach(_.stop)
+      servers.foreach(_.stop)
+   }
+
+   def tesReplicatedPut(m: Method) {
+      val putSt = clients.head.put(k(m) , 0, 0, v(m))
+      assertStatus(putSt, Success)
+      val (getSt, actual) = clients.tail.head.get(k(m), 0)
+      assertSuccess(getSt, v(m), actual)
+   }
+
+   def tesReplicatedPutIfAbsent(m: Method) {
+      val (getSt, actual) = clients.head.assertGet(m)
+      assertKeyDoesNotExist(getSt, actual)
+      val (getSt2, actual2) = clients.tail.head.assertGet(m)
+      assertKeyDoesNotExist(getSt2, actual2)
+      val putSt = clients.head.putIfAbsent(k(m) , 0, 0, v(m))
+      assertStatus(putSt, Success)
+      val (getSt3, actual3) = clients.tail.head.get(k(m), 0)
+      assertSuccess(getSt3, v(m), actual3)
+      val putSt2 = clients.tail.head.putIfAbsent(k(m) , 0, 0, v(m, "v2-"))
+      assertStatus(putSt2, OperationNotExecuted)
+   }
+
+   def testReplicatedReplace(m: Method) {
+      val status = clients.head.replace(k(m), 0, 0, v(m))
+      assertStatus(status, OperationNotExecuted)
+      val status2 = clients.tail.head.replace(k(m), 0, 0, v(m))
+      assertStatus(status2, OperationNotExecuted)
+      clients.tail.head.assertPut(m)
+      val status3 = clients.tail.head.replace(k(m), 0, 0, v(m, "v1-"))
+      assertStatus(status3, Success)
+      val (getSt, actual) = clients.head.assertGet(m)
+      assertSuccess(getSt, v(m, "v1-"), actual)
+      val status4 = clients.head.replace(k(m), 0, 0, v(m, "v2-"))
+      assertStatus(status4, Success)
+      val (getSt2, actual2) = clients.tail.head.assertGet(m)
+      assertSuccess(getSt2, v(m, "v2-"), actual2)
+   }
+
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -12,16 +12,17 @@
 import org.infinispan.server.hotrod.Response
 import org.infinispan.server.hotrod.OperationStatus._
 import org.infinispan.server.hotrod.OperationResponse._
-import java.util.concurrent.atomic.AtomicInteger
 import org.infinispan.server.core.transport.NoState
 import org.jboss.netty.channel.ChannelHandler.Sharable
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, Executors}
 import org.infinispan.server.core.transport.netty.{ChannelBufferAdapter}
 import org.infinispan.server.core.Logging
 import collection.mutable
 import collection.immutable
 import java.lang.reflect.Method
 import test.HotRodTestingUtil._
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit, LinkedBlockingQueue, Executors}
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import org.infinispan.test.TestingUtil
 
 /**
  * A very simply Hot Rod client for testing purpouses
@@ -33,7 +34,7 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-class HotRodClient(host: String, port: Int, defaultCacheName: String) {
+class HotRodClient(host: String, port: Int, defaultCacheName: String) {   
 
    private lazy val ch: Channel = {
       val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
@@ -81,13 +82,24 @@
 
    def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
                v: Array[Byte], flags: Int, version: Long): OperationStatus = {
-      val writeFuture = ch.write(new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version))
+      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version)
+      val writeFuture = ch.write(op)
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      handler.getResponse.status
+      handler.getResponse(op.id).status
    }
 
+   def executeWithBadMagic(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
+                           v: Array[Byte], flags: Int, version: Long): OperationStatus = {
+      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version)
+      val writeFuture = ch.write(op)
+      writeFuture.awaitUninterruptibly
+      assertTrue(writeFuture.isSuccess)
+      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+      handler.getResponse(0).status
+   }
+
    def get(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) = {
       val (getSt, actual, version) = get(0x03, k, 0)
       (getSt, actual)
@@ -109,19 +121,20 @@
       execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, 0)
 
    def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
-      val writeFuture = ch.write(new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0))
+      val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0)
+      val writeFuture = ch.write(op)
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
       if (code == 0x03) {
-         val resp = handler.getResponse.asInstanceOf[GetResponse]
+         val resp = handler.getResponse(op.id).asInstanceOf[GetResponse]
          (resp.status, if (resp.data == None) null else resp.data.get, 0)
       } else if (code == 0x11) {
-         val resp = handler.getResponse.asInstanceOf[GetWithVersionResponse]
+         val resp = handler.getResponse(op.id).asInstanceOf[GetWithVersionResponse]
          (resp.status, if (resp.data == None) null else resp.data.get, resp.version)
       } else if (code == 0x0F) {
-         (handler.getResponse.status, null, 0)
+         (handler.getResponse(op.id).status, null, 0)
       } else {
          (OperationNotExecuted, null, 0)
       }
@@ -130,12 +143,13 @@
    def clear: OperationStatus = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0, 0)
 
    def stats: Map[String, String] = {
-      val writeFuture = ch.write(new StatsOp(0xA0, 0x15, defaultCacheName, null))
+      val op = new StatsOp(0xA0, 0x15, defaultCacheName, null)
+      val writeFuture = ch.write(op)
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      val resp = handler.getResponse.asInstanceOf[StatsResponse]
+      val resp = handler.getResponse(op.id).asInstanceOf[StatsResponse]
       resp.stats
    }
 
@@ -159,15 +173,13 @@
 @Sharable
 private object Encoder extends OneToOneEncoder {
 
-   private val idCounter: AtomicInteger = new AtomicInteger
-
    override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
       val ret =
          msg match {
             case op: Op => {
                val buffer = new ChannelBufferAdapter(ChannelBuffers.dynamicBuffer)
                buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
-               buffer.writeUnsignedLong(idCounter.incrementAndGet) // message id
+               buffer.writeUnsignedLong(op.id) // message id
                buffer.writeByte(10) // version
                buffer.writeByte(op.code) // opcode
                buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
@@ -197,13 +209,16 @@
 
 }
 
+object HotRodClient {
+   val idCounter = new AtomicLong
+}
+
 private object Decoder extends ReplayingDecoder[NoState] with Logging {
 
    override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
       val buf = new ChannelBufferAdapter(buffer)
       val magic = buf.readUnsignedByte
       val id = buf.readUnsignedLong
-      // val opCode = OperationResolver.resolve(buf.readUnsignedByte)
       val opCode = OperationResponse.apply(buf.readUnsignedByte)
       val status = OperationStatus.apply(buf.readUnsignedByte)
       val topologyChangeMarker = buf.readUnsignedByte
@@ -249,15 +264,26 @@
 
 private class ClientHandler extends SimpleChannelUpstreamHandler {
 
-   private val answer = new LinkedBlockingQueue[Response];
+   private val responses = new ConcurrentHashMap[Long, Response]
 
    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
-      val offered = answer.offer(e.getMessage.asInstanceOf[Response])
-      assertTrue(offered)
+      val resp = e.getMessage.asInstanceOf[Response]
+      responses.put(resp.messageId, resp)
    }
 
-   def getResponse: Response = {
-      answer.poll(60, TimeUnit.SECONDS)
+   def getResponse(messageId: Long): Response = {
+      // TODO: Very very primitive way of waiting for a response. Convert to a Future
+      var i = 0;
+      var v: Response = null;
+      do {
+         v = responses.get(messageId)
+         if (v == null) {
+            TestingUtil.sleepThread(100)
+            i += 1
+         }
+      }
+      while (v == null && i < 20)
+      v
    }
 
 }
@@ -270,7 +296,9 @@
                  val maxIdle: Int,
                  val value: Array[Byte],
                  val flags: Int,
-                 val version: Long)
+                 val version: Long) {
+   lazy val id = HotRodClient.idCounter.incrementAndGet
+}
 
 private class StatsOp(override val magic: Int,
                  override val code: Byte,

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -14,8 +14,6 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-
-// TODO: convert to object so that mircea can use it
 object HotRodTestingUtil extends Logging {
 
    import HotRodTestingUtil._

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -14,6 +14,7 @@
 import org.infinispan.{AdvancedCache, Version, CacheException, Cache}
 import collection.mutable.ListBuffer
 import org.infinispan.server.core.transport.ChannelBuffers._
+import org.infinispan.util.Util
 
 /**
  * // TODO: Document this
@@ -383,7 +384,20 @@
 class MemcachedParameters(override val data: Array[Byte], override val lifespan: Int,
                           override val maxIdle: Int, override val streamVersion: Long,
                           val noReply: Boolean, val flags: Int, val delta: String,
-                          val flushDelay: Int) extends RequestParameters(data, lifespan, maxIdle, streamVersion)
+                          val flushDelay: Int) extends RequestParameters(data, lifespan, maxIdle, streamVersion) {
+   override def toString = {
+      new StringBuilder().append("MemcachedParameters").append("{")
+         .append("data=").append(Util.printArray(data, true))
+         .append(", lifespan=").append(lifespan)
+         .append(", maxIdle=").append(maxIdle)
+         .append(", streamVersion=").append(streamVersion)
+         .append(", noReply=").append(noReply)
+         .append(", flags=").append(flags)
+         .append(", delta=").append(delta)
+         .append(", flushDelay=").append(flushDelay)
+         .append("}").toString
+   }   
+}
 
 private class DelayedFlushAll(cache: Cache[String, MemcachedValue],
                               flushFunction: AdvancedCache[String, MemcachedValue] => Unit) extends Runnable {

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala	2010-03-30 07:19:41 UTC (rev 1639)
@@ -3,13 +3,15 @@
 import org.infinispan.server.core.CacheValue
 import org.infinispan.util.Util
 import java.io.{ObjectOutput, ObjectInput}
+import org.infinispan.marshall.{Marshallable, Externalizer}
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since
  */
-// TODO: Make it a hardcoded Externalizer
+// TODO: putting Ids.MEMCACHED_CACHE_VALUE fails compilation in 2.8
+ at Marshallable(externalizer = classOf[MemcachedValueExternalizer], id = 56)
 class MemcachedValue(override val data: Array[Byte], override val version: Long, val flags: Int)
       extends CacheValue(data, version) {
 
@@ -21,20 +23,22 @@
          .append("}").toString
    }
 
-//   override def readExternal(in: ObjectInput) {
-////      data = new Array[Byte](in.read())
-////      in.read(data)
-////      version = in.readLong
-//      super.readExternal(in)
-//      flags = in.readInt
-//   }
-//
-//   override def writeExternal(out: ObjectOutput) {
-//      super.writeExternal(out)
-////      out.write(data.length)
-////      out.write(data)
-////      out.writeLong(version)
-//      out.writeInt(flags)
-//   }
+}
 
+private class MemcachedValueExternalizer extends Externalizer {
+   override def writeObject(output: ObjectOutput, obj: AnyRef) {
+      val cacheValue = obj.asInstanceOf[MemcachedValue]
+      output.write(cacheValue.data.length)
+      output.write(cacheValue.data)
+      output.writeLong(cacheValue.version)
+      output.writeInt(cacheValue.flags)
+   }
+
+   override def readObject(input: ObjectInput): AnyRef = {
+      val data = new Array[Byte](input.read())
+      input.read(data)
+      val version = input.readLong
+      val flags = input.readInt
+      new MemcachedValue(data, version, flags)
+   }
 }
\ No newline at end of file

Modified: trunk/server/pom.xml
===================================================================
--- trunk/server/pom.xml	2010-03-29 15:50:49 UTC (rev 1638)
+++ trunk/server/pom.xml	2010-03-30 07:19:41 UTC (rev 1639)
@@ -13,6 +13,13 @@
    <name>Parent pom for server modules</name>
    <packaging>pom</packaging>
 
+   <modules>
+      <module>core</module>
+      <module>rest</module>
+      <module>memcached</module>
+      <module>hotrod</module>
+   </modules>
+
    <dependencies>
       <dependency>
          <groupId>${project.groupId}</groupId>
@@ -28,4 +35,4 @@
          <scope>test</scope>
       </dependency>
    </dependencies>
-</project>
\ No newline at end of file
+</project>



More information about the infinispan-commits mailing list