[infinispan-commits] Infinispan SVN: r1693 - in trunk: core/src/main/java/org/infinispan/marshall/jboss and 7 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Apr 15 10:48:27 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-04-15 10:48:24 -0400 (Thu, 15 Apr 2010)
New Revision: 1693
Added:
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
Modified:
trunk/core/src/main/java/org/infinispan/marshall/Ids.java
trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
Log:
[ISPN-384] (Implement topology and hash distribution headers in Hot Rod) Topology-aware headers now returned upon detecting view id discrepancies between client and server.
Modified: trunk/core/src/main/java/org/infinispan/marshall/Ids.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/Ids.java 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/core/src/main/java/org/infinispan/marshall/Ids.java 2010-04-15 14:48:24 UTC (rev 1693)
@@ -115,4 +115,7 @@
static final byte SERVER_CACHE_VALUE = 55;
static final byte MEMCACHED_CACHE_VALUE = 56;
static final byte HOTROD_CACHE_KEY = 57;
+ static final byte TOPOLOGY_ADDRESS = 58;
+ static final byte TOPOLOGY_VIEW = 59;
+
}
Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java 2010-04-15 14:48:24 UTC (rev 1693)
@@ -172,6 +172,8 @@
MARSHALLABLES.add("org.infinispan.server.core.CacheValue");
MARSHALLABLES.add("org.infinispan.server.memcached.MemcachedValue");
MARSHALLABLES.add("org.infinispan.server.hotrod.CacheKey");
+ MARSHALLABLES.add("org.infinispan.server.hotrod.TopologyAddress");
+ MARSHALLABLES.add("org.infinispan.server.hotrod.TopologyView");
}
/**
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -1,8 +1,8 @@
package org.infinispan.server.core
import org.infinispan.util.Util
-import java.io.{Serializable, ObjectOutput, ObjectInput, Externalizable}
-import org.infinispan.marshall.{Externalizer, Ids, Marshallable}
+import java.io.{ObjectOutput, ObjectInput}
+import org.infinispan.marshall.Marshallable
/**
* // TODO: Document this
@@ -10,7 +10,7 @@
* @since 4.1
*/
// TODO: putting Ids.SERVER_CACHE_VALUE fails compilation in 2.8 - https://lampsvn.epfl.ch/trac/scala/ticket/2764
- at Marshallable(externalizer = classOf[CacheValueExternalizer], id = 55)
+ at Marshallable(externalizer = classOf[CacheValue.Externalizer], id = 55)
class CacheValue(val data: Array[Byte], val version: Long) {
override def toString = {
@@ -22,18 +22,20 @@
}
-private class CacheValueExternalizer extends Externalizer {
- override def writeObject(output: ObjectOutput, obj: AnyRef) {
- val cacheValue = obj.asInstanceOf[CacheValue]
- output.write(cacheValue.data.length)
- output.write(cacheValue.data)
- output.writeLong(cacheValue.version)
- }
+object CacheValue {
+ class Externalizer extends org.infinispan.marshall.Externalizer {
+ override def writeObject(output: ObjectOutput, obj: AnyRef) {
+ val cacheValue = obj.asInstanceOf[CacheValue]
+ output.write(cacheValue.data.length)
+ output.write(cacheValue.data)
+ output.writeLong(cacheValue.version)
+ }
- override def readObject(input: ObjectInput): AnyRef = {
- val data = new Array[Byte](input.read())
- input.readFully(data)
- val version = input.readLong
- new CacheValue(data, version)
+ override def readObject(input: ObjectInput): AnyRef = {
+ val data = new Array[Byte](input.read())
+ input.readFully(data)
+ val version = input.readLong
+ new CacheValue(data, version)
+ }
}
}
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -16,6 +16,7 @@
*/
object VersionGenerator {
+ // TODO: Possibly seed version counter on capped System.currentTimeMillis, to avoid issues with clients holding to versions in between restarts
private val versionCounter = new AtomicInteger
private val versionPrefix = new AtomicLong
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -11,6 +11,7 @@
def readUnsignedByte: Short
def readUnsignedInt: Int
def readUnsignedLong: Long
+ def readUnsignedShort: Int
def readBytes(length: Int): ChannelBuffer
def readerIndex: Int
def readBytes(dst: Array[Byte]): Unit
@@ -31,6 +32,7 @@
def writeRangedBytes(src: Array[Byte])
def writeUnsignedInt(i: Int)
def writeUnsignedLong(l: Long)
+ def writeUnsignedShort(i: Int)
def writerIndex: Int
/**
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -16,6 +16,7 @@
override def readUnsignedByte: Short = buffer.readUnsignedByte
override def readUnsignedInt: Int = VInt.read(this)
override def readUnsignedLong: Long = VLong.read(this)
+ override def readUnsignedShort: Int = buffer.readUnsignedShort
override def readBytes(length: Int): ChannelBuffer = new ChannelBufferAdapter(buffer.readBytes(length))
override def readerIndex: Int = readerIndex
override def readBytes(dst: Array[Byte]) = buffer.readBytes(dst)
@@ -43,6 +44,7 @@
}
override def writeUnsignedInt(i: Int) = VInt.write(this, i)
override def writeUnsignedLong(l: Long) = VLong.write(this, l)
+ override def writeUnsignedShort(i: Int) = buffer.writeShort(i)
override def writerIndex: Int = buffer.writerIndex
/**
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -29,10 +29,12 @@
def createNotExistResponse(header: HotRodHeader): AnyRef
- def createGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef
+ def createGetResponse(header: HotRodHeader, v: CacheValue, op: Enumeration#Value): AnyRef
def handleCustomRequest(header: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef
def createStatsResponse(header: HotRodHeader, stats: Stats): AnyRef
+ def createErrorResponse(header: HotRodHeader, t: Throwable): AnyRef
+
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -2,9 +2,8 @@
import org.infinispan.util.Util
import java.util.Arrays
-import org.infinispan.marshall.{Externalizer, Marshallable}
+import org.infinispan.marshall.Marshallable
import java.io.{ObjectInput, ObjectOutput}
-import org.infinispan.server.core.Logging
/**
* // TODO: Document this
@@ -12,7 +11,7 @@
* @since
*/
// TODO: putting Ids.HOTROD_CACHE_KEY fails compilation in 2.8 - https://lampsvn.epfl.ch/trac/scala/ticket/2764
- at Marshallable(externalizer = classOf[CacheKeyExternalizer], id = 57)
+ at Marshallable(externalizer = classOf[CacheKey.Externalizer], id = 57)
final class CacheKey(val data: Array[Byte]) {
override def equals(obj: Any) = {
@@ -34,16 +33,18 @@
}
-private class CacheKeyExternalizer extends Externalizer {
- override def writeObject(output: ObjectOutput, obj: AnyRef) {
- val cacheKey = obj.asInstanceOf[CacheKey]
- output.write(cacheKey.data.length)
- output.write(cacheKey.data)
- }
+object CacheKey {
+ class Externalizer extends org.infinispan.marshall.Externalizer {
+ override def writeObject(output: ObjectOutput, obj: AnyRef) {
+ val cacheKey = obj.asInstanceOf[CacheKey]
+ output.write(cacheKey.data.length)
+ output.write(cacheKey.data)
+ }
- override def readObject(input: ObjectInput): AnyRef = {
- val data = new Array[Byte](input.read())
- input.readFully(data)
- new CacheKey(data)
+ override def readObject(input: ObjectInput): AnyRef = {
+ val data = new Array[Byte](input.read())
+ input.readFully(data)
+ new CacheKey(data)
+ }
}
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -10,6 +10,8 @@
import org.infinispan.server.core._
import collection.mutable
import collection.immutable
+import org.infinispan.util.concurrent.TimeoutException
+import java.io.IOException
/**
* // TODO: Document this
@@ -22,6 +24,7 @@
import ResponseResolver._
import OperationResponse._
import ProtocolFlag._
+ import HotRodServer._
type SuitableHeader = HotRodHeader
override def readHeader(buffer: ChannelBuffer, messageId: Long): HotRodHeader = {
@@ -77,64 +80,67 @@
createResponse(header, toResponse(header.op), KeyDoesNotExist, null)
private def createResponse(h: HotRodHeader, op: OperationResponse, st: OperationStatus, prev: CacheValue): AnyRef = {
+ val topologyResponse = getTopologyResponse(h)
if (h.flag == ForceReturnPreviousValue)
- new ResponseWithPrevious(h.messageId, op, st, if (prev == null) None else Some(prev.data))
+ new ResponseWithPrevious(h.messageId, op, st, topologyResponse, if (prev == null) None else Some(prev.data))
else
- new Response(h.messageId, op, st)
+ new Response(h.messageId, op, st, topologyResponse)
}
- override def createGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef = {
+ override def createGetResponse(h: HotRodHeader, v: CacheValue, op: Enumeration#Value): AnyRef = {
+ val topologyResponse = getTopologyResponse(h)
if (v != null && op == GetRequest)
- new GetResponse(messageId, GetResponse, Success, Some(v.data))
+ new GetResponse(h.messageId, GetResponse, Success, topologyResponse, Some(v.data))
else if (v != null && op == GetWithVersionRequest)
- new GetWithVersionResponse(messageId, GetWithVersionResponse, Success, Some(v.data), v.version)
+ new GetWithVersionResponse(h.messageId, GetWithVersionResponse, Success, topologyResponse, Some(v.data), v.version)
else if (op == GetRequest)
- new GetResponse(messageId, GetResponse, KeyDoesNotExist, None)
+ new GetResponse(h.messageId, GetResponse, KeyDoesNotExist, topologyResponse, None)
else
- new GetWithVersionResponse(messageId, GetWithVersionResponse, KeyDoesNotExist, None, 0)
+ new GetWithVersionResponse(h.messageId, GetWithVersionResponse, KeyDoesNotExist, topologyResponse, None, 0)
}
- override def handleCustomRequest(header: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef = {
- val messageId = header.messageId
- header.op match {
+ override def handleCustomRequest(h: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef = {
+ val messageId = h.messageId
+ h.op match {
case RemoveIfUnmodifiedRequest => {
val k = readKey(buffer)
- val params = readParameters(header, buffer)
+ val params = readParameters(h, buffer)
val prev = cache.get(k)
if (prev != null) {
if (prev.version == params.get.streamVersion) {
val removed = cache.remove(k, prev);
if (removed)
- // new Response(messageId, RemoveIfUnmodifiedResponse, Success)
- createResponse(header, RemoveIfUnmodifiedResponse, Success, prev)
+ createResponse(h, RemoveIfUnmodifiedResponse, Success, prev)
else
- // new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
- createResponse(header, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
+ createResponse(h, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
} else {
- // new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
- createResponse(header, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
+ createResponse(h, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
}
} else {
- // new Response(messageId, RemoveIfUnmodifiedResponse, KeyDoesNotExist)
- createResponse(header, RemoveIfUnmodifiedResponse, KeyDoesNotExist, prev)
+ createResponse(h, RemoveIfUnmodifiedResponse, KeyDoesNotExist, prev)
}
}
case ContainsKeyRequest => {
+ val topologyResponse = getTopologyResponse(h)
val k = readKey(buffer)
if (cache.containsKey(k))
- new Response(messageId, ContainsKeyResponse, Success)
+ new Response(messageId, ContainsKeyResponse, Success, topologyResponse)
else
- new Response(messageId, ContainsKeyResponse, KeyDoesNotExist)
+ new Response(messageId, ContainsKeyResponse, KeyDoesNotExist, topologyResponse)
}
case ClearRequest => {
+ val topologyResponse = getTopologyResponse(h)
cache.clear
- new Response(messageId, ClearResponse, Success)
+ new Response(messageId, ClearResponse, Success, topologyResponse)
}
- case PingRequest => new Response(messageId, PingResponse, Success)
+ case PingRequest => {
+ val topologyResponse = getTopologyResponse(h)
+ new Response(messageId, PingResponse, Success, topologyResponse)
+ }
}
}
- override def createStatsResponse(header: HotRodHeader, cacheStats: Stats): AnyRef = {
+ override def createStatsResponse(h: HotRodHeader, cacheStats: Stats): AnyRef = {
val stats = mutable.Map.empty[String, String]
stats += ("timeSinceStart" -> cacheStats.getTimeSinceStart.toString)
stats += ("currentNumberOfEntries" -> cacheStats.getCurrentNumberOfEntries.toString)
@@ -145,9 +151,42 @@
stats += ("misses" -> cacheStats.getMisses.toString)
stats += ("removeHits" -> cacheStats.getRemoveHits.toString)
stats += ("removeMisses" -> cacheStats.getRemoveMisses.toString)
- new StatsResponse(header.messageId, immutable.Map[String, String]() ++ stats)
+ val topologyResponse = getTopologyResponse(h)
+ new StatsResponse(h.messageId, immutable.Map[String, String]() ++ stats, topologyResponse)
}
+ override def createErrorResponse(h: HotRodHeader, t: Throwable): AnyRef = {
+ t match {
+ case i: IOException =>
+ new ErrorResponse(h.messageId, ParseError, getTopologyResponse(h), i.toString)
+ case t: TimeoutException =>
+ new ErrorResponse(h.messageId, OperationTimedOut, getTopologyResponse(h), t.toString)
+ case t: Throwable =>
+ new ErrorResponse(h.messageId, ServerError, getTopologyResponse(h), t.toString)
+ }
+ }
+
+ private def getTopologyResponse(h: HotRodHeader): Option[AbstractTopologyResponse] = {
+ // If clustered, set up a cache for topology information
+ if (cacheManager.getGlobalConfiguration.getTransportClass != null) {
+ val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)
+ h.clientIntelligence match {
+ case 2 | 3 => {
+ val currentTopologyView = topologyCache.get("view")
+ if (h.topologyId != currentTopologyView.topologyId) {
+ if (h.clientIntelligence == 2) {
+ Some(TopologyAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members)))
+ } else { // Must be 3
+ // TODO: Implement hash-distribution-aware reply
+ None
+ }
+ } else None
+ }
+ case 1 => None
+ }
+ } else None
+ }
+
}
object RequestResolver extends Logging {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -6,8 +6,7 @@
import transport._
import OperationStatus._
import org.infinispan.manager.{DefaultCacheManager, CacheManager}
-import java.io.{IOException, StreamCorruptedException}
-import org.infinispan.util.concurrent.TimeoutException
+import java.io.StreamCorruptedException
import org.infinispan.server.hotrod.ProtocolFlag._
import org.infinispan.server.hotrod.OperationResponse._
import java.nio.channels.ClosedChannelException
@@ -24,7 +23,8 @@
type SuitableHeader = HotRodHeader
type SuitableParameters = RequestParameters
- @volatile private var isError = false
+ private var isError = false
+ private var joined = false
override def readHeader(buffer: ChannelBuffer): HotRodHeader = {
try {
@@ -65,6 +65,7 @@
}
override def getCache(header: HotRodHeader): Cache[CacheKey, CacheValue] = {
+ // TODO: Document this in wiki
if (header.cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME) cacheManager.getCache[CacheKey, CacheValue]
else cacheManager.getCache(header.cacheName)
}
@@ -91,7 +92,7 @@
h.decoder.createNotExistResponse(h)
override def createGetResponse(h: HotRodHeader, k: CacheKey, v: CacheValue): AnyRef =
- h.decoder.createGetResponse(h.messageId, v, h.op)
+ h.decoder.createGetResponse(h, v, h.op)
override def createMultiGetResponse(h: HotRodHeader, pairs: Map[CacheKey, CacheValue]): AnyRef =
null // Unsupported
@@ -105,18 +106,16 @@
override def createErrorResponse(t: Throwable): AnyRef = {
t match {
case se: ServerException => {
- val messageId = se.header.asInstanceOf[HotRodHeader].messageId
+ val h = se.header.asInstanceOf[HotRodHeader]
se.getCause match {
- case i: InvalidMagicIdException => new ErrorResponse(0, InvalidMagicOrMsgId, i.toString)
- case u: UnknownOperationException => new ErrorResponse(messageId, UnknownOperation, u.toString)
- case u: UnknownVersionException => new ErrorResponse(messageId, UnknownVersion, u.toString)
- case i: IOException => new ErrorResponse(messageId, ParseError, i.toString)
- case t: TimeoutException => new ErrorResponse(messageId, OperationTimedOut, t.toString)
- case e: Exception => new ErrorResponse(messageId, ServerError, e.toString)
+ case i: InvalidMagicIdException => new ErrorResponse(0, InvalidMagicOrMsgId, None, i.toString)
+ case u: UnknownOperationException => new ErrorResponse(h.messageId, UnknownOperation, None, u.toString)
+ case u: UnknownVersionException => new ErrorResponse(h.messageId, UnknownVersion, None, u.toString)
+ case t: Throwable => h.decoder.createErrorResponse(h, t)
}
}
case c: ClosedChannelException => null
- case t: Throwable => new ErrorResponse(0, ServerError, t.toString)
+ case t: Throwable => new ErrorResponse(0, ServerError, None, t.toString)
}
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -16,7 +16,7 @@
override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
trace("Encode msg {0}", msg)
- val buffer: ChannelBuffer = msg match {
+ val buffer: ChannelBuffer = msg match {
case r: Response => writeHeader(r)
}
msg match {
@@ -52,7 +52,24 @@
buffer.writeUnsignedLong(r.messageId)
buffer.writeByte(r.operation.id.byteValue)
buffer.writeByte(r.status.id.byteValue)
- buffer.writeByte(0) // TODO: topology change marker, implemented later
+ if (r.topologyResponse != None) {
+ buffer.writeByte(1) // Topology changed
+ r.topologyResponse.get match {
+ case t: TopologyAwareResponse => {
+ buffer.writeUnsignedInt(t.view.topologyId)
+ buffer.writeUnsignedInt(t.view.members.size)
+ t.view.members.foreach{address =>
+ buffer.writeString(address.host)
+ buffer.writeUnsignedShort(address.port)
+ }
+ }
+ case h: HashDistAwareResponse => {
+ // TODO: Implement reply to hash dist responses
+ }
+ }
+ } else {
+ buffer.writeByte(0) // No topology change
+ }
buffer
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -1,8 +1,12 @@
package org.infinispan.server.hotrod
import org.infinispan.manager.CacheManager
-import org.infinispan.server.core.AbstractProtocolServer
import org.infinispan.server.core.transport.{Decoder, Encoder}
+import org.jgroups.blocks.RequestOptions
+import org.infinispan.server.core.{Logging, AbstractProtocolServer}
+import org.infinispan.config.Configuration
+import org.infinispan.config.Configuration.CacheMode
+import org.infinispan.Cache
/**
* // TODO: Document this
@@ -12,8 +16,49 @@
class HotRodServer extends AbstractProtocolServer("HotRod") {
+ import HotRodServer._
+
override def getEncoder: Encoder = new HotRodEncoder
override def getDecoder: Decoder = new HotRodDecoder(getCacheManager)
+ override def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int) {
+ super.start(host, port, cacheManager, masterThreads, workerThreads)
+ // If clustered, set up a cache for topology information
+ if (cacheManager.getGlobalConfiguration.getTransportClass != null) {
+ defineTopologyCacheConfig(cacheManager)
+ val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)
+ val currentView = topologyCache.get("view")
+ if (currentView != null) {
+ // TODO: If distribution configured, add hashcode of this address
+ val newMembers = currentView.members ::: List(TopologyAddress(host, port, 0))
+ val newView = TopologyView(currentView.topologyId + 1, newMembers)
+ val replaced = topologyCache.replace("view", currentView, newView)
+ if (!replaced) {
+ // TODO: There was a concurrent view modification, get and try to install new view again.
+ }
+ } else {
+ // TODO add check for distribution and if so, put the right hashcode
+ val newMembers = List(TopologyAddress(host, port, 0))
+ val newView = TopologyView(1, newMembers)
+ val prev = topologyCache.putIfAbsent("view", newView)
+ if (prev != null) {
+ // TODO: There was a concurrent view modification, get and try to install new view again.
+ }
+ }
+ }
+ }
+
+ protected def defineTopologyCacheConfig(cacheManager: CacheManager) {
+ val topologyCacheConfig = new Configuration
+ topologyCacheConfig.setCacheMode(CacheMode.REPL_SYNC)
+ topologyCacheConfig.setSyncReplTimeout(10000) // Milliseconds
+ topologyCacheConfig.setFetchInMemoryState(true) // State transfer required
+ cacheManager.defineConfiguration(TopologyCacheName, topologyCacheConfig)
+ }
+
+}
+
+object HotRodServer {
+ val TopologyCacheName = "___hotRodTopologyCache"
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -9,7 +9,9 @@
* @author Galder Zamarreño
* @since 4.1
*/
-class Response(val messageId: Long, val operation: OperationResponse, val status: OperationStatus) {
+// TODO: Maybe add clientIntelligence to response to decide what information to send back
+class Response(val messageId: Long, val operation: OperationResponse, val status: OperationStatus,
+ val topologyResponse: Option[AbstractTopologyResponse]) {
override def toString = {
new StringBuilder().append("Response").append("{")
.append("messageId=").append(messageId)
@@ -20,8 +22,10 @@
}
class ResponseWithPrevious(override val messageId: Long, override val operation: OperationResponse,
- override val status: OperationStatus, val previous: Option[Array[Byte]])
- extends Response(messageId, operation, status) {
+ override val status: OperationStatus,
+ override val topologyResponse: Option[AbstractTopologyResponse],
+ val previous: Option[Array[Byte]])
+ extends Response(messageId, operation, status, topologyResponse) {
override def toString = {
new StringBuilder().append("ResponseWithPrevious").append("{")
.append("messageId=").append(messageId)
@@ -33,8 +37,9 @@
}
class GetResponse(override val messageId: Long, override val operation: OperationResponse,
- override val status: OperationStatus, val data: Option[Array[Byte]])
- extends Response(messageId, operation, status) {
+ override val status: OperationStatus, override val topologyResponse: Option[AbstractTopologyResponse],
+ val data: Option[Array[Byte]])
+ extends Response(messageId, operation, status, topologyResponse) {
override def toString = {
new StringBuilder().append("GetResponse").append("{")
.append("messageId=").append(messageId)
@@ -46,9 +51,10 @@
}
class GetWithVersionResponse(override val messageId: Long, override val operation: OperationResponse,
- override val status: OperationStatus, override val data: Option[Array[Byte]],
- val version: Long)
- extends GetResponse(messageId, operation, status, data) {
+ override val status: OperationStatus,
+ override val topologyResponse: Option[AbstractTopologyResponse],
+ override val data: Option[Array[Byte]], val version: Long)
+ extends GetResponse(messageId, operation, status, topologyResponse, data) {
override def toString = {
new StringBuilder().append("GetWithVersionResponse").append("{")
.append("messageId=").append(messageId)
@@ -61,7 +67,8 @@
}
class ErrorResponse(override val messageId: Long, override val status: OperationStatus,
- val msg: String) extends Response(messageId, ErrorResponse, status) {
+ override val topologyResponse: Option[AbstractTopologyResponse], val msg: String)
+ extends Response(messageId, ErrorResponse, status, topologyResponse) {
override def toString = {
new StringBuilder().append("ErrorResponse").append("{")
.append("messageId=").append(messageId)
@@ -72,11 +79,21 @@
}
}
-class StatsResponse(override val messageId: Long, val stats: Map[String, String]) extends Response(messageId, StatsResponse, Success) {
+class StatsResponse(override val messageId: Long, val stats: Map[String, String],
+ override val topologyResponse: Option[AbstractTopologyResponse])
+ extends Response(messageId, StatsResponse, Success, topologyResponse) {
override def toString = {
new StringBuilder().append("StatsResponse").append("{")
.append("messageId=").append(messageId)
.append(", stats=").append(stats)
.append("}").toString
}
-}
\ No newline at end of file
+}
+
+abstract class AbstractTopologyResponse(val view: TopologyView)
+
+case class TopologyAwareResponse(override val view: TopologyView)
+ extends AbstractTopologyResponse(view)
+
+case class HashDistAwareResponse(override val view: TopologyView, numKeyOwners: Short, hashFunction: Byte, hashSpaceSize: Int)
+ extends AbstractTopologyResponse(view)
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -0,0 +1,30 @@
+package org.infinispan.server.hotrod
+
+import java.io.{ObjectInput, ObjectOutput}
+import org.infinispan.marshall.Marshallable
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+ at Marshallable(externalizer = classOf[TopologyAddress.Externalizer], id = 58)
+case class TopologyAddress(val host: String, val port: Int, val hostHashCode: Int)
+
+object TopologyAddress {
+ class Externalizer extends org.infinispan.marshall.Externalizer {
+ override def writeObject(output: ObjectOutput, obj: AnyRef) {
+ val topologyAddress = obj.asInstanceOf[TopologyAddress]
+ output.writeObject(topologyAddress.host)
+ output.writeInt(topologyAddress.port)
+ output.writeInt(topologyAddress.hostHashCode)
+ }
+
+ override def readObject(input: ObjectInput): AnyRef = {
+ val host = input.readObject.asInstanceOf[String]
+ val port = input.readInt
+ val hostHashCode = input.readInt
+ TopologyAddress(host, port, hostHashCode)
+ }
+ }
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -0,0 +1,28 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.marshall.Marshallable
+import java.io.{ObjectInput, ObjectOutput}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+ at Marshallable(externalizer = classOf[TopologyView.Externalizer], id = 59)
+case class TopologyView(val topologyId: Int, val members: List[TopologyAddress])
+
+object TopologyView {
+ class Externalizer extends org.infinispan.marshall.Externalizer {
+ override def writeObject(output: ObjectOutput, obj: AnyRef) {
+ val topologyView = obj.asInstanceOf[TopologyView]
+ output.writeInt(topologyView.topologyId)
+ output.writeObject(topologyView.members.toArray) // Write arrays instead since writing Lists causes issues
+ }
+
+ override def readObject(input: ObjectInput): AnyRef = {
+ val topologyId = input.readInt
+ val members = input.readObject.asInstanceOf[Array[TopologyAddress]]
+ TopologyView(topologyId, members.toList)
+ }
+ }
+}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -29,15 +29,15 @@
override def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager(true)
def testUnknownCommand(m: Method) {
- val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0)
- assertTrue(status == UnknownOperation,
+ val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0, 1, 0).status
+ assertEquals(status, UnknownOperation,
"Status should have been 'UnknownOperation' but instead was: " + status)
}
def testUnknownMagic(m: Method) {
client.assertPut(m) // Do a put to make sure decoder gets back to reading properly
- val status = client.executeWithBadMagic(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0)
- assertTrue(status == InvalidMagicOrMsgId,
+ val status = client.executeWithBadMagic(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0).status
+ assertEquals(status, InvalidMagicOrMsgId,
"Status should have been 'InvalidMagicOrMsgId' but instead was: " + status)
}
@@ -48,7 +48,7 @@
}
def testPutOnDefaultCache(m: Method) {
- val status = client.execute(0xA0, 0x01, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m), 0, 0, v(m), 0)
+ val status = client.execute(0xA0, 0x01, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m), 0, 0, v(m), 0, 1, 0).status
assertStatus(status, Success)
val cache = cacheManager.getCache[CacheKey, CacheValue]
val value = cache.get(new CacheKey(k(m)))
@@ -58,280 +58,254 @@
def testPutWithLifespan(m: Method) {
client.assertPut(m, 1, 0)
Thread.sleep(1100)
- val (getSt, actual) = client.assertGet(m)
- assertKeyDoesNotExist(getSt, actual)
+ assertKeyDoesNotExist(client.assertGet(m))
}
def testPutWithMaxIdle(m: Method) {
client.assertPut(m, 0, 1)
Thread.sleep(1100)
- val (getSt, actual) = client.assertGet(m)
- assertKeyDoesNotExist(getSt, actual)
+ assertKeyDoesNotExist(client.assertGet(m))
}
def testPutWithPreviousValue(m: Method) {
- val (status, previous) = client.put(k(m) , 0, 0, v(m), 1)
- assertSuccess(status, Array(), previous)
- val (status2, previous2) = client.put(k(m) , 0, 0, v(m, "v2-"), 1)
- assertSuccess(status2, v(m), previous2)
+ var resp = client.put(k(m) , 0, 0, v(m), 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, Success)
+ assertEquals(resp.previous, None)
+ resp = client.put(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[ResponseWithPrevious]
+ assertSuccess(resp, v(m))
}
def testGetBasic(m: Method) {
client.assertPut(m)
- val (getSt, actual) = client.assertGet(m)
- assertSuccess(getSt, v(m), actual)
+ assertSuccess(client.assertGet(m), v(m))
}
def testGetDoesNotExist(m: Method) {
- val (getSt, actual) = client.assertGet(m)
- assertKeyDoesNotExist(getSt, actual)
+ assertKeyDoesNotExist(client.assertGet(m))
}
def testPutIfAbsentNotExist(m: Method) {
- val status = client.putIfAbsent(k(m) , 0, 0, v(m))
+ val status = client.putIfAbsent(k(m) , 0, 0, v(m)).status
assertStatus(status, Success)
}
def testPutIfAbsentExist(m: Method) {
client.assertPut(m)
- val status = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"))
+ val status = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-")).status
assertStatus(status, OperationNotExecuted)
}
def testPutIfAbsentWithLifespan(m: Method) {
- val status = client.putIfAbsent(k(m) , 1, 0, v(m))
+ val status = client.putIfAbsent(k(m) , 1, 0, v(m)).status
assertStatus(status, Success)
Thread.sleep(1100)
- val (getSt, actual) = client.assertGet(m)
- assertKeyDoesNotExist(getSt, actual)
+ assertKeyDoesNotExist(client.assertGet(m))
}
def testPutIfAbsentWithMaxIdle(m: Method) {
- val status = client.putIfAbsent(k(m) , 0, 1, v(m))
+ val status = client.putIfAbsent(k(m) , 0, 1, v(m)).status
assertStatus(status, Success)
Thread.sleep(1100)
- val (getSt, actual) = client.assertGet(m)
- assertKeyDoesNotExist(getSt, actual)
+ assertKeyDoesNotExist(client.assertGet(m))
}
def testPutIfAbsentWithPreviousValue(m: Method) {
- val (status, previous) = client.putIfAbsent(k(m) , 0, 0, v(m), 1)
- assertSuccess(status, Array(), previous)
- val (status2, previous2) = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"), 1)
- assertStatus(status2, OperationNotExecuted)
- assertTrue(Arrays.equals(v(m), previous2))
+ var resp = client.putIfAbsent(k(m) , 0, 0, v(m), 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, Success)
+ assertEquals(resp.previous, None)
+ resp = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, OperationNotExecuted)
+ assertTrue(Arrays.equals(v(m), resp.previous.get))
}
def testReplaceBasic(m: Method) {
client.assertPut(m)
- val status = client.replace(k(m), 0, 0, v(m, "v1-"))
+ val status = client.replace(k(m), 0, 0, v(m, "v1-")).status
assertStatus(status, Success)
- val (getSt, actual) = client.assertGet(m)
- assertSuccess(getSt, v(m, "v1-"), actual)
+ assertSuccess(client.assertGet(m), v(m, "v1-"))
}
def testNotReplaceIfNotPresent(m: Method) {
- val status = client.replace(k(m), 0, 0, v(m))
+ val status = client.replace(k(m), 0, 0, v(m)).status
assertStatus(status, OperationNotExecuted)
}
def testReplaceWithLifespan(m: Method) {
client.assertPut(m)
- val status = client.replace(k(m), 1, 0, v(m, "v1-"))
+ val status = client.replace(k(m), 1, 0, v(m, "v1-")).status
assertStatus(status, Success)
Thread.sleep(1100)
- val (getSt, actual) = client.assertGet(m)
- assertKeyDoesNotExist(getSt, actual)
+ assertKeyDoesNotExist(client.assertGet(m))
}
def testReplaceWithMaxIdle(m: Method) {
client.assertPut(m)
- val status = client.replace(k(m), 0, 1, v(m, "v1-"))
+ val status = client.replace(k(m), 0, 1, v(m, "v1-")).status
assertStatus(status, Success)
Thread.sleep(1100)
- val (getSt, actual) = client.assertGet(m)
- assertKeyDoesNotExist(getSt, actual)
+ assertKeyDoesNotExist(client.assertGet(m))
}
def testReplaceWithPreviousValue(m: Method) {
- val (status, previous) = client.replace(k(m) , 0, 0, v(m), 1)
- assertStatus(status, OperationNotExecuted)
- assertEquals(previous.length, 0)
- val (status2, previous2) = client.put(k(m) , 0, 0, v(m, "v2-"), 1)
- assertSuccess(status2, Array(), previous2)
- val (status3, previous3) = client.replace(k(m) , 0, 0, v(m, "v3-"), 1)
- assertSuccess(status3, v(m, "v2-"), previous3)
+ var resp = client.replace(k(m) , 0, 0, v(m), 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, OperationNotExecuted)
+ assertEquals(resp.previous, None)
+ resp = client.put(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, Success)
+ assertEquals(resp.previous, None)
+ resp = client.replace(k(m) , 0, 0, v(m, "v3-"), 1).asInstanceOf[ResponseWithPrevious]
+ assertSuccess(resp, v(m, "v2-"))
}
def testGetWithVersionBasic(m: Method) {
client.assertPut(m)
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt, v(m), actual)
- assertTrue(version != 0)
+ assertSuccess(client.getWithVersion(k(m), 0), v(m), 0)
}
def testGetWithVersionDoesNotExist(m: Method) {
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertKeyDoesNotExist(getSt, actual)
- assertTrue(version == 0)
+ val resp = client.getWithVersion(k(m), 0)
+ assertKeyDoesNotExist(resp)
+ assertTrue(resp.version == 0)
}
def testReplaceIfUnmodifiedBasic(m: Method) {
client.assertPut(m)
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt, v(m), actual)
- assertTrue(version != 0)
- val status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
+ val resp = client.getWithVersion(k(m), 0)
+ assertSuccess(resp, v(m), 0)
+ val status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), resp.version).status
assertStatus(status, Success)
}
def testReplaceIfUnmodifiedNotFound(m: Method) {
client.assertPut(m)
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt, v(m), actual)
- assertTrue(version != 0)
- val status = client.replaceIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), version)
+ val resp = client.getWithVersion(k(m), 0)
+ assertSuccess(resp, v(m), 0)
+ val status = client.replaceIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), resp.version).status
assertStatus(status, KeyDoesNotExist)
}
def testReplaceIfUnmodifiedNotExecuted(m: Method) {
client.assertPut(m)
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt, v(m), actual)
- assertTrue(version != 0)
- var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
+ var resp = client.getWithVersion(k(m), 0)
+ assertSuccess(resp, v(m), 0)
+ var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), resp.version).status
assertStatus(status, Success)
- val (getSt2, actual2, version2) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt2, v(m, "v1-"), actual2)
- assertTrue(version2 != 0)
- assertTrue(version != version2)
- status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), version)
+ val resp2 = client.getWithVersion(k(m), 0)
+ assertSuccess(resp2, v(m, "v1-"), 0)
+ assertTrue(resp.version != resp2.version)
+ status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), resp.version).status
assertStatus(status, OperationNotExecuted)
- status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), version2)
+ status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), resp2.version).status
assertStatus(status, Success)
}
def testReplaceIfUnmodifiedWithPreviousValue(m: Method) {
- val (status, previous) = client.replaceIfUnmodified(k(m) , 0, 0, v(m), 999, 1)
- assertStatus(status, KeyDoesNotExist)
- assertEquals(previous.length, 0)
+ var resp = client.replaceIfUnmodified(k(m) , 0, 0, v(m), 999, 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, KeyDoesNotExist)
+ assertEquals(resp.previous, None)
client.assertPut(m)
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt, v(m), actual)
- assertTrue(version != 0)
- val (status2, previous2) = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1)
- assertStatus(status2, OperationNotExecuted)
- assertTrue(Arrays.equals(v(m), previous2))
- val (status3, previous3) = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v3-"), version, 1)
- assertStatus(status3, Success)
- assertTrue(Arrays.equals(v(m), previous3))
+ val getResp = client.getWithVersion(k(m), 0)
+ assertSuccess(getResp, v(m), 0)
+ resp = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, OperationNotExecuted)
+ assertTrue(Arrays.equals(v(m), resp.previous.get))
+ resp = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v3-"), getResp.version, 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, Success)
+ assertTrue(Arrays.equals(v(m), resp.previous.get))
}
def testRemoveBasic(m: Method) {
client.assertPut(m)
- val status = client.remove(k(m))
+ val status = client.remove(k(m)).status
assertStatus(status, Success)
- val (getSt, actual) = client.assertGet(m)
- assertKeyDoesNotExist(getSt, actual)
+ assertKeyDoesNotExist(client.assertGet(m))
}
def testRemoveDoesNotExist(m: Method) {
- val status = client.remove(k(m))
+ val status = client.remove(k(m)).status
assertStatus(status, KeyDoesNotExist)
}
def testRemoveWithPreviousValue(m: Method) {
- val (status, previous) = client.remove(k(m), 1)
- assertStatus(status, KeyDoesNotExist)
- assertEquals(previous.length, 0)
+ var resp = client.remove(k(m), 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, KeyDoesNotExist)
+ assertEquals(resp.previous, None)
client.assertPut(m)
- val (status2, previous2) = client.remove(k(m), 1)
- assertSuccess(status2, v(m), previous2)
+ resp = client.remove(k(m), 1).asInstanceOf[ResponseWithPrevious]
+ assertSuccess(resp, v(m))
}
def testRemoveIfUnmodifiedBasic(m: Method) {
client.assertPut(m)
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt, v(m), actual)
- assertTrue(version != 0)
- val status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
+ val resp = client.getWithVersion(k(m), 0)
+ assertSuccess(resp, v(m), 0)
+ assertTrue(resp.version != 0)
+ val status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v1-"), resp.version).status
assertStatus(status, Success)
- val (getSt2, actual2) = client.assertGet(m)
- assertKeyDoesNotExist(getSt2, actual2)
+ assertKeyDoesNotExist(client.assertGet(m))
}
def testRemoveIfUnmodifiedNotFound(m: Method) {
client.assertPut(m)
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt, v(m), actual)
- assertTrue(version != 0)
- val status = client.removeIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), version)
+ var resp = client.getWithVersion(k(m), 0)
+ assertSuccess(resp, v(m), 0)
+ val status = client.removeIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), resp.version).status
assertStatus(status, KeyDoesNotExist)
- val (getSt2, actual2) = client.assertGet(m)
- assertSuccess(getSt2, v(m), actual2)
+ assertSuccess(client.assertGet(m), v(m))
}
def testRemoveIfUnmodifiedNotExecuted(m: Method) {
client.assertPut(m)
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt, v(m), actual)
- assertTrue(version != 0)
- var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
+ val resp = client.getWithVersion(k(m), 0)
+ assertSuccess(resp, v(m), 0)
+ var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), resp.version).status
assertStatus(status, Success)
- val (getSt2, actual2, version2) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt2, v(m, "v1-"), actual2)
- assertTrue(version2 != 0)
- assertTrue(version != version2)
- status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), version)
+ val resp2 = client.getWithVersion(k(m), 0)
+ assertSuccess(resp2, v(m, "v1-"), 0)
+ assertTrue(resp.version != resp2.version)
+ status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), resp.version).status
assertStatus(status, OperationNotExecuted)
- status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), version2)
+ status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), resp2.version).status
assertStatus(status, Success)
}
def testRemoveIfUmodifiedWithPreviousValue(m: Method) {
- val (status, previous) = client.removeIfUnmodified(k(m) , 0, 0, v(m), 999, 1)
- assertStatus(status, KeyDoesNotExist)
- assertEquals(previous.length, 0)
+ var resp = client.removeIfUnmodified(k(m) , 0, 0, v(m), 999, 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, KeyDoesNotExist)
+ assertEquals(resp.previous, None)
client.assertPut(m)
- val (getSt, actual, version) = client.getWithVersion(k(m), 0)
- assertSuccess(getSt, v(m), actual)
- assertTrue(version != 0)
- val (status2, previous2) = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1)
- assertStatus(status2, OperationNotExecuted)
- assertTrue(Arrays.equals(v(m), previous2))
- val (status3, previous3) = client.removeIfUnmodified(k(m), 0, 0, v(m, "v3-"), version, 1)
- assertStatus(status3, Success)
- assertTrue(Arrays.equals(v(m), previous3))
+ val getResp = client.getWithVersion(k(m), 0)
+ assertSuccess(getResp, v(m), 0)
+ resp = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, OperationNotExecuted)
+ assertTrue(Arrays.equals(v(m), resp.previous.get))
+ resp = client.removeIfUnmodified(k(m), 0, 0, v(m, "v3-"), getResp.version, 1).asInstanceOf[ResponseWithPrevious]
+ assertStatus(resp.status, Success)
+ assertTrue(Arrays.equals(v(m), resp.previous.get))
}
def testContainsKeyBasic(m: Method) {
client.assertPut(m)
- val status = client.containsKey(k(m), 0)
- assertStatus(status, Success)
+ assertStatus(client.containsKey(k(m), 0).status, Success)
}
def testContainsKeyDoesNotExist(m: Method) {
- val status = client.containsKey(k(m), 0)
- assertStatus(status, KeyDoesNotExist)
+ assertStatus(client.containsKey(k(m), 0).status, KeyDoesNotExist)
}
def testClear(m: Method) {
for (i <- 1 to 5) {
val key = k(m, "k" + i + "-");
val value = v(m, "v" + i + "-");
- var status = client.put(key , 0, 0, value)
- assertStatus(status, Success)
- status = client.containsKey(key, 0)
- assertStatus(status, Success)
+ assertStatus(client.put(key , 0, 0, value).status, Success)
+ assertStatus(client.containsKey(key, 0).status, Success)
}
- val status = client.clear
- assertStatus(status, Success)
+ assertStatus(client.clear.status, Success)
for (i <- 1 to 5) {
val key = k(m, "k" + i + "-")
- val status = client.containsKey(key, 0)
- assertStatus(status, KeyDoesNotExist)
+ assertStatus(client.containsKey(key, 0).status, KeyDoesNotExist)
}
}
@@ -349,8 +323,23 @@
}
def testPing(m: Method) {
- val status = client.ping
+ val status = client.ping.status
assertStatus(status, Success)
}
+ def testPingWithTopologyAwareClient(m: Method) {
+ var resp = client.ping
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse, None)
+ resp = client.ping(1, 0)
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse, None)
+ resp = client.ping(2, 0)
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse, None)
+ resp = client.ping(3, 0)
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse, None)
+ }
+
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -3,10 +3,12 @@
import org.infinispan.test.MultipleCacheManagersTest
import org.infinispan.config.Configuration
import java.lang.reflect.Method
-import org.testng.annotations.{AfterClass, Test}
import test.HotRodClient
import test.HotRodTestingUtil._
import org.infinispan.server.hotrod.OperationStatus._
+import org.infinispan.config.Configuration.CacheMode
+import org.testng.Assert._
+import org.testng.annotations.{AfterMethod, AfterClass, Test}
/**
* // TODO: Document this
@@ -16,17 +18,29 @@
@Test(groups = Array("functional"), testName = "server.hotrod.ClusterTest")
class HotRodReplicationTest extends MultipleCacheManagersTest {
+
+ import HotRodServer._
+
private val cacheName = "hotRodReplSync"
private[this] var servers: List[HotRodServer] = List()
private[this] var clients: List[HotRodClient] = List()
@Test(enabled=false) // Disable explicitly to avoid TestNG thinking this is a test!!
override def createCacheManagers {
- var config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
+ val config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
config.setFetchInMemoryState(true)
+
+ val topologyCacheConfig = new Configuration
+ topologyCacheConfig.setCacheMode(CacheMode.REPL_SYNC)
+ topologyCacheConfig.setSyncReplTimeout(10000) // Milliseconds
+ topologyCacheConfig.setFetchInMemoryState(true) // State transfer required
+ topologyCacheConfig.setSyncCommitPhase(true) // Only for testing, so that asserts work fine.
+ topologyCacheConfig.setSyncRollbackPhase(true) // Only for testing, so that asserts work fine.
+
for (i <- 0 until 2) {
val cm = addClusterEnabledCacheManager()
cm.defineConfiguration(cacheName, config)
+ cm.defineConfiguration(TopologyCacheName, topologyCacheConfig)
}
servers = startHotRodServer(cacheManagers.get(0)) :: servers
servers = startHotRodServer(cacheManagers.get(1), servers.head.getPort + 50) :: servers
@@ -43,40 +57,80 @@
servers.foreach(_.stop)
}
+ @AfterMethod(alwaysRun=true)
+ override def clearContent() {
+ // Do not clear cache between methods so that topology cache does not get cleared
+ }
+
def testReplicatedPut(m: Method) {
- val putSt = clients.head.put(k(m) , 0, 0, v(m))
+ val putSt = clients.head.put(k(m) , 0, 0, v(m)).status
assertStatus(putSt, Success)
- val (getSt, actual) = clients.tail.head.get(k(m), 0)
- assertSuccess(getSt, v(m), actual)
+ assertSuccess(clients.tail.head.get(k(m), 0), v(m))
}
def testReplicatedPutIfAbsent(m: Method) {
- val (getSt, actual) = clients.head.assertGet(m)
- assertKeyDoesNotExist(getSt, actual)
- val (getSt2, actual2) = clients.tail.head.assertGet(m)
- assertKeyDoesNotExist(getSt2, actual2)
- val putSt = clients.head.putIfAbsent(k(m) , 0, 0, v(m))
+ assertKeyDoesNotExist(clients.head.assertGet(m))
+ assertKeyDoesNotExist(clients.tail.head.assertGet(m))
+ var putSt = clients.head.putIfAbsent(k(m) , 0, 0, v(m)).status
assertStatus(putSt, Success)
- val (getSt3, actual3) = clients.tail.head.get(k(m), 0)
- assertSuccess(getSt3, v(m), actual3)
- val putSt2 = clients.tail.head.putIfAbsent(k(m) , 0, 0, v(m, "v2-"))
- assertStatus(putSt2, OperationNotExecuted)
+ assertSuccess(clients.tail.head.get(k(m), 0), v(m))
+ assertStatus(clients.tail.head.putIfAbsent(k(m) , 0, 0, v(m, "v2-")).status, OperationNotExecuted)
}
def testReplicatedReplace(m: Method) {
- val status = clients.head.replace(k(m), 0, 0, v(m))
+ var status = clients.head.replace(k(m), 0, 0, v(m)).status
assertStatus(status, OperationNotExecuted)
- val status2 = clients.tail.head.replace(k(m), 0, 0, v(m))
- assertStatus(status2, OperationNotExecuted)
+ status = clients.tail.head.replace(k(m), 0, 0, v(m)).status
+ assertStatus(status , OperationNotExecuted)
clients.tail.head.assertPut(m)
- val status3 = clients.tail.head.replace(k(m), 0, 0, v(m, "v1-"))
- assertStatus(status3, Success)
- val (getSt, actual) = clients.head.assertGet(m)
- assertSuccess(getSt, v(m, "v1-"), actual)
- val status4 = clients.head.replace(k(m), 0, 0, v(m, "v2-"))
- assertStatus(status4, Success)
- val (getSt2, actual2) = clients.tail.head.assertGet(m)
- assertSuccess(getSt2, v(m, "v2-"), actual2)
+ status = clients.tail.head.replace(k(m), 0, 0, v(m, "v1-")).status
+ assertStatus(status, Success)
+ assertSuccess(clients.head.assertGet(m), v(m, "v1-"))
+ status = clients.head.replace(k(m), 0, 0, v(m, "v2-")).status
+ assertStatus(status, Success)
+ assertSuccess(clients.tail.head.assertGet(m), v(m, "v2-"))
}
+ def testPingWithTopologyAwareClient(m: Method) {
+ var resp = clients.head.ping
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse, None)
+ resp = clients.tail.head.ping(1, 0)
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse, None)
+ resp = clients.head.ping(2, 0)
+ assertStatus(resp.status, Success)
+ assertTopologyReceived(resp.topologyResponse.get)
+ resp = clients.tail.head.ping(2, 1)
+ assertStatus(resp.status, Success)
+ assertTopologyReceived(resp.topologyResponse.get)
+ resp = clients.tail.head.ping(2, 2)
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse, None)
+ }
+
+ private def assertTopologyReceived(topologyResp: AbstractTopologyResponse) {
+ assertEquals(topologyResp.view.topologyId, 2)
+ assertEquals(topologyResp.view.members.size, 2)
+ assertEquals(topologyResp.view.members.head, TopologyAddress("127.0.0.1", 11311, 0))
+ assertEquals(topologyResp.view.members.tail.head, TopologyAddress("127.0.0.1", 11361, 0))
+ }
+
+ def testReplicatedPutWithTopologyAwareClient(m: Method) {
+ var resp = clients.head.put(k(m) , 0, 0, v(m), 1, 0)
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse, None)
+ assertSuccess(clients.tail.head.get(k(m), 0), v(m))
+ resp = clients.head.put(k(m) , 0, 0, v(m, "v1-"), 2, 0)
+ assertStatus(resp.status, Success)
+ assertTopologyReceived(resp.topologyResponse.get)
+ resp = clients.tail.head.put(k(m) , 0, 0, v(m, "v2-"), 2, 1)
+ assertStatus(resp.status, Success)
+ assertTopologyReceived(resp.topologyResponse.get)
+ resp = clients.head.put(k(m) , 0, 0, v(m, "v3-"), 2, 2)
+ assertStatus(resp.status, Success)
+ assertEquals(resp.topologyResponse, None)
+ assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v3-"))
+ }
+
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -34,8 +34,7 @@
assertEquals(s.get("currentNumberOfEntries").get, "1")
assertEquals(s.get("totalNumberOfEntries").get, "1")
assertEquals(s.get("stores").get, "1")
- val (getSt, actual) = client.assertGet(m)
- assertSuccess(getSt, v(m), actual)
+ assertSuccess(client.assertGet(m), v(m))
s = client.stats
assertEquals(s.get("hits").get, "1")
assertEquals(s.get("misses").get, "0")
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -52,87 +52,82 @@
def stop = ch.disconnect
- def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
- execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0)
+ def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Response =
+ execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
+ def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], clientIntelligence: Byte, topologyId: Int): Response =
+ execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, clientIntelligence, topologyId)
+
def assertPut(m: Method) {
- val status = put(k(m) , 0, 0, v(m))
+ val status = put(k(m) , 0, 0, v(m)).status
assertStatus(status, Success)
}
def assertPut(m: Method, kPrefix: String, vPrefix: String) {
- val status = put(k(m, kPrefix) , 0, 0, v(m, vPrefix))
+ val status = put(k(m, kPrefix) , 0, 0, v(m, vPrefix)).status
assertStatus(status, Success)
}
def assertPut(m: Method, lifespan: Int, maxIdle: Int) {
- val status = put(k(m) , lifespan, maxIdle, v(m))
+ val status = put(k(m) , lifespan, maxIdle, v(m)).status
assertStatus(status, Success)
}
- def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+ def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): Response =
execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
- def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
- execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0)
+ def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Response =
+ execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
- def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+ def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): Response =
execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
- def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
- execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0)
+ def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Response =
+ execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
- def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+ def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): Response =
execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
- def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
- execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version)
+ def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): Response =
+ execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version, 1 ,0)
- def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) =
+ def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): Response =
execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
- def remove(k: Array[Byte]): OperationStatus =
- execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0)
+ def remove(k: Array[Byte]): Response =
+ execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, 1 ,0)
- def remove(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+ def remove(k: Array[Byte], flags: Int): Response =
execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, flags)
- def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
- execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version)
+ def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): Response =
+ execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version, 1 ,0)
- def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) =
+ def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): Response =
execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
- v: Array[Byte], version: Long): OperationStatus = {
- val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version)
- execute(op, op.id)._1
+ v: Array[Byte], version: Long, clientIntelligence: Byte, topologyId: Int): Response = {
+ val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version, clientIntelligence, topologyId)
+ execute(op, op.id)
}
def executeWithBadMagic(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
- v: Array[Byte], version: Long): OperationStatus = {
- val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version)
- execute(op, 0)._1
+ v: Array[Byte], version: Long): ErrorResponse = {
+ val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version, 1, 0)
+ execute(op, 0).asInstanceOf[ErrorResponse]
}
def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
- v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) = {
- val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version)
+ v: Array[Byte], version: Long, flags: Int): Response = {
+ val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version, 1, 0)
execute(op, op.id)
}
- private def execute(op: Op, expectedResponseMessageId: Long): (OperationStatus, Array[Byte]) = {
+ private def execute(op: Op, expectedResponseMessageId: Long): Response = {
writeOp(op)
var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
- if (op.flags == 1) {
- val respWithPrevious = handler.getResponse(expectedResponseMessageId).asInstanceOf[ResponseWithPrevious]
- if (respWithPrevious.previous == None)
- (respWithPrevious.status, Array())
- else
- (respWithPrevious.status, respWithPrevious.previous.get)
- } else {
- (handler.getResponse(expectedResponseMessageId).status, null)
- }
+ handler.getResponse(expectedResponseMessageId)
}
private def writeOp(op: Op) {
@@ -142,45 +137,37 @@
assertTrue(future.isSuccess)
}
- def get(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) = {
- val (getSt, actual, version) = get(0x03, k, 0)
- (getSt, actual)
+ def get(k: Array[Byte], flags: Int): GetResponse = {
+ get(0x03, k, 0).asInstanceOf[GetResponse]
}
- def assertGet(m: Method): (OperationStatus, Array[Byte]) = assertGet(m, 0)
+ def assertGet(m: Method): GetResponse = assertGet(m, 0)
- def assertGet(m: Method, flags: Int): (OperationStatus, Array[Byte]) = get(k(m), flags)
+ def assertGet(m: Method, flags: Int): GetResponse = get(k(m), flags)
- def containsKey(k: Array[Byte], flags: Int): OperationStatus = {
- val (containsKeySt, actual, version) = get(0x0F, k, 0)
- containsKeySt
+ def containsKey(k: Array[Byte], flags: Int): Response = {
+ get(0x0F, k, 0)
}
- def getWithVersion(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) =
- get(0x11, k, 0)
+ def getWithVersion(k: Array[Byte], flags: Int): GetWithVersionResponse =
+ get(0x11, k, 0).asInstanceOf[GetWithVersionResponse]
- private def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
- val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0)
+ private def get(code: Byte, k: Array[Byte], flags: Int): Response = {
+ val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0, 1, 0)
val writeFuture = writeOp(op)
// Get the handler instance to retrieve the answer.
var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
- if (code == 0x03) {
- val resp = handler.getResponse(op.id).asInstanceOf[GetResponse]
- (resp.status, if (resp.data == None) null else resp.data.get, 0)
- } else if (code == 0x11) {
- val resp = handler.getResponse(op.id).asInstanceOf[GetWithVersionResponse]
- (resp.status, if (resp.data == None) null else resp.data.get, resp.version)
- } else if (code == 0x0F) {
- (handler.getResponse(op.id).status, null, 0)
+ if (code == 0x03 || code == 0x11 || code == 0x0F) {
+ handler.getResponse(op.id)
} else {
- (OperationNotExecuted, null, 0)
+ null
}
}
- def clear: OperationStatus = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0)
+ def clear: Response = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0, 1 ,0)
def stats: Map[String, String] = {
- val op = new StatsOp(0xA0, 0x15, defaultCacheName, null)
+ val op = new StatsOp(0xA0, 0x15, defaultCacheName, 1, 0, null)
val writeFuture = writeOp(op)
// Get the handler instance to retrieve the answer.
var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
@@ -188,8 +175,11 @@
resp.stats
}
- def ping: OperationStatus = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0)
+ def ping: Response = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, 1 ,0)
+ def ping(clientIntelligence: Byte, topologyId: Int): Response =
+ execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, clientIntelligence, topologyId)
+
}
private class ClientPipelineFactory(client: HotRodClient) extends ChannelPipelineFactory {
@@ -217,8 +207,8 @@
buffer.writeByte(op.code) // opcode
buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
buffer.writeUnsignedInt(op.flags) // flags
- buffer.writeByte(0) // client intelligence
- buffer.writeUnsignedInt(0) // topology id
+ buffer.writeByte(op.clientIntelligence) // client intelligence
+ buffer.writeUnsignedInt(op.topologyId) // topology id
if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op...
buffer.writeRangedBytes(op.key) // key length + key
if (op.value != null) {
@@ -255,6 +245,27 @@
val opCode = OperationResponse.apply(buf.readUnsignedByte)
val status = OperationStatus.apply(buf.readUnsignedByte)
val topologyChangeMarker = buf.readUnsignedByte
+ val op = client.idToOp.get(id)
+ val topologyChangeResponse =
+ if (topologyChangeMarker == 1) {
+ val topologyId = buf.readUnsignedInt
+ if (op.clientIntelligence == 2) {
+ val numberClusterMembers = buf.readUnsignedInt
+ val viewArray = new Array[TopologyAddress](numberClusterMembers)
+ for (i <- 0 until numberClusterMembers) {
+ val host = buf.readString
+ val port = buf.readUnsignedShort
+ viewArray(i) = TopologyAddress(host, port, 0)
+ }
+ Some(TopologyAwareResponse(TopologyView(topologyId, viewArray.toList)))
+ } else if (op.clientIntelligence == 3) {
+ None // TODO: Parse hash distribution aware
+ } else {
+ None // Is it possible?
+ }
+ } else {
+ None
+ }
val resp: Response = opCode match {
case StatsResponse => {
val size = buf.readUnsignedInt
@@ -262,41 +273,40 @@
for (i <- 1 to size) {
stats += (buf.readString -> buf.readString)
}
- new StatsResponse(id, immutable.Map[String, String]() ++ stats)
+ new StatsResponse(id, immutable.Map[String, String]() ++ stats, topologyChangeResponse)
}
case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
| RemoveResponse | RemoveIfUnmodifiedResponse => {
- val op = client.idToOp.get(id)
if (op.flags == 1) {
val length = buf.readUnsignedInt
if (length == 0) {
- new ResponseWithPrevious(id, opCode, status, None)
+ new ResponseWithPrevious(id, opCode, status, topologyChangeResponse, None)
} else {
val previous = new Array[Byte](length)
buf.readBytes(previous)
- new ResponseWithPrevious(id, opCode, status, Some(previous))
+ new ResponseWithPrevious(id, opCode, status, topologyChangeResponse, Some(previous))
}
- } else new Response(id, opCode, status)
+ } else new Response(id, opCode, status, topologyChangeResponse)
}
- case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status)
+ case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status, topologyChangeResponse)
case GetWithVersionResponse => {
if (status == Success) {
val version = buf.readLong
val data = Some(buf.readRangedBytes)
- new GetWithVersionResponse(id, opCode, status, data, version)
+ new GetWithVersionResponse(id, opCode, status, topologyChangeResponse, data, version)
} else{
- new GetWithVersionResponse(id, opCode, status, None, 0)
+ new GetWithVersionResponse(id, opCode, status, topologyChangeResponse, None, 0)
}
}
case GetResponse => {
if (status == Success) {
val data = Some(buf.readRangedBytes)
- new GetResponse(id, opCode, status, data)
+ new GetResponse(id, opCode, status, topologyChangeResponse, data)
} else{
- new GetResponse(id, opCode, status, None)
+ new GetResponse(id, opCode, status, topologyChangeResponse, None)
}
}
- case ErrorResponse => new ErrorResponse(id, status, buf.readString)
+ case ErrorResponse => new ErrorResponse(id, status, topologyChangeResponse, buf.readString)
}
trace("Got response from server: {0}", resp)
resp
@@ -328,7 +338,7 @@
i += 1
}
}
- while (v == null && i < 100)
+ while (v == null && i < 10000)
v
}
@@ -342,11 +352,15 @@
val maxIdle: Int,
val value: Array[Byte],
val flags: Int,
- val version: Long) {
+ val version: Long,
+ val clientIntelligence: Byte,
+ val topologyId: Int) {
lazy val id = HotRodClient.idCounter.incrementAndGet
}
class StatsOp(override val magic: Int,
- override val code: Byte,
- override val cacheName: String,
- val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0)
\ No newline at end of file
+ override val code: Byte,
+ override val cacheName: String,
+ override val clientIntelligence: Byte,
+ override val topologyId: Int,
+ val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntelligence, topologyId)
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -3,11 +3,12 @@
import java.util.concurrent.atomic.AtomicInteger
import org.infinispan.manager.CacheManager
import java.lang.reflect.Method
-import org.infinispan.server.hotrod.{HotRodServer}
import org.infinispan.server.core.Logging
import java.util.Arrays
import org.infinispan.server.hotrod.OperationStatus._
import org.testng.Assert._
+import org.infinispan.util.Util
+import org.infinispan.server.hotrod.{ResponseWithPrevious, GetWithVersionResponse, GetResponse, HotRodServer}
/**
* // TODO: Document this
@@ -24,14 +25,18 @@
startHotRodServer(manager, UniquePortThreadLocal.get.intValue)
def startHotRodServer(manager: CacheManager, port: Int): HotRodServer = {
- val server = new HotRodServer
+ val server = new HotRodServer {
+ override protected def defineTopologyCacheConfig(cacheManager: CacheManager) {
+ // No-op since topology cache configuration comes defined by the test
+ }
+ }
server.start(host, port, manager, 0, 0)
server
}
def k(m: Method, prefix: String): Array[Byte] = {
val bytes: Array[Byte] = (prefix + m.getName).getBytes
- trace("String {0} is converted to {1} bytes", prefix + m.getName, bytes)
+ trace("String {0} is converted to {1} bytes", prefix + m.getName, Util.printArray(bytes, true))
bytes
}
@@ -47,19 +52,32 @@
isSuccess
}
- def assertSuccess(status: OperationStatus, expected: Array[Byte], actual: Array[Byte]): Boolean = {
- assertStatus(status, Success)
- val isSuccess = Arrays.equals(expected, actual)
+ def assertSuccess(resp: GetResponse, expected: Array[Byte]): Boolean = {
+ assertStatus(resp.status, Success)
+ val isSuccess = Arrays.equals(expected, resp.data.get)
assertTrue(isSuccess)
isSuccess
}
- def assertKeyDoesNotExist(status: OperationStatus, actual: Array[Byte]): Boolean = {
+ def assertSuccess(resp: GetWithVersionResponse, expected: Array[Byte], expectedVersion: Int): Boolean = {
+ assertTrue(resp.version != expectedVersion)
+ assertSuccess(resp, expected)
+ }
+
+ def assertSuccess(resp: ResponseWithPrevious, expected: Array[Byte]): Boolean = {
+ assertStatus(resp.status, Success)
+ val isSuccess = Arrays.equals(expected, resp.previous.get)
+ assertTrue(isSuccess)
+ isSuccess
+ }
+
+ def assertKeyDoesNotExist(resp: GetResponse): Boolean = {
+ val status = resp.status
assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
- assertNull(actual)
+ assertEquals(resp.data, None)
status == KeyDoesNotExist
}
-
+
}
object UniquePortThreadLocal extends ThreadLocal[Int] {
Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala 2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala 2010-04-15 14:48:24 UTC (rev 1693)
@@ -3,7 +3,7 @@
import org.infinispan.server.core.CacheValue
import org.infinispan.util.Util
import java.io.{ObjectOutput, ObjectInput}
-import org.infinispan.marshall.{Marshallable, Externalizer}
+import org.infinispan.marshall.Marshallable
/**
* // TODO: Document this
@@ -11,7 +11,7 @@
* @since
*/
// TODO: putting Ids.MEMCACHED_CACHE_VALUE fails compilation in 2.8 - https://lampsvn.epfl.ch/trac/scala/ticket/2764
- at Marshallable(externalizer = classOf[MemcachedValueExternalizer], id = 56)
+ at Marshallable(externalizer = classOf[MemcachedValue.Externalizer], id = 56)
class MemcachedValue(override val data: Array[Byte], override val version: Long, val flags: Int)
extends CacheValue(data, version) {
@@ -25,20 +25,22 @@
}
-private class MemcachedValueExternalizer extends Externalizer {
- override def writeObject(output: ObjectOutput, obj: AnyRef) {
- val cacheValue = obj.asInstanceOf[MemcachedValue]
- output.write(cacheValue.data.length)
- output.write(cacheValue.data)
- output.writeLong(cacheValue.version)
- output.writeInt(cacheValue.flags)
- }
+object MemcachedValue {
+ class Externalizer extends org.infinispan.marshall.Externalizer {
+ override def writeObject(output: ObjectOutput, obj: AnyRef) {
+ val cacheValue = obj.asInstanceOf[MemcachedValue]
+ output.write(cacheValue.data.length)
+ output.write(cacheValue.data)
+ output.writeLong(cacheValue.version)
+ output.writeInt(cacheValue.flags)
+ }
- override def readObject(input: ObjectInput): AnyRef = {
- val data = new Array[Byte](input.read())
- input.read(data)
- val version = input.readLong
- val flags = input.readInt
- new MemcachedValue(data, version, flags)
+ override def readObject(input: ObjectInput): AnyRef = {
+ val data = new Array[Byte](input.read())
+ input.readFully(data)
+ val version = input.readLong
+ val flags = input.readInt
+ new MemcachedValue(data, version, flags)
+ }
}
}
\ No newline at end of file
More information about the infinispan-commits
mailing list