[infinispan-commits] Infinispan SVN: r1571 - in trunk/server: core/src/main/java/org/infinispan/server/core/transport/netty and 5 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Mar 5 06:00:48 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-03-05 06:00:46 -0500 (Fri, 05 Mar 2010)
New Revision: 1571
Added:
trunk/server/core/src/main/java/org/infinispan/server/core/transport/NoState.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/NoStateDecoder.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyDecoder.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyNoStateDecoder.java
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala
Removed:
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/NoState.java
Modified:
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Forgot to add some classes in the previous commit; adding them now. Also added put tests with lifespan and maxIdle parameters. Finally, created a state-independent decoder in the core module to simplify implementations in Hot Rod.
Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/NoState.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/NoState.java (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/NoState.java 2010-03-05 11:00:46 UTC (rev 1571)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ */
+public enum NoState {
+}
Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/NoStateDecoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/NoStateDecoder.java (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/NoStateDecoder.java 2010-03-05 11:00:46 UTC (rev 1571)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ */
+public abstract class NoStateDecoder implements Decoder<NoState> {
+
+ @Override
+ public Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer, NoState state) throws Exception {
+ return decode(ctx, buffer);
+ }
+
+ public abstract Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer) throws Exception;
+
+}
Copied: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyDecoder.java (from rev 1566, trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java)
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyDecoder.java (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyDecoder.java 2010-03-05 11:00:46 UTC (rev 1571)
@@ -0,0 +1,96 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport.netty;
+
+import org.infinispan.server.core.transport.ChannelBuffer;
+import org.infinispan.server.core.transport.ChannelHandlerContext;
+import org.infinispan.server.core.transport.Decoder;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class NettyDecoder<T extends Enum<T>> extends ReplayingDecoder<T> implements Decoder.Checkpointer<T> {
+ final Decoder<T> decoder;
+
+ public NettyDecoder(Decoder<T> decoder) {
+ super(true);
+ this.decoder = decoder;
+ }
+
+ public NettyDecoder(Decoder<T> decoder, T initialState) {
+ super(initialState, true);
+ this.decoder = decoder;
+ }
+
+ @Override
+ protected Object decode(org.jboss.netty.channel.ChannelHandlerContext nCtx, org.jboss.netty.channel.Channel channel,
+ org.jboss.netty.buffer.ChannelBuffer nBuffer, T state) throws Exception {
+ ChannelHandlerContext ctx = new NettyChannelHandlerContext(nCtx);
+ ChannelBuffer buffer = new NettyChannelBuffer(nBuffer);
+ return decoder.decode(ctx, buffer, state);
+ }
+
+ @Override
+ public void checkpoint(T state) {
+ super.checkpoint(state);
+ }
+
+ @Override
+ public void exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ decoder.exceptionCaught(new NettyChannelHandlerContext(ctx), new NettyExceptionEvent(e));
+ }
+
+ // ReplayingDecoder<T> decoder;
+//
+// public NettyDecoder(ReplayingDecoder<T> decoder) {
+// this.decoder = decoder;
+// }
+//
+// @Override
+// public Object decode(ChannelHandlerContext ctx, ChannelBuffer nettyBuffer, T state) throws Exception {
+// return decode(((NettyChannelHandlerContext) ctx).ctx, ((NettyChannelHandlerContext) ctx).ctx.getChannel(),
+// ((NettyChannelBuffer) nettyBuffer).nettyBuffer, state);
+// }
+//
+// @Override
+// protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel,
+// org.jboss.netty.nettyBuffer.ChannelBuffer nettyBuffer, T state) throws Exception {
+// return null;
+// }
+//
+// class AccessorReplayingDecoder<T> extends ReplayingDecoder<T> {
+// ReplayingDecoder<T> decoder;
+//
+// @Override
+// protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel, org.jboss.netty.nettyBuffer.ChannelBuffer nettyBuffer, T state) throws Exception {
+// return null; // TODO: Customise this generated block
+// }
+// }
+
+}
Added: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyNoStateDecoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyNoStateDecoder.java (rev 0)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyNoStateDecoder.java 2010-03-05 11:00:46 UTC (rev 1571)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport.netty;
+
+import org.infinispan.server.core.transport.NoState;
+import org.infinispan.server.core.transport.NoStateDecoder;
+import org.infinispan.server.core.transport.ChannelBuffer;
+import org.infinispan.server.core.transport.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ */
+public class NettyNoStateDecoder extends ReplayingDecoder<NoState> {
+ final NoStateDecoder decoder;
+
+ public NettyNoStateDecoder(NoStateDecoder decoder) {
+ super(true);
+ this.decoder = decoder;
+ }
+
+ @Override
+ protected Object decode(org.jboss.netty.channel.ChannelHandlerContext nCtx, org.jboss.netty.channel.Channel channel,
+ org.jboss.netty.buffer.ChannelBuffer nBuffer, NoState state) throws Exception {
+ ChannelHandlerContext ctx = new NettyChannelHandlerContext(nCtx);
+ ChannelBuffer buffer = new NettyChannelBuffer(nBuffer);
+ return decoder.decode(ctx, buffer);
+ }
+
+ @Override
+ public void exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ decoder.exceptionCaught(new NettyChannelHandlerContext(ctx), new NettyExceptionEvent(e));
+ }
+}
Deleted: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java 2010-03-05 11:00:46 UTC (rev 1571)
@@ -1,96 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
- * individual contributors as indicated by the @author tags. See the
- * copyright.txt file in the distribution for a full listing of
- * individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.infinispan.server.core.transport.netty;
-
-import org.infinispan.server.core.transport.ChannelBuffer;
-import org.infinispan.server.core.transport.ChannelHandlerContext;
-import org.infinispan.server.core.transport.Decoder;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
-
-/**
- * // TODO: Document this
- *
- * @author Galder Zamarreño
- * @since 4.0
- */
-public class NettyReplayingDecoder<T extends Enum<T>> extends ReplayingDecoder<T> implements Decoder.Checkpointer<T> {
- final Decoder<T> decoder;
-
- public NettyReplayingDecoder(Decoder<T> decoder) {
- super(true);
- this.decoder = decoder;
- }
-
- public NettyReplayingDecoder(Decoder<T> decoder, T initialState) {
- super(initialState, true);
- this.decoder = decoder;
- }
-
- @Override
- protected Object decode(org.jboss.netty.channel.ChannelHandlerContext nCtx, org.jboss.netty.channel.Channel channel,
- org.jboss.netty.buffer.ChannelBuffer nBuffer, T state) throws Exception {
- ChannelHandlerContext ctx = new NettyChannelHandlerContext(nCtx);
- ChannelBuffer buffer = new NettyChannelBuffer(nBuffer);
- return decoder.decode(ctx, buffer, state);
- }
-
- @Override
- public void checkpoint(T state) {
- super.checkpoint(state);
- }
-
- @Override
- public void exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- decoder.exceptionCaught(new NettyChannelHandlerContext(ctx), new NettyExceptionEvent(e));
- }
-
- // ReplayingDecoder<T> decoder;
-//
-// public NettyReplayingDecoder(ReplayingDecoder<T> decoder) {
-// this.decoder = decoder;
-// }
-//
-// @Override
-// public Object decode(ChannelHandlerContext ctx, ChannelBuffer nettyBuffer, T state) throws Exception {
-// return decode(((NettyChannelHandlerContext) ctx).ctx, ((NettyChannelHandlerContext) ctx).ctx.getChannel(),
-// ((NettyChannelBuffer) nettyBuffer).nettyBuffer, state);
-// }
-//
-// @Override
-// protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel,
-// org.jboss.netty.nettyBuffer.ChannelBuffer nettyBuffer, T state) throws Exception {
-// return null;
-// }
-//
-// class AccessorReplayingDecoder<T> extends ReplayingDecoder<T> {
-// ReplayingDecoder<T> decoder;
-//
-// @Override
-// protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel, org.jboss.netty.nettyBuffer.ChannelBuffer nettyBuffer, T state) throws Exception {
-// return null; // TODO: Customise this generated block
-// }
-// }
-
-}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -1,7 +1,8 @@
package org.infinispan.server.hotrod
-import org.infinispan.manager.CacheManager
import org.infinispan.{Cache => InfinispanCache}
+import org.infinispan.manager.{DefaultCacheManager, CacheManager}
+import java.util.concurrent.TimeUnit
/**
* // TODO: Document this
@@ -11,9 +12,17 @@
class CallerCache(val manager: CacheManager) extends Cache {
+ import CallerCache._
+
override def put(c: StorageCommand): Response = {
val cache = getCache(c.cacheName)
- cache.put(new Key(c.key), new Value(c.value))
+ val k = new Key(c.key)
+ val v = new Value(c.value)
+ (c.lifespan, c.maxIdle) match {
+ case (0, 0) => cache.put(k, v)
+ case (x, 0) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS)
+ case (x, y) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS, c.maxIdle, TimeUnit.SECONDS)
+ }
new Response(OpCodes.PutResponse, c.id, Status.Success)
}
@@ -27,7 +36,30 @@
}
private def getCache(cacheName: String): InfinispanCache[Key, Value] = {
- // TODO: Detect __default cache and call simply getCache()
- manager.getCache(cacheName)
+ if (cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME)
+ manager.getCache[Key, Value]
+ else
+ manager.getCache(cacheName)
}
+
+ /**
+ * Transforms a lifespan passed in seconds into milliseconds
+ * following this rule:
+ *
+ * If lifespan is bigger than the number of seconds in 30 days,
+ * then it is considered Unix time. After converting it to
+ * milliseconds, we subtract the current time and the
+ * result is returned.
+ *
+ * Otherwise it's just considered a number of seconds from
+ * now and it's returned in milliseconds.
+ */
+ private def toMillis(lifespan: Int) = {
+ if (lifespan > SecondsInAMonth) TimeUnit.SECONDS.toMillis(lifespan) - System.currentTimeMillis
+ else TimeUnit.SECONDS.toMillis(lifespan)
+ }
+}
+
+object CallerCache {
+ private val SecondsInAMonth = 60 * 60 * 24 * 30
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -1,18 +1,18 @@
package org.infinispan.server.hotrod
-import org.infinispan.server.core.transport.{ExceptionEvent, ChannelHandlerContext, ChannelBuffer, Decoder}
import org.infinispan.server.core.UnknownCommandException
import org.infinispan.server.hotrod.OpCodes._
+import org.infinispan.server.core.transport._
/**
* // TODO: Document this
* @author Galder Zamarreño
* @since 4.1
*/
-class Decoder410 extends Decoder[NoState] {
+class Decoder410 extends NoStateDecoder {
import Decoder410._
- override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): Command = {
+ override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): Command = {
val op = OpCodes.apply(buffer.readUnsignedByte)
val cacheName = buffer.readString
val id = buffer.readUnsignedLong
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -1,20 +1,20 @@
package org.infinispan.server.hotrod
import java.io.StreamCorruptedException
-import org.infinispan.server.core.transport.{ExceptionEvent, Decoder, ChannelBuffer, ChannelHandlerContext}
+import org.infinispan.server.core.transport._
/**
* // TODO: Document this
* @author Galder Zamarreño
* @since 4.1
*/
-class GlobalDecoder extends Decoder[NoState] {
+class GlobalDecoder extends NoStateDecoder {
import GlobalDecoder._
private val Magic = 0xA0
private val Version410 = 41
- override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): Object = {
+ override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): Object = {
val magic = buffer.readUnsignedByte()
if (magic != Magic) {
throw new StreamCorruptedException("Magic byte incorrect: " + magic)
@@ -26,7 +26,7 @@
case Version410 => new Decoder410
case _ => throw new StreamCorruptedException("Unknown version:" + version)
}
- val command = decoder.decode(ctx, buffer, state)
+ val command = decoder.decode(ctx, buffer)
trace("Decoded msg {0}", command)
command
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -3,7 +3,7 @@
import org.infinispan.manager.CacheManager
import java.net.InetSocketAddress
import org.infinispan.server.core.Server
-import org.infinispan.server.core.transport.netty.{NettyEncoder, NettyServer, NettyReplayingDecoder}
+import org.infinispan.server.core.transport.netty.{NettyNoStateDecoder, NettyEncoder, NettyServer, NettyDecoder}
/**
* // TODO: Document this
@@ -23,7 +23,7 @@
def start {
val decoder = new GlobalDecoder
- val nettyDecoder = new NettyReplayingDecoder[NoState](decoder)
+ val nettyDecoder = new NettyNoStateDecoder(decoder)
val encoder = new Encoder410
val nettyEncoder = new NettyEncoder(encoder)
val commandHandler = new Handler(new CallerCache(manager))
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -0,0 +1,28 @@
+package org.infinispan.server.hotrod
+
+import java.util.Arrays
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+final class Key(val k: Array[Byte]) {
+
+ override def equals(obj: Any) = {
+ obj match {
+ case k: Key => Arrays.equals(k.k, this.k)
+ case _ => false
+ }
+ }
+
+ override def hashCode: Int = 41 + Arrays.hashCode(k)
+
+ override def toString = {
+ new StringBuilder().append("Key").append("{")
+ .append("k=").append(k)
+ .append("}").toString
+ }
+
+}
\ No newline at end of file
Deleted: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/NoState.java
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/NoState.java 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/NoState.java 2010-03-05 11:00:46 UTC (rev 1571)
@@ -1,33 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
- * individual contributors as indicated by the @author tags. See the
- * copyright.txt file in the distribution for a full listing of
- * individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.infinispan.server.hotrod;
-
-/**
- * // TODO: Document this
- *
- * @author Galder Zamarreño
- * @since 4.0
- */
-public enum NoState {
-}
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -0,0 +1,23 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class RetrievalResponse(override val opCode: OpCodes.OpCode,
+ override val id: Long,
+ override val status: Status.Status,
+ val value: Array[Byte]) extends Response(opCode, id, status) {
+
+ override def toString = {
+ new StringBuilder().append("RetrievalResponse").append("{")
+ .append("opCode=").append(opCode)
+ .append(", id=").append(id)
+ .append(", status=").append(status)
+ .append(", value=").append(value)
+ .append("}").toString
+ }
+
+}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -7,15 +7,6 @@
* @author Galder Zamarreño
* @since 4.1
*/
-//class StorageCommand(val cacheName: String,
-// val id: Long,
-// val key: Array[Byte],
-// val lifespan: Int,
-// val maxIdle: Int,
-// val value: Array[Byte],
-// val flags: Set[Flag])
-// (val op: (Cache, StorageCommand) => Response)
-
class StorageCommand(override val cacheName: String,
override val id: Long,
val key: Array[Byte],
@@ -29,4 +20,15 @@
op(cache, this)
}
+ override def toString = {
+ new StringBuilder().append("StorageCommand").append("{")
+ .append("cacheName=").append(cacheName)
+ .append(", id=").append(id)
+ .append(", key=").append(key)
+ .append(", lifespan=").append(lifespan)
+ .append(", maxIdle=").append(maxIdle)
+ .append(", value=").append(value)
+ .append(", flags=").append(flags)
+ .append("}").toString
+ }
}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -0,0 +1,17 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+final class Value(val v: Array[Byte]) {
+
+ override def toString = {
+ new StringBuilder().append("Value").append("{")
+ .append("v=").append(v)
+ .append("}").toString
+ }
+
+}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -2,7 +2,6 @@
import org.infinispan.test.SingleCacheManagerTest
import org.infinispan.test.fwk.TestCacheManagerFactory
-import org.infinispan.manager.CacheManager
import org.testng.annotations.{AfterClass, Test}
import java.lang.reflect.Method
import test.{Client, Utils}
@@ -10,6 +9,8 @@
import org.infinispan.server.hotrod.Status._
import java.util.Arrays
import org.jboss.netty.channel.Channel
+import org.infinispan.manager.{DefaultCacheManager, CacheManager}
+import org.infinispan.{Cache => InfinispanCache}
/**
* TODO: Document
@@ -38,22 +39,49 @@
}
def testPutBasic(m: Method) {
- val status = put(ch, "__default", k(m) , 0, 0, v(m))
+ val status = doPut(m)
assertSuccess(status)
}
+ def testPutOnDefaultCache(m: Method) {
+ val status = put(ch, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m) , 0, 0, v(m))
+ assertSuccess(status)
+ val cache: InfinispanCache[Key, Value] = cacheManager.getCache[Key, Value]
+ assertTrue(Arrays.equals(cache.get(new Key(k(m))).v, v(m)));
+ }
+
+ def testPutWithLifespan(m: Method) {
+ val status = doPutWithLifespanMaxIdle(m, 1, 0)
+ assertSuccess(status)
+ Thread.sleep(1100)
+ val (getSt, actual) = doGet(m)
+ assertKeyDoesNotExist(getSt, actual)
+ }
+
+ def testPutWithMaxIdle(m: Method) {
+ val status = doPutWithLifespanMaxIdle(m, 0, 1)
+ assertSuccess(status)
+ Thread.sleep(1100)
+ val (getSt, actual) = doGet(m)
+ assertKeyDoesNotExist(getSt, actual)
+ }
+
def testGetBasic(m: Method) {
- val putSt = put(ch, "__default", k(m) , 0, 0, v(m))
+ val putSt = doPut(m)
assertSuccess(putSt)
- val (getSt, actual) = get(ch, "__default", k(m))
+ val (getSt, actual) = doGet(m)
assertSuccess(getSt, v(m), actual)
}
def testGetDoesNotExist(m: Method) {
- val (getSt, actual) = get(ch, "__default", k(m))
+ val (getSt, actual) = doGet(m)
assertKeyDoesNotExist(getSt, actual)
}
+// def testGetWithWriteLock(m: Method) {
+// // TODO
+// }
+
private def assertSuccess(status: Status.Status) {
assertTrue(status == Success, "Status should have been 'Success' but instead was: " + status)
}
@@ -68,6 +96,18 @@
assertNull(actual)
}
+ private def doPut(m: Method): Status = {
+ doPutWithLifespanMaxIdle(m, 0, 0)
+ }
+
+ private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int): Status = {
+ put(ch, "hotrod-cache", k(m) , lifespan, maxIdle, v(m))
+ }
+
+ private def doGet(m: Method) = {
+ get(ch, "hotrod-cache", k(m))
+ }
+
@AfterClass(alwaysRun = true)
override def destroyAfterClass {
super.destroyAfterClass
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -13,6 +13,9 @@
import org.infinispan.server.hotrod._
import org.infinispan.server.hotrod.OpCodes._
import org.infinispan.server.hotrod.Status._
+import java.util.concurrent.atomic.AtomicInteger
+import org.infinispan.server.core.transport.NoState
+import org.jboss.netty.channel.ChannelHandler.Sharable
/**
* // TODO: Document this
@@ -62,7 +65,7 @@
}
-@ChannelPipelineCoverage("all")
+@Sharable
private object ClientPipelineFactory extends ChannelPipelineFactory {
override def getPipeline() = {
@@ -75,9 +78,11 @@
}
-@ChannelPipelineCoverage("all")
+@Sharable
private object Encoder extends OneToOneEncoder {
+ private val idCounter: AtomicInteger = new AtomicInteger
+
override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
val ret =
msg match {
@@ -87,7 +92,7 @@
buffer.writeByte(41) // version
buffer.writeByte(op.code) // opcode
buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
- buffer.writeUnsignedLong(1) // message id
+ buffer.writeUnsignedLong(idCounter.incrementAndGet) // message id
buffer.writeUnsignedInt(0) // flags
buffer.writeRangedBytes(op.key) // key length + key
if (op.value != null) {
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala 2010-03-05 11:00:46 UTC (rev 1571)
@@ -3,7 +3,7 @@
import java.util.concurrent.atomic.AtomicInteger
import org.infinispan.manager.CacheManager
import java.lang.reflect.Method
-import org.infinispan.server.hotrod.HotRodServer
+import org.infinispan.server.hotrod.{Logging, HotRodServer}
/**
* // TODO: Document this
@@ -12,6 +12,9 @@
*/
trait Utils {
+
+ import Utils._
+
def host = "127.0.0.1"
def createHotRodServer(manager: CacheManager): HotRodServer = {
@@ -19,23 +22,21 @@
}
def k(m: Method, prefix: String): Array[Byte] = {
- (prefix + m.getName) getBytes
+ val bytes: Array[Byte] = (prefix + m.getName).getBytes
+ trace("String {0} is converted to {1} bytes", prefix + m.getName, bytes)
+ bytes
}
- def v(m: Method, prefix: String): Array[Byte] = {
- k(m, prefix)
- }
+ def v(m: Method, prefix: String): Array[Byte] = k(m, prefix)
- def k(m: Method): Array[Byte] = {
- k(m, "k-")
- }
+ def k(m: Method): Array[Byte] = k(m, "k-")
- def v(m: Method): Array[Byte] = {
- v(m, "v-")
- }
+ def v(m: Method): Array[Byte] = v(m, "v-")
- }
+}
+object Utils extends Logging
+
object UniquePortThreadLocal extends ThreadLocal[Int] {
private val uniqueAddr = new AtomicInteger(21212)
override def initialValue: Int = uniqueAddr.getAndAdd(100)
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java 2010-03-05 11:00:46 UTC (rev 1571)
@@ -30,7 +30,7 @@
import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.server.core.Server;
-import org.infinispan.server.core.transport.netty.NettyReplayingDecoder;
+import org.infinispan.server.core.transport.netty.NettyDecoder;
import org.infinispan.server.core.transport.netty.NettyServer;
import org.infinispan.server.memcached.transport.TextDecoder;
import org.infinispan.server.core.InterceptorChain;
@@ -84,7 +84,7 @@
public void start() throws Exception {
InterceptorChain chain = TextProtocolInterceptorChainFactory.getInstance(cache).buildInterceptorChain();
TextDecoder decoder = new TextDecoder(cache, chain, scheduler);
- NettyReplayingDecoder nettyDecoder = new NettyReplayingDecoder<TextDecoder.State>(decoder,
+ NettyDecoder nettyDecoder = new NettyDecoder<TextDecoder.State>(decoder,
TextDecoder.State.READ_COMMAND);
decoder.setCheckpointer(nettyDecoder);
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2010-03-05 11:00:46 UTC (rev 1571)
@@ -69,6 +69,7 @@
@AfterClass(alwaysRun=true)
protected void destroyAfterClass() {
+ super.destroyAfterClass();
log.debug("Test finished, close memcached server");
server.stop();
}
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java 2010-03-05 08:26:28 UTC (rev 1570)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java 2010-03-05 11:00:46 UTC (rev 1571)
@@ -65,6 +65,7 @@
@AfterClass(alwaysRun=true)
protected void destroyAfterClass() {
+ super.destroyAfterClass();
server.stop();
}
More information about the infinispan-commits mailing list