[infinispan-commits] Infinispan SVN: r1561 - in trunk/server: core/src/main/java/org/infinispan/server/core/transport and 5 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Mar 3 06:39:19 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-03-03 06:39:18 -0500 (Wed, 03 Mar 2010)
New Revision: 1561
Added:
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala
Removed:
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala
Modified:
trunk/server/core/src/main/java/org/infinispan/server/core/MessageEvent.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffers.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelHandlerContext.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyMessageEvent.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyServer.java
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Response encoding added and got a put decode/encode working. This is a bit preliminary and will get tuned as more commands are written. Also, added encoder part to main server instance which comes in handy for Hot Rod.
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/MessageEvent.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/MessageEvent.java 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/MessageEvent.java 2010-03-03 11:39:18 UTC (rev 1561)
@@ -22,6 +22,8 @@
*/
package org.infinispan.server.core;
+import org.infinispan.server.core.transport.Channel;
+
import java.net.SocketAddress;
/**
@@ -33,4 +35,5 @@
public interface MessageEvent {
Object getMessage();
SocketAddress getRemoteAddress();
+ Channel getChannel();
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffers.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffers.java 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffers.java 2010-03-03 11:39:18 UTC (rev 1561)
@@ -31,4 +31,5 @@
public interface ChannelBuffers {
ChannelBuffer wrappedBuffer(ChannelBuffer... buffers);
ChannelBuffer wrappedBuffer(byte[] array);
+ ChannelBuffer dynamicBuffer();
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java 2010-03-03 11:39:18 UTC (rev 1561)
@@ -48,6 +48,11 @@
return new NettyChannelBuffer(org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer(nettyBuffers));
}
+ @Override
+ public ChannelBuffer dynamicBuffer() {
+ return new NettyChannelBuffer(org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer());
+ }
+
public static NettyChannelBuffers getInstance() {
return INSTANCE;
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelHandlerContext.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelHandlerContext.java 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelHandlerContext.java 2010-03-03 11:39:18 UTC (rev 1561)
@@ -47,6 +47,6 @@
}
public ChannelBuffers getChannelBuffers() {
- return NettyChannelBuffers.INSTANCE;
+ return NettyChannelBuffers.getInstance();
}
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.java 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.java 2010-03-03 11:39:18 UTC (rev 1561)
@@ -24,9 +24,11 @@
import static org.jboss.netty.channel.Channels.pipeline;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
/**
* NettyChannelPipelineFactory.
@@ -35,20 +37,30 @@
* @since 4.0
*/
public class NettyChannelPipelineFactory implements ChannelPipelineFactory {
- private final ChannelHandler decoder;
+ private final ChannelUpstreamHandler decoder;
+ private final ChannelDownstreamHandler encoder;
private final ChannelHandler handler;
- public NettyChannelPipelineFactory(ChannelHandler decoder, ChannelHandler handler) {
+ public NettyChannelPipelineFactory(ChannelUpstreamHandler decoder, ChannelDownstreamHandler encoder, ChannelHandler handler) {
this.decoder = decoder;
this.handler = handler;
+ this.encoder = encoder;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
- pipeline.addLast("decoder", decoder);
- pipeline.addLast("handler", handler);
+ if (decoder != null) {
+ pipeline.addLast("decoder", decoder);
+ }
+ if (encoder != null) {
+ pipeline.addLast("encoder", encoder);
+ }
+ if (handler != null) {
+ pipeline.addLast("handler", handler);
+ }
+
return pipeline;
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyMessageEvent.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyMessageEvent.java 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyMessageEvent.java 2010-03-03 11:39:18 UTC (rev 1561)
@@ -25,6 +25,7 @@
import java.net.SocketAddress;
import org.infinispan.server.core.MessageEvent;
+import org.infinispan.server.core.transport.Channel;
/**
* NettyMessageEvent.
@@ -49,4 +50,8 @@
return event.getRemoteAddress();
}
+ @Override
+ public Channel getChannel() {
+ return new NettyChannel(event.getChannel());
+ }
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyServer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyServer.java 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyServer.java 2010-03-03 11:39:18 UTC (rev 1561)
@@ -35,9 +35,10 @@
import org.infinispan.util.logging.LogFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -45,6 +46,8 @@
/**
* NettyChannelFactory.
+ *
+ * // TODO: Make this class more generic and remove any memcached references
*
* @author Galder Zamarreño
* @since 4.0
@@ -59,8 +62,8 @@
final ExecutorService masterExecutor;
final ExecutorService workerExecutor;
- public NettyServer(CommandHandler commandHandler, ChannelHandler decoder, SocketAddress address,
- int masterThreads, int workerThreads, String cacheName) {
+ public NettyServer(CommandHandler commandHandler, ChannelUpstreamHandler decoder, ChannelDownstreamHandler encoder,
+ SocketAddress address, int masterThreads, int workerThreads, String cacheName) {
ThreadFactory tf = new MemcachedThreadFactory(cacheName, ExecutorType.MASTER);
if (masterThreads == 0) {
log.debug("Configured unlimited threads for master thread pool");
@@ -80,7 +83,7 @@
}
NettyChannelUpstreamHandler handler = new NettyChannelUpstreamHandler(commandHandler, acceptedChannels);
- this.pipeline = new NettyChannelPipelineFactory(decoder, handler);
+ this.pipeline = new NettyChannelPipelineFactory(decoder, encoder, handler);
this.address = address;
if (workerThreads == 0) {
factory = new NioServerSocketChannelFactory(masterExecutor, workerExecutor);
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -7,6 +7,6 @@
*/
abstract class Cache {
- def put(c: StorageCommand): Reply.Value
- def get(c: RetrievalCommand): Reply.Value
+ def put(c: StorageCommand): Response
+ def get(c: RetrievalCommand): Response
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -11,13 +11,13 @@
class CallerCache(val manager: CacheManager) extends Cache {
- override def put(c: StorageCommand): Reply.Value = {
+ override def put(c: StorageCommand): Response = {
val cache: InfinispanCache[Array[Byte], Array[Byte]] = manager.getCache(c.cacheName)
cache.put(c.key, c.value)
- Reply.Stored
+ new Response(OpCodes.PutResponse, c.id, Status.Success)
}
- override def get(c: RetrievalCommand): Reply.Value = {
+ override def get(c: RetrievalCommand): Response = {
null
}
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -8,5 +8,5 @@
@Deprecated
trait Command {
-// def perform(op: Unit => Reply)
+// def perform(op: Unit => Replies)
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -14,8 +14,7 @@
val Put = 0x01
- @Override
- def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): StorageCommand = {
+ override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): StorageCommand = {
val op = buffer.readUnsignedByte
val cacheName = readString(buffer)
val id = VLong.read(buffer)
@@ -27,7 +26,7 @@
val lifespan = VInt.read(buffer)
val maxIdle = VInt.read(buffer)
val value = readByteArray(buffer)
- new StorageCommand(cacheName, key, lifespan, maxIdle, value, flags)({
+ new StorageCommand(cacheName, id, key, lifespan, maxIdle, value, flags)({
(cache: Cache, command: StorageCommand) => cache.put(command)
})
}
@@ -36,8 +35,7 @@
command
}
- @Override
- def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+ override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
error("Error", e.getCause)
}
Deleted: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,35 +0,0 @@
-package org.infinispan.server.hotrod
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since 4.0
- */
-
- at Deprecated
-object EncodedData extends Enumeration {
- type EncodedData = Value
- val Array = Value(1)
- val Byte = Value(1 << 1)
- val Boolean = Value(1 << 2)
- val Character = Value(1 << 3)
- val String = Value(1 << 4)
- val Date = Value(1 << 5)
- val Double = Value(1 << 6)
- // 1 << 7 skipped since that's the variable length marker
- val Float = Value(1 << 8)
- val Integer = Value(1 << 9)
- val Long = Value(1 << 10)
- val Map = Value(1 << 11)
- val Primitive = Value(1 << 12)
- val Serialized = Value(1 << 13)
- val Short = Value(1 << 14)
- // 1 << 15 skipped since that's the variable length marker
- val Compressed = Value(1 << 16)
- val StringBuilder = Value(1 << 17)
-
-}
-
- at Deprecated
-class EncodedData(dataType: EncodedData, length: Long, data: Array[Byte]) {
-}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,6 +1,7 @@
package org.infinispan.server.hotrod
import scala.collection.mutable.HashSet
+import scala.collection.immutable
import org.infinispan.context.Flag
/**
@@ -11,8 +12,6 @@
object Flags extends Enumeration {
- type Flags = Value
-
private val ZeroLockAcquisitionTimeout = Value(1, Flag.ZERO_LOCK_ACQUISITION_TIMEOUT.toString)
private val CacheModeLocal = Value(1 << 1, Flag.CACHE_MODE_LOCAL.toString)
private val SkipLocking = Value(1 << 2, Flag.SKIP_LOCKING.toString)
@@ -28,8 +27,8 @@
def extractFlags(bitFlags: Int): Set[Flag] = {
val s = new HashSet[Flag]
- Flags.filter(f => (bitFlags & f.id) > 0).foreach(f => s += Flag.valueOf(f.toString))
- new scala.collection.immutable.HashSet ++ s
+ Flags.values.filter(f => (bitFlags & f.id) > 0).foreach(f => s += Flag.valueOf(f.toString))
+ new immutable.HashSet ++ s
}
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -11,17 +11,21 @@
class GlobalDecoder extends Decoder[NoState] {
import GlobalDecoder._
- val Magic = 0xA0
- val Version410 = 41
+ private val Magic = 0xA0
+ private val Version410 = 41
override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): Object = {
// trace("Buffer contains: {0}", buffer)
// state match {
// case NoState.VOID => {
- val header = getHeader(buffer)
- if (header == null) null
- verifyMagic(header)
- val version = getVersion(header)
+// val header = buffer.readBytes(2)
+ val magic = buffer.readUnsignedByte()
+ if (magic != Magic) {
+ buffer.resetReaderIndex()
+ throw new StreamCorruptedException("Magic byte incorrect: " + magic)
+ }
+
+ val version = buffer.readUnsignedByte()
val decoder =
version match {
case Version410 => new Decoder410
@@ -36,23 +40,18 @@
error("Error", e.getCause)
}
- private def getHeader(buffer: ChannelBuffer): ChannelBuffer = {
- if (buffer.readableBytes() < 2) null
- buffer.readBytes(2)
- }
+// private def getHeader(buffer: ChannelBuffer): ChannelBuffer = {
+// if (buffer.readableBytes() < 2) null
+//
+// }
+//
+// private def verifyMagic(buffer: ChannelBuffer) {
+// }
+//
+// private def getVersion(buffer: ChannelBuffer) = {
+//
+// }
- private def verifyMagic(buffer: ChannelBuffer) {
- val magic = buffer.readUnsignedByte()
- if (magic != Magic) {
- buffer.resetReaderIndex()
- throw new StreamCorruptedException("Magic byte incorrect: " + magic)
- }
- }
-
- private def getVersion(buffer: ChannelBuffer) = {
- buffer.readUnsignedByte()
- }
-
}
object GlobalDecoder extends Logging
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -13,7 +13,7 @@
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
e.getMessage match {
- case c: StorageCommand => println(c.op(hotCache, c))
+ case c: StorageCommand => e.getChannel.write(c.op(hotCache, c))
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,9 +1,9 @@
package org.infinispan.server.hotrod
import org.infinispan.manager.CacheManager
-import org.infinispan.server.core.transport.netty.{NettyServer, NettyReplayingDecoder}
import java.net.InetSocketAddress
import org.infinispan.server.core.Server
+import org.infinispan.server.core.transport.netty.{NettyEncoder, NettyServer, NettyReplayingDecoder}
/**
* // TODO: Document this
@@ -19,13 +19,15 @@
import HotRodServer._
- private var server: Server = null
+ private var server: Server = _
def start {
- var decoder = new GlobalDecoder
+ val decoder = new GlobalDecoder
val nettyDecoder = new NettyReplayingDecoder[NoState](decoder)
+ val encoder = new Encoder410
+ val nettyEncoder = new NettyEncoder(encoder)
val commandHandler = new Handler(new CallerCache(manager))
- server = new NettyServer(commandHandler, nettyDecoder, new InetSocketAddress(host, port),
+ server = new NettyServer(commandHandler, nettyDecoder, nettyEncoder, new InetSocketAddress(host, port),
masterThreads, workerThreads, "HotRod")
server.start
info("Started Hot Rod bound to {0}:{1}", host, port)
Copied: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala (from rev 1557, trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala)
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Replies.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -0,0 +1,12 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+object Replies extends Enumeration {
+ type Reply = Value
+ val Stored = Value
+}
\ No newline at end of file
Deleted: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,12 +0,0 @@
-package org.infinispan.server.hotrod
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since 4.0
- */
-
-object Reply extends Enumeration {
- type Reply = Value
- val Stored = Value
-}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -18,12 +18,12 @@
trait StatsCache extends Cache {
- abstract override def put(c: StorageCommand): Reply.Value = {
+ abstract override def put(c: StorageCommand): Response = {
// TODO: calculate stats if necessary
super.put(c)
}
- abstract override def get(c: RetrievalCommand): Reply.Value = {
+ abstract override def get(c: RetrievalCommand): Response = {
// TODO: calculate stats if necessary
super.get(c)
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -9,12 +9,17 @@
*/
// val cache: Cache[Array[Byte], Array[Byte]]
-class StorageCommand(val cacheName: String, val key: Array[Byte], val lifespan: Int,
- val maxIdle: Int, val value: Array[Byte], val flags: Set[Flag])
- (val op: (Cache, StorageCommand) => Reply.Value)
+class StorageCommand(val cacheName: String,
+ val id: Long,
+ val key: Array[Byte],
+ val lifespan: Int,
+ val maxIdle: Int,
+ val value: Array[Byte],
+ val flags: Set[Flag])
+ (val op: (Cache, StorageCommand) => Response)
//{
//
-//// def perform(op: StorageCommand => Reply.Value) {
+//// def perform(op: StorageCommand => Replies.Value) {
//// op(this)
//// }
////
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -63,7 +63,7 @@
// }
private def flag(bitFlags: Int)(size: Int)(p: Set[Flag] => Boolean) {
- var flags = Flags.extractFlags(bitFlags)
+ val flags = Flags.extractFlags(bitFlags)
assertEquals(flags.size, size)
assertTrue(p(flags))
}
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -23,7 +23,7 @@
*/
@Test(groups = Array("functional"), testName = "server.hotrod.FunctionalTest")
class FunctionalTest extends SingleCacheManagerTest with Utils with Client {
- private var server: HotRodServer = null
+ private var server: HotRodServer = _
override def createCacheManager: CacheManager = {
val cacheManager = TestCacheManagerFactory.createLocalCacheManager
@@ -34,7 +34,8 @@
def testPutBasic(m: Method) {
assertTrue(connect("127.0.0.1", server.port))
- assertTrue(put("__default", k(m) , 0, 0, v(m)))
+ val status = put("__default", k(m) , 0, 0, v(m))
+ assertTrue(status == 0, "Status should have been 0 but instead was: " + status)
}
@AfterClass(alwaysRun = true)
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-03 11:39:18 UTC (rev 1561)
@@ -1,7 +1,6 @@
package org.infinispan.server.hotrod.test
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import java.util.concurrent.Executors
import org.jboss.netty.bootstrap.ClientBootstrap
import java.net.InetSocketAddress
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
@@ -9,7 +8,9 @@
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.infinispan.server.core.transport.netty.NettyChannelBuffer
import org.jboss.netty.handler.codec.replay.ReplayingDecoder
-import org.infinispan.server.hotrod.{Logging, NoState, VLong, VInt}
+import org.testng.Assert._
+import java.util.concurrent.{LinkedBlockingQueue, Executors}
+import org.infinispan.server.hotrod._
/**
* // TODO: Document this
@@ -20,7 +21,7 @@
* @since 4.1
*/
trait Client {
- private var channel: Channel = null
+ private var channel: Channel = _
def connect(host: String, port: Int): Boolean = {
// Set up.
@@ -33,17 +34,17 @@
val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
// Wait until the connection is made successfully.
channel = connectFuture.awaitUninterruptibly.getChannel
-// // Get the handler instance to retrieve the answer.
-// var handler = channel.getPipeline.getLast.asInstanceOf[ClientHandler]
connectFuture.isSuccess
}
- def put(cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Boolean = {
+ def put(cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Byte = {
val writeFuture = channel.write(new Store(cacheName, key, lifespan, maxIdle, value))
writeFuture.awaitUninterruptibly
- writeFuture.isSuccess
+ assertTrue(writeFuture.isSuccess)
+ // Get the handler instance to retrieve the answer.
+ var handler = channel.getPipeline.getLast.asInstanceOf[ClientHandler]
+ handler.getResponse.status.id.byteValue
}
-
}
@ChannelPipelineCoverage("all")
@@ -53,7 +54,7 @@
val pipeline = Channels.pipeline
pipeline.addLast("decoder", Decoder)
pipeline.addLast("encoder", Encoder)
-// pipeline.addLast("handler", new FactorialClientHandler(count))
+ pipeline.addLast("handler", new ClientHandler)
pipeline
}
@@ -90,8 +91,13 @@
private object Decoder extends ReplayingDecoder[NoState] with Logging {
- override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState) = {
- null
+ override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
+ val buf = new NettyChannelBuffer(buffer)
+ val magic = buf.readUnsignedByte
+ val opCode = buf.readUnsignedByte
+ val id = VLong.read(buf)
+ val status = buf.readUnsignedByte
+ new Response(OpCodes.apply(opCode), id, Status.apply(status))
}
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
@@ -99,22 +105,22 @@
}
}
-//private class ClientHandler(val command: Any) extends SimpleChannelUpstreamHandler {
-//
-// override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
-// sendCommand(e)
-// }
-//
-// override def channelInterestChanged(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
-// sendCommand(e)
-// }
-//
-// private def sendCommand(e: ChannelStateEvent) {
-// var channel = e.getChannel
-// channel.write(command)
-// }
-//}
+ at ChannelPipelineCoverage("one")
+private class ClientHandler extends SimpleChannelUpstreamHandler {
+ private val answer = new LinkedBlockingQueue[Response];
+
+ override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+ val offered = answer.offer(e.getMessage.asInstanceOf[Response])
+ assertTrue(offered)
+ }
+
+ def getResponse: Response = {
+ answer.take
+ }
+
+}
+
private class Store(val cacheName: String, val key: Array[Byte],
val lifespan: Int, val maxIdle: Int,
val value: Array[Byte])
\ No newline at end of file
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java 2010-03-03 07:36:05 UTC (rev 1560)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java 2010-03-03 11:39:18 UTC (rev 1561)
@@ -90,7 +90,8 @@
TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
- server = new NettyServer(commandHandler, nettyDecoder, new InetSocketAddress(host, port),
+ // No common encoder used, each command encodes its response, since there's no common ground for all.
+ server = new NettyServer(commandHandler, nettyDecoder, null, new InetSocketAddress(host, port),
masterThreads, workerThreads, cache.getName());
server.start();
log.info("Started Memcached text server bound to {0}:{1}", host, port);
More information about the infinispan-commits
mailing list