[infinispan-commits] Infinispan SVN: r1557 - in trunk/server: core/src/main/java/org/infinispan/server/core/transport/netty and 13 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Feb 26 13:20:48 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-02-26 13:20:47 -0500 (Fri, 26 Feb 2010)
New Revision: 1557
Added:
trunk/server/hotrod/src/main/scala/
trunk/server/hotrod/src/main/scala/org/
trunk/server/hotrod/src/main/scala/org/infinispan/
trunk/server/hotrod/src/main/scala/org/infinispan/server/
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Logging.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/NoState.java
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VInt.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VLong.scala
trunk/server/hotrod/src/test/scala/
trunk/server/hotrod/src/test/scala/org/
trunk/server/hotrod/src/test/scala/org/infinispan/
trunk/server/hotrod/src/test/scala/org/infinispan/server/
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/VariableLengthTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
Modified:
trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannel.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) A very preliminary HotRod version that simply decodes a put command fine. Committing to make sure this does not get lost. More to come...
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java 2010-02-26 18:11:48 UTC (rev 1556)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java 2010-02-26 18:20:47 UTC (rev 1557)
@@ -31,4 +31,19 @@
public interface ChannelBuffer {
byte readByte();
void readBytes(byte[] dst, int dstIndex, int length);
+ int readableBytes();
+ short readUnsignedByte();
+ ChannelBuffer readBytes(int length);
+ void resetReaderIndex();
+ int readerIndex();
+ void readBytes(byte[] dst);
+
+ void writeByte(byte value);
+ void writeBytes(byte[] src);
+ int writerIndex();
+
+ Object getUnderlyingChannelBuffer();
+
+ // TODO: Add read/write methods for reading and writing variable length numbers,
+ // TODO: that way abstracting VInt and VLong
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannel.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannel.java 2010-02-26 18:11:48 UTC (rev 1556)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannel.java 2010-02-26 18:20:47 UTC (rev 1557)
@@ -23,6 +23,7 @@
package org.infinispan.server.core.transport.netty;
import org.infinispan.server.core.transport.Channel;
+import org.infinispan.server.core.transport.ChannelBuffer;
import org.infinispan.server.core.transport.ChannelFuture;
/**
@@ -45,6 +46,12 @@
@Override
public ChannelFuture write(Object message) {
+ if (message instanceof ChannelBuffer) {
+ // If we get an Infinispan channel buffer abstraction, convert it to something netty can understand
+// message = new NettyChannelBufferAdapter((NettyChannelBuffer) message);
+ message = ((ChannelBuffer) message).getUnderlyingChannelBuffer();
+// message = ((NettyChannelBuffer) message).nettyBuffer;
+ }
return new NettyChannelFuture(ch.write(message), this);
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java 2010-02-26 18:11:48 UTC (rev 1556)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java 2010-02-26 18:20:47 UTC (rev 1557)
@@ -22,17 +22,7 @@
*/
package org.infinispan.server.core.transport.netty;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ScatteringByteChannel;
-
import org.infinispan.server.core.transport.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferFactory;
-import org.jboss.netty.buffer.ChannelBufferIndexFinder;
/**
* NettyChannelBuffer.
@@ -40,521 +30,531 @@
* @author Galder Zamarreño
* @since 4.0
*/
-public class NettyChannelBuffer implements ChannelBuffer, org.jboss.netty.buffer.ChannelBuffer {
- final org.jboss.netty.buffer.ChannelBuffer buffer;
+public class NettyChannelBuffer implements ChannelBuffer /*, org.jboss.netty.nettyBuffer.ChannelBuffer*/ {
+ private final org.jboss.netty.buffer.ChannelBuffer nettyBuffer;
- public NettyChannelBuffer(org.jboss.netty.buffer.ChannelBuffer buffer) {
- this.buffer = buffer;
+ public NettyChannelBuffer(org.jboss.netty.buffer.ChannelBuffer nettyBuffer) {
+ this.nettyBuffer = nettyBuffer;
}
+
+ @Override
+ public org.jboss.netty.buffer.ChannelBuffer getUnderlyingChannelBuffer() {
+ return nettyBuffer;
+ }
@Override
public byte readByte() {
- return buffer.readByte();
+ return nettyBuffer.readByte();
}
@Override
public void readBytes(byte[] dst, int dstIndex, int length) {
- buffer.readBytes(dst, dstIndex, length);
+ nettyBuffer.readBytes(dst, dstIndex, length);
}
- @Override
- public int compareTo(org.jboss.netty.buffer.ChannelBuffer o) {
- return buffer.compareTo(o);
- }
+// @Override
+// public int compareTo(org.jboss.netty.nettyBuffer.ChannelBuffer o) {
+// return nettyBuffer.compareTo(o);
+// }
+//
+// @Override
+// public int capacity() {
+// return nettyBuffer.capacity();
+// }
+//
+// @Override
+// public void clear() {
+// nettyBuffer.clear();
+// }
+//
+// @Override
+// public org.jboss.netty.nettyBuffer.ChannelBuffer copy() {
+// return nettyBuffer.copy();
+// }
+//
+// @Override
+// public org.jboss.netty.nettyBuffer.ChannelBuffer copy(int index, int length) {
+// return nettyBuffer.copy(index, length);
+// }
+//
+// @Override
+// public void discardReadBytes() {
+// nettyBuffer.discardReadBytes();
+// }
+//
+// @Override
+// public org.jboss.netty.nettyBuffer.ChannelBuffer duplicate() {
+// return nettyBuffer.duplicate();
+// }
+//
+// @Override
+// public ChannelBufferFactory factory() {
+// return nettyBuffer.factory();
+// }
+//
+// @Override
+// public byte getByte(int index) {
+// return nettyBuffer.getByte(index);
+// }
+//
+// @Override
+// public void getBytes(int index, org.jboss.netty.nettyBuffer.ChannelBuffer dst) {
+// nettyBuffer.getBytes(index, dst);
+// }
+//
+// @Override
+// public void getBytes(int index, byte[] dst) {
+// nettyBuffer.getBytes(index, dst);
+// }
+//
+// @Override
+// public void getBytes(int index, ByteBuffer dst) {
+// nettyBuffer.getBytes(index, dst);
+// }
+//
+// @Override
+// public void getBytes(int index, org.jboss.netty.nettyBuffer.ChannelBuffer dst, int length) {
+// nettyBuffer.getBytes(index, dst, length);
+// }
+//
+// @Override
+// public void getBytes(int index, OutputStream out, int length) throws IOException {
+// nettyBuffer.getBytes(index, out, length);
+// }
+//
+// @Override
+// public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+// return nettyBuffer.getBytes(index, out, length);
+// }
+//
+// @Override
+// public void getBytes(int index, org.jboss.netty.nettyBuffer.ChannelBuffer dst, int dstIndex, int length) {
+// nettyBuffer.getBytes(index, dst, dstIndex, length);
+// }
+//
+// @Override
+// public void getBytes(int index, byte[] dst, int dstIndex, int length) {
+// nettyBuffer.getBytes(index, dst, dstIndex, length);
+// }
+//
+// @Override
+// public int getInt(int index) {
+// return nettyBuffer.getInt(index);
+// }
+//
+// @Override
+// public long getLong(int index) {
+// return nettyBuffer.getLong(index);
+// }
+//
+// @Override
+// public int getMedium(int index) {
+// return nettyBuffer.getMedium(index);
+// }
+//
+// @Override
+// public short getShort(int index) {
+// return nettyBuffer.getShort(index);
+// }
+//
+// @Override
+// public short getUnsignedByte(int index) {
+// return nettyBuffer.getUnsignedByte(index);
+// }
+//
+// @Override
+// public long getUnsignedInt(int index) {
+// return nettyBuffer.getUnsignedInt(index);
+// }
+//
+// @Override
+// public int getUnsignedMedium(int index) {
+// return nettyBuffer.getUnsignedMedium(index);
+// }
+//
+// @Override
+// public int getUnsignedShort(int index) {
+// return nettyBuffer.getUnsignedShort(index);
+// }
+//
+// @Override
+// public int indexOf(int fromIndex, int toIndex, byte value) {
+// return nettyBuffer.indexOf(fromIndex, toIndex, value);
+// }
+//
+// @Override
+// public int indexOf(int fromIndex, int toIndex, ChannelBufferIndexFinder indexFinder) {
+// return nettyBuffer.indexOf(fromIndex, toIndex, indexFinder);
+// }
+//
+// @Override
+// public void markReaderIndex() {
+// nettyBuffer.markReaderIndex();
+// }
+//
+// @Override
+// public void markWriterIndex() {
+// nettyBuffer.markWriterIndex();
+// }
+//
+// @Override
+// public ByteOrder order() {
+// return nettyBuffer.order();
+// }
+//
+ @Override
+ public ChannelBuffer readBytes(int length) {
+ return new NettyChannelBuffer(nettyBuffer.readBytes(length));
+ }
+//
+// @Override
+// public org.jboss.netty.nettyBuffer.ChannelBuffer readBytes(ChannelBufferIndexFinder indexFinder) {
+// return nettyBuffer.readBytes(indexFinder);
+// }
+//
+// @Override
+// public void readBytes(org.jboss.netty.nettyBuffer.ChannelBuffer dst) {
+// nettyBuffer.readBytes(dst);
+// }
+//
@Override
- public int capacity() {
- return buffer.capacity();
- }
-
- @Override
- public void clear() {
- buffer.clear();
- }
-
- @Override
- public org.jboss.netty.buffer.ChannelBuffer copy() {
- return buffer.copy();
- }
-
- @Override
- public org.jboss.netty.buffer.ChannelBuffer copy(int index, int length) {
- return buffer.copy(index, length);
- }
-
- @Override
- public void discardReadBytes() {
- buffer.discardReadBytes();
- }
-
- @Override
- public org.jboss.netty.buffer.ChannelBuffer duplicate() {
- return buffer.duplicate();
- }
-
- @Override
- public ChannelBufferFactory factory() {
- return buffer.factory();
- }
-
- @Override
- public byte getByte(int index) {
- return buffer.getByte(index);
- }
-
- @Override
- public void getBytes(int index, org.jboss.netty.buffer.ChannelBuffer dst) {
- buffer.getBytes(index, dst);
- }
-
- @Override
- public void getBytes(int index, byte[] dst) {
- buffer.getBytes(index, dst);
- }
-
- @Override
- public void getBytes(int index, ByteBuffer dst) {
- buffer.getBytes(index, dst);
- }
-
- @Override
- public void getBytes(int index, org.jboss.netty.buffer.ChannelBuffer dst, int length) {
- buffer.getBytes(index, dst, length);
- }
-
- @Override
- public void getBytes(int index, OutputStream out, int length) throws IOException {
- buffer.getBytes(index, out, length);
- }
-
- @Override
- public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
- return buffer.getBytes(index, out, length);
- }
-
- @Override
- public void getBytes(int index, org.jboss.netty.buffer.ChannelBuffer dst, int dstIndex, int length) {
- buffer.getBytes(index, dst, dstIndex, length);
- }
-
- @Override
- public void getBytes(int index, byte[] dst, int dstIndex, int length) {
- buffer.getBytes(index, dst, dstIndex, length);
- }
-
- @Override
- public int getInt(int index) {
- return buffer.getInt(index);
- }
-
- @Override
- public long getLong(int index) {
- return buffer.getLong(index);
- }
-
- @Override
- public int getMedium(int index) {
- return buffer.getMedium(index);
- }
-
- @Override
- public short getShort(int index) {
- return buffer.getShort(index);
- }
-
- @Override
- public short getUnsignedByte(int index) {
- return buffer.getUnsignedByte(index);
- }
-
- @Override
- public long getUnsignedInt(int index) {
- return buffer.getUnsignedInt(index);
- }
-
- @Override
- public int getUnsignedMedium(int index) {
- return buffer.getUnsignedMedium(index);
- }
-
- @Override
- public int getUnsignedShort(int index) {
- return buffer.getUnsignedShort(index);
- }
-
- @Override
- public int indexOf(int fromIndex, int toIndex, byte value) {
- return buffer.indexOf(fromIndex, toIndex, value);
- }
-
- @Override
- public int indexOf(int fromIndex, int toIndex, ChannelBufferIndexFinder indexFinder) {
- return buffer.indexOf(fromIndex, toIndex, indexFinder);
- }
-
- @Override
- public void markReaderIndex() {
- buffer.markReaderIndex();
- }
-
- @Override
- public void markWriterIndex() {
- buffer.markWriterIndex();
- }
-
- @Override
- public ByteOrder order() {
- return buffer.order();
- }
-
- @Override
- public org.jboss.netty.buffer.ChannelBuffer readBytes(int length) {
- return buffer.readBytes(length);
- }
-
- @Override
- public org.jboss.netty.buffer.ChannelBuffer readBytes(ChannelBufferIndexFinder indexFinder) {
- return buffer.readBytes(indexFinder);
- }
-
- @Override
- public void readBytes(org.jboss.netty.buffer.ChannelBuffer dst) {
- buffer.readBytes(dst);
- }
-
- @Override
public void readBytes(byte[] dst) {
- buffer.readBytes(dst);
+ nettyBuffer.readBytes(dst);
}
+
+//
+// @Override
+// public void readBytes(ByteBuffer dst) {
+// nettyBuffer.readBytes(dst);
+// }
+//
+// @Override
+// public void readBytes(org.jboss.netty.nettyBuffer.ChannelBuffer dst, int length) {
+// nettyBuffer.readBytes(dst, length);
+// }
+//
+// @Override
+// public void readBytes(OutputStream out, int length) throws IOException {
+// nettyBuffer.readBytes(out, length);
+// }
+//
+// @Override
+// public int readBytes(GatheringByteChannel out, int length) throws IOException {
+// return nettyBuffer.readBytes(out, length);
+// }
+//
+// @Override
+// public void readBytes(org.jboss.netty.nettyBuffer.ChannelBuffer dst, int dstIndex, int length) {
+// nettyBuffer.readBytes(dst, dstIndex, length);
+// }
+//
+// @Override
+// public int readInt() {
+// return nettyBuffer.readInt();
+// }
+//
+// @Override
+// public long readLong() {
+// return nettyBuffer.readLong();
+// }
+//
+// @Override
+// public int readMedium() {
+// return nettyBuffer.readMedium();
+// }
+//
+// @Override
+// public short readShort() {
+// return nettyBuffer.readShort();
+// }
+//
+// @Override
+// public org.jboss.netty.nettyBuffer.ChannelBuffer readSlice(int length) {
+// return nettyBuffer.readSlice(length);
+// }
+//
+// @Override
+// public org.jboss.netty.nettyBuffer.ChannelBuffer readSlice(ChannelBufferIndexFinder indexFinder) {
+// return nettyBuffer.readSlice(indexFinder);
+// }
+//
+ @Override
+ public short readUnsignedByte() {
+ return nettyBuffer.readUnsignedByte();
+ }
- @Override
- public void readBytes(ByteBuffer dst) {
- buffer.readBytes(dst);
- }
+//
+// @Override
+// public long readUnsignedInt() {
+// return nettyBuffer.readUnsignedInt();
+// }
+//
+// @Override
+// public int readUnsignedMedium() {
+// return nettyBuffer.readUnsignedMedium();
+// }
+//
+// @Override
+// public int readUnsignedShort() {
+// return nettyBuffer.readUnsignedShort();
+// }
+//
+// @Override
+// public boolean readable() {
+// return nettyBuffer.readable();
+// }
+//
+ @Override
+ public int readableBytes() {
+ return nettyBuffer.readableBytes();
+ }
@Override
- public void readBytes(org.jboss.netty.buffer.ChannelBuffer dst, int length) {
- buffer.readBytes(dst, length);
- }
-
- @Override
- public void readBytes(OutputStream out, int length) throws IOException {
- buffer.readBytes(out, length);
- }
-
- @Override
- public int readBytes(GatheringByteChannel out, int length) throws IOException {
- return buffer.readBytes(out, length);
- }
-
- @Override
- public void readBytes(org.jboss.netty.buffer.ChannelBuffer dst, int dstIndex, int length) {
- buffer.readBytes(dst, dstIndex, length);
- }
-
- @Override
- public int readInt() {
- return buffer.readInt();
- }
-
- @Override
- public long readLong() {
- return buffer.readLong();
- }
-
- @Override
- public int readMedium() {
- return buffer.readMedium();
- }
-
- @Override
- public short readShort() {
- return buffer.readShort();
- }
-
- @Override
- public org.jboss.netty.buffer.ChannelBuffer readSlice(int length) {
- return buffer.readSlice(length);
- }
-
- @Override
- public org.jboss.netty.buffer.ChannelBuffer readSlice(ChannelBufferIndexFinder indexFinder) {
- return buffer.readSlice(indexFinder);
- }
-
- @Override
- public short readUnsignedByte() {
- return buffer.readUnsignedByte();
- }
-
- @Override
- public long readUnsignedInt() {
- return buffer.readUnsignedInt();
- }
-
- @Override
- public int readUnsignedMedium() {
- return buffer.readUnsignedMedium();
- }
-
- @Override
- public int readUnsignedShort() {
- return buffer.readUnsignedShort();
- }
-
- @Override
- public boolean readable() {
- return buffer.readable();
- }
-
- @Override
- public int readableBytes() {
- return buffer.readableBytes();
- }
-
- @Override
public int readerIndex() {
- return buffer.readerIndex();
+ return nettyBuffer.readerIndex();
}
+//
+// @Override
+// public void readerIndex(int readerIndex) {
+// nettyBuffer.readerIndex(readerIndex);
+// }
+//
+ @Override
+ public void resetReaderIndex() {
+ nettyBuffer.resetReaderIndex();
+ }
+//
+// @Override
+// public void resetWriterIndex() {
+// nettyBuffer.resetWriterIndex();
+// }
+//
+// @Override
+// public void setByte(int index, byte value) {
+// nettyBuffer.setByte(index, value);
+// }
+//
+// @Override
+// public void setBytes(int index, org.jboss.netty.nettyBuffer.ChannelBuffer src) {
+// nettyBuffer.setBytes(index, src);
+// }
+//
+// @Override
+// public void setBytes(int index, byte[] src) {
+// nettyBuffer.setBytes(index, src);
+// }
+//
+// @Override
+// public void setBytes(int index, ByteBuffer src) {
+// nettyBuffer.setBytes(index, src);
+// }
+//
+// @Override
+// public void setBytes(int index, org.jboss.netty.nettyBuffer.ChannelBuffer src, int length) {
+// nettyBuffer.setBytes(index, src, length);
+// }
+//
+// @Override
+// public int setBytes(int index, InputStream in, int length) throws IOException {
+// return nettyBuffer.setBytes(index, in, length);
+// }
+//
+// @Override
+// public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+// return nettyBuffer.setBytes(index, in, length);
+// }
+//
+// @Override
+// public void setBytes(int index, org.jboss.netty.nettyBuffer.ChannelBuffer src, int srcIndex, int length) {
+// nettyBuffer.setBytes(index, src, srcIndex, length);
+// }
+//
+// @Override
+// public void setBytes(int index, byte[] src, int srcIndex, int length) {
+// nettyBuffer.setBytes(index, src, srcIndex, length);
+// }
+//
+// @Override
+// public void setIndex(int readerIndex, int writerIndex) {
+// nettyBuffer.setIndex(readerIndex, writerIndex);
+// }
+//
+// @Override
+// public void setInt(int index, int value) {
+// nettyBuffer.setInt(index, value);
+// }
+//
+// @Override
+// public void setLong(int index, long value) {
+// nettyBuffer.setLong(index, value);
+// }
+//
+// @Override
+// public void setMedium(int index, int value) {
+// nettyBuffer.setMedium(index, value);
+// }
+//
+// @Override
+// public void setShort(int index, short value) {
+// nettyBuffer.setShort(index, value);
+// }
+//
+// @Override
+// public void setZero(int index, int length) {
+// nettyBuffer.setZero(index, length);
+// }
+//
+// @Override
+// public void skipBytes(int length) {
+// nettyBuffer.skipBytes(length);
+// }
+//
+// @Override
+// public int skipBytes(ChannelBufferIndexFinder indexFinder) {
+// return nettyBuffer.skipBytes(indexFinder);
+// }
+//
+// @Override
+// public org.jboss.netty.nettyBuffer.ChannelBuffer slice() {
+// return nettyBuffer.slice();
+// }
+//
+// @Override
+// public org.jboss.netty.nettyBuffer.ChannelBuffer slice(int index, int length) {
+// return nettyBuffer.slice(index, length);
+// }
+//
+// @Override
+// public ByteBuffer toByteBuffer() {
+// return nettyBuffer.toByteBuffer();
+// }
+//
+// @Override
+// public ByteBuffer toByteBuffer(int index, int length) {
+// return nettyBuffer.toByteBuffer(index, length);
+// }
+//
+// @Override
+// public ByteBuffer[] toByteBuffers() {
+// return nettyBuffer.toByteBuffers();
+// }
+//
+// @Override
+// public ByteBuffer[] toByteBuffers(int index, int length) {
+// return nettyBuffer.toByteBuffers(index, length);
+// }
+//
+// @Override
+// public String toString(String charsetName) {
+// return nettyBuffer.toString(charsetName);
+// }
+//
+// @Override
+// public String toString(String charsetName, ChannelBufferIndexFinder terminatorFinder) {
+// return nettyBuffer.toString(charsetName, terminatorFinder);
+// }
+//
+// @Override
+// public String toString(int index, int length, String charsetName) {
+// return nettyBuffer.toString(index, length, charsetName);
+// }
+//
+// @Override
+// public String toString(int index, int length, String charsetName, ChannelBufferIndexFinder terminatorFinder) {
+// return nettyBuffer.toString(charsetName, terminatorFinder);
+// }
+//
+// @Override
+// public boolean writable() {
+// return nettyBuffer.writable();
+// }
+//
+// @Override
+// public int writableBytes() {
+// return nettyBuffer.writableBytes();
+// }
+//
+ @Override
+ public void writeByte(byte value) {
+ nettyBuffer.writeByte(value);
+ }
- @Override
- public void readerIndex(int readerIndex) {
- buffer.readerIndex(readerIndex);
- }
+//
+// @Override
+// public void writeBytes(org.jboss.netty.nettyBuffer.ChannelBuffer src) {
+// nettyBuffer.writeBytes(src);
+// }
+//
+ @Override
+ public void writeBytes(byte[] src) {
+ nettyBuffer.writeBytes(src);
+ }
+//
+// @Override
+// public void writeBytes(ByteBuffer src) {
+// nettyBuffer.writeBytes(src);
+// }
+//
+// @Override
+// public void writeBytes(org.jboss.netty.nettyBuffer.ChannelBuffer src, int length) {
+// nettyBuffer.writeBytes(src, length);
+// }
+//
+// @Override
+// public int writeBytes(InputStream in, int length) throws IOException {
+// return nettyBuffer.writeBytes(in, length);
+// }
+//
+// @Override
+// public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
+// return nettyBuffer.writeBytes(in, length);
+// }
+//
+// @Override
+// public void writeBytes(org.jboss.netty.nettyBuffer.ChannelBuffer src, int srcIndex, int length) {
+// nettyBuffer.writeBytes(src, srcIndex, length);
+// }
+//
+// @Override
+// public void writeBytes(byte[] src, int srcIndex, int length) {
+// nettyBuffer.writeBytes(src, srcIndex, length);
+// }
+//
+// @Override
+// public void writeInt(int value) {
+// nettyBuffer.writeInt(value);
+// }
+//
+// @Override
+// public void writeLong(long value) {
+// nettyBuffer.writeLong(value);
+// }
+//
+// @Override
+// public void writeMedium(int value) {
+// nettyBuffer.writeMedium(value);
+// }
+//
+// @Override
+// public void writeShort(short value) {
+// nettyBuffer.writeShort(value);
+// }
+//
+// @Override
+// public void writeZero(int length) {
+// nettyBuffer.writeZero(length);
+// }
+//
@Override
- public void resetReaderIndex() {
- buffer.resetReaderIndex();
- }
-
- @Override
- public void resetWriterIndex() {
- buffer.resetWriterIndex();
- }
-
- @Override
- public void setByte(int index, byte value) {
- buffer.setByte(index, value);
- }
-
- @Override
- public void setBytes(int index, org.jboss.netty.buffer.ChannelBuffer src) {
- buffer.setBytes(index, src);
- }
-
- @Override
- public void setBytes(int index, byte[] src) {
- buffer.setBytes(index, src);
- }
-
- @Override
- public void setBytes(int index, ByteBuffer src) {
- buffer.setBytes(index, src);
- }
-
- @Override
- public void setBytes(int index, org.jboss.netty.buffer.ChannelBuffer src, int length) {
- buffer.setBytes(index, src, length);
- }
-
- @Override
- public int setBytes(int index, InputStream in, int length) throws IOException {
- return buffer.setBytes(index, in, length);
- }
-
- @Override
- public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
- return buffer.setBytes(index, in, length);
- }
-
- @Override
- public void setBytes(int index, org.jboss.netty.buffer.ChannelBuffer src, int srcIndex, int length) {
- buffer.setBytes(index, src, srcIndex, length);
- }
-
- @Override
- public void setBytes(int index, byte[] src, int srcIndex, int length) {
- buffer.setBytes(index, src, srcIndex, length);
- }
-
- @Override
- public void setIndex(int readerIndex, int writerIndex) {
- buffer.setIndex(readerIndex, writerIndex);
- }
-
- @Override
- public void setInt(int index, int value) {
- buffer.setInt(index, value);
- }
-
- @Override
- public void setLong(int index, long value) {
- buffer.setLong(index, value);
- }
-
- @Override
- public void setMedium(int index, int value) {
- buffer.setMedium(index, value);
- }
-
- @Override
- public void setShort(int index, short value) {
- buffer.setShort(index, value);
- }
-
- @Override
- public void setZero(int index, int length) {
- buffer.setZero(index, length);
- }
-
- @Override
- public void skipBytes(int length) {
- buffer.skipBytes(length);
- }
-
- @Override
- public int skipBytes(ChannelBufferIndexFinder indexFinder) {
- return buffer.skipBytes(indexFinder);
- }
-
- @Override
- public org.jboss.netty.buffer.ChannelBuffer slice() {
- return buffer.slice();
- }
-
- @Override
- public org.jboss.netty.buffer.ChannelBuffer slice(int index, int length) {
- return buffer.slice(index, length);
- }
-
- @Override
- public ByteBuffer toByteBuffer() {
- return buffer.toByteBuffer();
- }
-
- @Override
- public ByteBuffer toByteBuffer(int index, int length) {
- return buffer.toByteBuffer(index, length);
- }
-
- @Override
- public ByteBuffer[] toByteBuffers() {
- return buffer.toByteBuffers();
- }
-
- @Override
- public ByteBuffer[] toByteBuffers(int index, int length) {
- return buffer.toByteBuffers(index, length);
- }
-
- @Override
- public String toString(String charsetName) {
- return buffer.toString(charsetName);
- }
-
- @Override
- public String toString(String charsetName, ChannelBufferIndexFinder terminatorFinder) {
- return buffer.toString(charsetName, terminatorFinder);
- }
-
- @Override
- public String toString(int index, int length, String charsetName) {
- return buffer.toString(index, length, charsetName);
- }
-
- @Override
- public String toString(int index, int length, String charsetName, ChannelBufferIndexFinder terminatorFinder) {
- return buffer.toString(charsetName, terminatorFinder);
- }
-
- @Override
- public boolean writable() {
- return buffer.writable();
- }
-
- @Override
- public int writableBytes() {
- return buffer.writableBytes();
- }
-
- @Override
- public void writeByte(byte value) {
- buffer.writeByte(value);
- }
-
- @Override
- public void writeBytes(org.jboss.netty.buffer.ChannelBuffer src) {
- buffer.writeBytes(src);
- }
-
- @Override
- public void writeBytes(byte[] src) {
- buffer.writeBytes(src);
- }
-
- @Override
- public void writeBytes(ByteBuffer src) {
- buffer.writeBytes(src);
- }
-
- @Override
- public void writeBytes(org.jboss.netty.buffer.ChannelBuffer src, int length) {
- buffer.writeBytes(src, length);
- }
-
- @Override
- public int writeBytes(InputStream in, int length) throws IOException {
- return buffer.writeBytes(in, length);
- }
-
- @Override
- public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
- return buffer.writeBytes(in, length);
- }
-
- @Override
- public void writeBytes(org.jboss.netty.buffer.ChannelBuffer src, int srcIndex, int length) {
- buffer.writeBytes(src, srcIndex, length);
- }
-
- @Override
- public void writeBytes(byte[] src, int srcIndex, int length) {
- buffer.writeBytes(src, srcIndex, length);
- }
-
- @Override
- public void writeInt(int value) {
- buffer.writeInt(value);
- }
-
- @Override
- public void writeLong(long value) {
- buffer.writeLong(value);
- }
-
- @Override
- public void writeMedium(int value) {
- buffer.writeMedium(value);
- }
-
- @Override
- public void writeShort(short value) {
- buffer.writeShort(value);
- }
-
- @Override
- public void writeZero(int length) {
- buffer.writeZero(length);
- }
-
- @Override
public int writerIndex() {
- return buffer.writerIndex();
+ return nettyBuffer.writerIndex();
}
+//
+// @Override
+// public void writerIndex(int writerIndex) {
+// nettyBuffer.writerIndex(writerIndex);
+// }
- @Override
- public void writerIndex(int writerIndex) {
- buffer.writerIndex(writerIndex);
- }
-
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java 2010-02-26 18:11:48 UTC (rev 1556)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffers.java 2010-02-26 18:20:47 UTC (rev 1557)
@@ -43,7 +43,7 @@
public ChannelBuffer wrappedBuffer(ChannelBuffer... buffers) {
org.jboss.netty.buffer.ChannelBuffer[] nettyBuffers = new org.jboss.netty.buffer.ChannelBuffer[buffers.length];
for (int i =0; i < buffers.length; i++) {
- nettyBuffers[i] = ((NettyChannelBuffer) buffers[i]).buffer;
+ nettyBuffers[i] = ((NettyChannelBuffer) buffers[i]).getUnderlyingChannelBuffer();
}
return new NettyChannelBuffer(org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer(nettyBuffers));
}
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java 2010-02-26 18:11:48 UTC (rev 1556)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyReplayingDecoder.java 2010-02-26 18:20:47 UTC (rev 1557)
@@ -38,6 +38,11 @@
public class NettyReplayingDecoder<T extends Enum<T>> extends ReplayingDecoder<T> implements Decoder.Checkpointer<T> {
final Decoder<T> decoder;
+ public NettyReplayingDecoder(Decoder<T> decoder) {
+ super(true);
+ this.decoder = decoder;
+ }
+
public NettyReplayingDecoder(Decoder<T> decoder, T initialState) {
super(initialState, true);
this.decoder = decoder;
@@ -68,14 +73,14 @@
// }
//
// @Override
-// public Object decode(ChannelHandlerContext ctx, ChannelBuffer buffer, T state) throws Exception {
+// public Object decode(ChannelHandlerContext ctx, ChannelBuffer nettyBuffer, T state) throws Exception {
// return decode(((NettyChannelHandlerContext) ctx).ctx, ((NettyChannelHandlerContext) ctx).ctx.getChannel(),
-// ((NettyChannelBuffer) buffer).buffer, state);
+// ((NettyChannelBuffer) nettyBuffer).nettyBuffer, state);
// }
//
// @Override
// protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel,
-// org.jboss.netty.buffer.ChannelBuffer buffer, T state) throws Exception {
+// org.jboss.netty.nettyBuffer.ChannelBuffer nettyBuffer, T state) throws Exception {
// return null;
// }
//
@@ -83,7 +88,7 @@
// ReplayingDecoder<T> decoder;
//
// @Override
-// protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel, org.jboss.netty.buffer.ChannelBuffer buffer, T state) throws Exception {
+// protected Object decode(org.jboss.netty.channel.ChannelHandlerContext ctx, org.jboss.netty.channel.Channel channel, org.jboss.netty.nettyBuffer.ChannelBuffer nettyBuffer, T state) throws Exception {
// return null; // TODO: Customise this generated block
// }
// }
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,12 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+
+abstract class Cache {
+ def put(c: StorageCommand): Reply.Value
+ def get(c: RetrievalCommand): Reply.Value
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,23 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.manager.CacheManager
+import org.infinispan.{Cache => InfinispanCache}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+class CallerCache(val manager: CacheManager) extends Cache {
+
+ override def put(c: StorageCommand): Reply.Value = {
+ val cache: InfinispanCache[Array[Byte], Array[Byte]] = manager.getCache(c.cacheName)
+ cache.put(c.key, c.value)
+ Reply.Stored
+ }
+
+ override def get(c: RetrievalCommand): Reply.Value = {
+ null
+ }
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,12 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+ at Deprecated
+trait Command {
+// def perform(op: Unit => Reply)
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,57 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.server.core.transport.{ExceptionEvent, ChannelHandlerContext, ChannelBuffer, Decoder}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+class Decoder410 extends Decoder[NoState] {
+ import Decoder410._
+
+ val Put = 0x01
+
+ @Override
+ def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): StorageCommand = {
+ val op = buffer.readUnsignedByte
+ val cacheName = readString(buffer)
+ val id = VLong.read(buffer)
+ val flags = Flags.extractFlags(VInt.read(buffer))
+ val command: StorageCommand =
+ op match {
+ case Put => {
+ val key = readByteArray(buffer)
+ val lifespan = VInt.read(buffer)
+ val maxIdle = VInt.read(buffer)
+ val value = readByteArray(buffer)
+ new StorageCommand(cacheName, key, lifespan, maxIdle, value, flags)({
+ (cache: Cache, command: StorageCommand) => cache.put(command)
+ })
+ }
+ case _ => throw new RuntimeException("Unknown command")// TODO: Changed to unknown command exception
+ }
+ command
+ }
+
+ @Override
+ def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+ error("Error", e.getCause)
+ }
+
+ private def readString(buffer: ChannelBuffer): String = {
+ val array = new Array[Byte](VInt.read(buffer))
+ buffer.readBytes(array)
+ new String(array, "UTF8")
+ }
+
+ private def readByteArray(buffer: ChannelBuffer): Array[Byte] = {
+ val array = new Array[Byte](VInt.read(buffer))
+ buffer.readBytes(array)
+ array
+ }
+
+}
+
+object Decoder410 extends Logging
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/EncodedData.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,35 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+ at Deprecated
+object EncodedData extends Enumeration {
+ type EncodedData = Value
+ val Array = Value(1)
+ val Byte = Value(1 << 1)
+ val Boolean = Value(1 << 2)
+ val Character = Value(1 << 3)
+ val String = Value(1 << 4)
+ val Date = Value(1 << 5)
+ val Double = Value(1 << 6)
+ // 1 << 7 skipped since that's the variable length marker
+ val Float = Value(1 << 8)
+ val Integer = Value(1 << 9)
+ val Long = Value(1 << 10)
+ val Map = Value(1 << 11)
+ val Primitive = Value(1 << 12)
+ val Serialized = Value(1 << 13)
+ val Short = Value(1 << 14)
+ // 1 << 15 skipped since that's the variable length marker
+ val Compressed = Value(1 << 16)
+ val StringBuilder = Value(1 << 17)
+
+}
+
+ at Deprecated
+class EncodedData(dataType: EncodedData, length: Long, data: Array[Byte]) {
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,35 @@
+package org.infinispan.server.hotrod
+
+import scala.collection.mutable.HashSet
+import org.infinispan.context.Flag
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+object Flags extends Enumeration {
+
+ type Flags = Value
+
+ private val ZeroLockAcquisitionTimeout = Value(1, Flag.ZERO_LOCK_ACQUISITION_TIMEOUT.toString)
+ private val CacheModeLocal = Value(1 << 1, Flag.CACHE_MODE_LOCAL.toString)
+ private val SkipLocking = Value(1 << 2, Flag.SKIP_LOCKING.toString)
+ private val ForceWriteLock = Value(1 << 3, Flag.FORCE_WRITE_LOCK.toString)
+ private val SkipCacheStatusCheck = Value(1 << 4, Flag.SKIP_CACHE_STATUS_CHECK.toString)
+ private val ForceAsynchronous = Value(1 << 5, Flag.FORCE_ASYNCHRONOUS.toString)
+ private val ForceSynchronous = Value(1 << 6, Flag.FORCE_SYNCHRONOUS.toString)
+ // 1 << 7 skipped since that's the variable length marker
+ private val SkipCacheStore = Value(1 << 8, Flag.SKIP_CACHE_STORE.toString)
+ private val FailSilently = Value(1 << 9, Flag.FAIL_SILENTLY.toString)
+ private val SkipRemoteLookup = Value(1 << 10, Flag.SKIP_REMOTE_LOOKUP.toString)
+ private val PutForExternalRead = Value(1 << 11, Flag.PUT_FOR_EXTERNAL_READ.toString)
+
+ def extractFlags(bitFlags: Int): Set[Flag] = {
+ val s = new HashSet[Flag]
+ Flags.filter(f => (bitFlags & f.id) > 0).foreach(f => s += Flag.valueOf(f.toString))
+ new scala.collection.immutable.HashSet ++ s
+ }
+
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,58 @@
+package org.infinispan.server.hotrod
+
+import java.io.StreamCorruptedException
+import org.infinispan.server.core.transport.{ExceptionEvent, Decoder, ChannelBuffer, ChannelHandlerContext}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+class GlobalDecoder extends Decoder[NoState] {
+ import GlobalDecoder._
+
+ val Magic = 0xA0
+ val Version410 = 41
+
+ override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): Object = {
+// trace("Buffer contains: {0}", buffer)
+// state match {
+// case NoState.VOID => {
+ val header = getHeader(buffer)
+ if (header == null) null
+ verifyMagic(header)
+ val version = getVersion(header)
+ val decoder =
+ version match {
+ case Version410 => new Decoder410
+ case _ => throw new StreamCorruptedException("Unknown version:" + version)
+ }
+ decoder.decode(ctx, buffer, state)
+// }
+// }
+ }
+
+ override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+ error("Error", e.getCause)
+ }
+
+ private def getHeader(buffer: ChannelBuffer): ChannelBuffer = {
+ if (buffer.readableBytes() < 2) null
+ buffer.readBytes(2)
+ }
+
+ private def verifyMagic(buffer: ChannelBuffer) {
+ val magic = buffer.readUnsignedByte()
+ if (magic != Magic) {
+ buffer.resetReaderIndex()
+ throw new StreamCorruptedException("Magic byte incorrect: " + magic)
+ }
+ }
+
+ private def getVersion(buffer: ChannelBuffer) = {
+ buffer.readUnsignedByte()
+ }
+
+}
+
+object GlobalDecoder extends Logging
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,24 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.server.core.transport.ChannelHandlerContext
+import org.infinispan.server.core.{MessageEvent, CommandHandler}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+
+class Handler(val hotCache: CallerCache) extends CommandHandler {
+
+ override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+ e.getMessage match {
+ case c: StorageCommand => println(c.op(hotCache, c))
+ }
+
+
+// e.getMessage match {
+// case s: StorageCommand => s.perform(s)
+// }
+ }
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,39 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.manager.CacheManager
+import org.infinispan.server.core.transport.netty.{NettyServer, NettyReplayingDecoder}
+import java.net.InetSocketAddress
+import org.infinispan.server.core.Server
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 3.5
+ */
+
+class HotRodServer(val host: String,
+ val port: Int,
+ val manager: CacheManager,
+ val masterThreads: Int,
+ val workerThreads: Int) {
+
+ import HotRodServer._
+
+ private var server: Server = null
+
+ def start {
+ var decoder = new GlobalDecoder
+ val nettyDecoder = new NettyReplayingDecoder[NoState](decoder)
+ val commandHandler = new Handler(new CallerCache(manager))
+ server = new NettyServer(commandHandler, nettyDecoder, new InetSocketAddress(host, port),
+ masterThreads, workerThreads, "HotRod")
+ server.start
+ info("Started Hot Rod bound to {0}:{1}", host, port)
+ }
+
+ def stop {
+ if (server != null) server.stop
+ }
+}
+
+object HotRodServer extends Logging
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Logging.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Logging.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Logging.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,41 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.util.logging.{Log, LogFactory}
+import collection.mutable.WrappedArray
+
+/**
+ * TODO: This would be simplest way to get logging in but unfortunately it creates
+ * a new Log instance per object created
+ *
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+trait Logging {
+ private lazy val log: Log = LogFactory.getLog(getClass)
+
+ def info(msg: => String) = log.info(msg, null)
+
+ def info(msg: => String, params: Any*) =
+ // params.map(_.asInstanceOf[AnyRef]) => returns a Seq[AnyRef]
+ // the ': _*' part tells the compiler to pass it as varargs
+ if (log.isInfoEnabled) log.info(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+ def trace(msg: => String) = log.trace(msg, null)
+
+ def trace(msg: => String, params: Any*) =
+ if (log.isTraceEnabled) log.trace(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+ def error(msg: => String) = log.error(msg, null)
+
+ def error(msg: => String, t: Throwable) = log.error(msg, t, null)
+
+ // TODO: Sort out the other error methods that take both Throwable and varargs
+
+// def error(msg: => String, params: Any*) =
+// if (log.isErrorEnabled) log.error(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+//
+// def error(msg: => String, t: Throwable, params: Any*) =
+// if (log.isErrorEnabled) log.error(msg, t, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/NoState.java
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/NoState.java (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/NoState.java 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.hotrod;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public enum NoState {
+}
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Reply.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,12 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+object Reply extends Enumeration {
+ type Reply = Value
+ val Stored = Value
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,9 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+
+class RetrievalCommand
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,31 @@
+package org.infinispan.server.hotrod
+
+/**
+ * // TODO: Document
+ *
+ * The idea with this trait is to be able to do stuff like this:
+ *
+ * If stats enabled:
+ * val cache = (new CallerCache with StatsCache)
+ * If stats disabled:
+ * val cache = new CallerCache
+ *
+ * A very easy way to define interceptors!!!!
+ *
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+
+trait StatsCache extends Cache {
+
+ abstract override def put(c: StorageCommand): Reply.Value = {
+ // TODO: calculate stats if necessary
+ super.put(c)
+ }
+
+ abstract override def get(c: RetrievalCommand): Reply.Value = {
+ // TODO: calculate stats if necessary
+ super.get(c)
+ }
+
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,21 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.context.Flag
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+// val cache: Cache[Array[Byte], Array[Byte]]
+class StorageCommand(val cacheName: String, val key: Array[Byte], val lifespan: Int,
+ val maxIdle: Int, val value: Array[Byte], val flags: Set[Flag])
+ (val op: (Cache, StorageCommand) => Reply.Value)
+//{
+//
+//// def perform(op: StorageCommand => Reply.Value) {
+//// op(this)
+//// }
+////
+//}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VInt.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VInt.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VInt.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,33 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.server.core.transport.ChannelBuffer
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+object VInt {
+
+ def write(out: ChannelBuffer, i: Int) {
+ if ((i & ~0x7F) == 0) out.writeByte(i.toByte)
+ else {
+ out.writeByte(((i & 0x7f) | 0x80).toByte)
+ write(out, i >>> 7)
+ }
+ }
+
+ def read(in: ChannelBuffer): Int = {
+ val b = in.readByte
+ read(in, b, 7, b & 0x7F)
+ }
+
+ private def read(in: ChannelBuffer, b: Byte, shift: Int, i: Int): Int = {
+ if ((b & 0x80) == 0) i
+ else {
+ val bb = in.readByte
+ read(in, bb, shift + 7, i | ((bb & 0x7FL) << shift).toInt)
+ }
+ }
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VLong.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VLong.scala (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/VLong.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,33 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.server.core.transport.ChannelBuffer
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+object VLong {
+
+ def write(out: ChannelBuffer, i: Long) {
+ if ((i & ~0x7F) == 0) out.writeByte(i.toByte)
+ else {
+ out.writeByte(((i & 0x7f) | 0x80).toByte)
+ write(out, i >>> 7)
+ }
+ }
+
+ def read(in: ChannelBuffer): Long = {
+ val b = in.readByte
+ read(in, b, 7, b & 0x7F)
+ }
+
+ private def read(in: ChannelBuffer, b: Byte, shift: Int, i: Long): Long = {
+ if ((b & 0x80) == 0) i
+ else {
+ val bb = in.readByte
+ read(in, bb, shift + 7, i | (bb & 0x7FL) << shift)
+ }
+ }
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,71 @@
+package org.infinispan.server.hotrod
+
+import org.testng.annotations.Test
+import org.infinispan.context.Flag
+import org.testng.Assert._
+
+/**
+ * Appears that optional parameters in annotations result in compiler errors:
+ * https://lampsvn.epfl.ch/trac/scala/ticket/1810
+ *
+ * Keep an eye on that for @Test and @AfterClass annotations
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+ at Test
+class FlagsTest {
+
+ def testSingleFlag {
+ flag(1)(1) { flags => flags contains Flag.ZERO_LOCK_ACQUISITION_TIMEOUT }
+ flag(1 << 1)(1) { flags => flags contains Flag.CACHE_MODE_LOCAL }
+ flag(1 << 2)(1) { flags => flags contains Flag.SKIP_LOCKING }
+ flag(1 << 3)(1) { flags => flags contains Flag.FORCE_WRITE_LOCK }
+ flag(1 << 4)(1) { flags => flags contains Flag.SKIP_CACHE_STATUS_CHECK }
+ flag(1 << 5)(1) { flags => flags contains Flag.FORCE_ASYNCHRONOUS }
+ flag(1 << 6)(1) { flags => flags contains Flag.FORCE_SYNCHRONOUS }
+ flag(1 << 8)(1) { flags => flags contains Flag.SKIP_CACHE_STORE }
+ flag(1 << 9)(1) { flags => flags contains Flag.FAIL_SILENTLY }
+ flag(1 << 10)(1) { flags => flags contains Flag.SKIP_REMOTE_LOOKUP }
+ flag(1 << 11)(1) { flags => flags contains Flag.PUT_FOR_EXTERNAL_READ }
+ }
+
+ def testMultipleFlags {
+ flag(3)(2) {
+ flags => (flags contains Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) &&
+ (flags contains Flag.CACHE_MODE_LOCAL)
+ }
+ flag(15)(4) {
+ flags => (flags contains Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) &&
+ (flags contains Flag.CACHE_MODE_LOCAL) &&
+ (flags contains Flag.SKIP_LOCKING) &&
+ (flags contains Flag.FORCE_WRITE_LOCK)
+ }
+ flag(0xF7F)(11) {
+ flags => (flags contains Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) &&
+ (flags contains Flag.CACHE_MODE_LOCAL) &&
+ (flags contains Flag.SKIP_LOCKING) &&
+ (flags contains Flag.FORCE_WRITE_LOCK) &&
+ (flags contains Flag.SKIP_CACHE_STATUS_CHECK) &&
+ (flags contains Flag.FORCE_ASYNCHRONOUS) &&
+ (flags contains Flag.FORCE_SYNCHRONOUS) &&
+ (flags contains Flag.SKIP_CACHE_STORE) &&
+ (flags contains Flag.FAIL_SILENTLY) &&
+ (flags contains Flag.SKIP_REMOTE_LOOKUP) &&
+ (flags contains Flag.PUT_FOR_EXTERNAL_READ)
+ }
+ }
+
+// private def flag(bitFlags: Int)(size: Int)(p: Set[Flags.Value] => Boolean) {
+// var flags = Flags.extractFlags(bitFlags)
+// assert { flags.size == size }
+// assert { true == p(flags) }
+// }
+
+ private def flag(bitFlags: Int)(size: Int)(p: Set[Flag] => Boolean) {
+ var flags = Flags.extractFlags(bitFlags)
+ assertEquals(flags.size, size)
+ assertTrue(p(flags))
+ }
+
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,46 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.test.SingleCacheManagerTest
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import org.infinispan.manager.CacheManager
+import org.testng.annotations.{AfterClass, Test}
+import java.lang.reflect.Method
+import test.{Client, Utils}
+import org.testng.Assert._
+
+/**
+ * TODO: Document
+ *
+ * Note: It appears that optional parameters in annotations result in compiler errors.
+ * This has been solved in Scala 2.8.0.Beta1, so use that compiler,
+ * otherwise this class won't compile.
+ * https://lampsvn.epfl.ch/trac/scala/ticket/1810
+ *
+ * Keep an eye on that for @Test and @AfterClass annotations
+ *
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+ at Test(groups = Array("functional"), testName = "server.hotrod.FunctionalTest")
+class FunctionalTest extends SingleCacheManagerTest with Utils with Client {
+ private var server: HotRodServer = null
+
+ override def createCacheManager: CacheManager = {
+ val cacheManager = TestCacheManagerFactory.createLocalCacheManager
+ server = createHotRodServer(cacheManager)
+ server.start
+ cacheManager
+ }
+
+ def testPutBasic(m: Method) {
+ assertTrue(connect("127.0.0.1", server.port))
+ assertTrue(put("__default", k(m) , 0, 0, v(m)))
+ }
+
+ @AfterClass(alwaysRun = true)
+ override def destroyAfterClass {
+ log.debug("Test finished, close memcached server", null)
+ server.stop
+ }
+
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/VariableLengthTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/VariableLengthTest.scala (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/VariableLengthTest.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,119 @@
+package org.infinispan.server.hotrod
+
+import org.testng.annotations.Test
+import org.infinispan.server.core.transport.netty.NettyChannelBuffer
+import org.jboss.netty.buffer.{ChannelBuffers}
+import org.testng.Assert._
+
+/**
+ * Appears that optional parameters in annotations result in compiler errors:
+ * https://lampsvn.epfl.ch/trac/scala/ticket/1810
+ *
+ * Keep an eye on that for @Test and @AfterClass annotations
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+
+ at Test
+class VariableLengthTest {
+
+ def test2pow7minus1 {
+ writeReadInt(127, 1)
+ }
+
+ def test2pow7 {
+ writeReadInt(128, 2)
+ }
+
+ def test2pow14minus1 {
+ writeReadInt(16383, 2)
+ }
+
+ def test2pow14 {
+ writeReadInt(16384, 3)
+ }
+
+ def test2pow21minus1 {
+ writeReadInt(2097151, 3)
+ }
+
+ def test2pow21 {
+ writeReadInt(2097152, 4)
+ }
+
+ def test2pow28minus1 {
+ writeReadInt(268435455, 4)
+ }
+
+ def test2pow28 {
+ writeReadInt(268435456, 5)
+ }
+
+ def test2pow35minus1 {
+ writeReadLong(34359738367L, 5)
+ }
+
+ def test2pow35 {
+ writeReadLong(34359738368L, 6)
+ }
+
+ def test2pow42minus1 {
+ writeReadLong(4398046511103L, 6)
+ }
+
+ def test2pow42 {
+ writeReadLong(4398046511104L, 7)
+ }
+
+ def test2pow49minus1 {
+ writeReadLong(562949953421311L, 7)
+ }
+
+ def test2pow49 {
+ writeReadLong(562949953421312L, 8)
+ }
+
+ def test2pow56minus1 {
+ writeReadLong(72057594037927935L, 8)
+ }
+
+ def test2pow56 {
+ writeReadLong(72057594037927936L, 9)
+ }
+
+ def test2pow63minus1 {
+ writeReadLong(9223372036854775807L, 9)
+ }
+
+// def test2pow63() {
+// writeReadLong(9223372036854775808L, 10)
+// }
+
+ private def writeReadInt(num: Int, expected: Int) {
+ val buffer = new NettyChannelBuffer(ChannelBuffers.directBuffer(1024))
+ assert(buffer.writerIndex == 0)
+ VInt.write(buffer, num)
+ assertEquals(buffer.writerIndex, expected)
+ assertEquals(VInt.read(buffer), num)
+ }
+
+ private def writeReadLong(num: Long, expected: Int) {
+ val buffer = new NettyChannelBuffer(ChannelBuffers.directBuffer(1024))
+ assert(buffer.writerIndex == 0)
+ VLong.write(buffer, num)
+ assertEquals(buffer.writerIndex, expected)
+ assertEquals(VLong.read(buffer), num)
+ }
+
+// def testEquals128Old() {
+// val baos = new ByteArrayOutputStream(1024);
+// val oos = new ObjectOutputStream(baos);
+// UnsignedNumeric.writeUnsignedInt(oos, 128);
+// oos.flush();
+// assertEquals(baos.size() - 6, 2);
+// val bais = new ByteArrayInputStream(baos.toByteArray());
+// val ois = new ObjectInputStream(bais);
+// assertEquals(UnsignedNumeric.readUnsignedInt(ois), 128);
+// }
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,120 @@
+package org.infinispan.server.hotrod.test
+
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import java.util.concurrent.Executors
+import org.jboss.netty.bootstrap.ClientBootstrap
+import java.net.InetSocketAddress
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
+import org.jboss.netty.channel._
+import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
+import org.infinispan.server.core.transport.netty.NettyChannelBuffer
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder
+import org.infinispan.server.hotrod.{Logging, NoState, VLong, VInt}
+
+/**
+ * // TODO: Document this
+ *
+ * // TODO: Transform to Netty independent code
+ *
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+trait Client {
+ private var channel: Channel = null
+
+ def connect(host: String, port: Int): Boolean = {
+ // Set up.
+ val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
+ val bootstrap: ClientBootstrap = new ClientBootstrap(factory)
+ bootstrap.setPipelineFactory(ClientPipelineFactory)
+ bootstrap.setOption("tcpNoDelay", true)
+ bootstrap.setOption("keepAlive", true)
+ // Make a new connection.
+ val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
+ // Wait until the connection is made successfully.
+ channel = connectFuture.awaitUninterruptibly.getChannel
+// // Get the handler instance to retrieve the answer.
+// var handler = channel.getPipeline.getLast.asInstanceOf[ClientHandler]
+ connectFuture.isSuccess
+ }
+
+ def put(cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Boolean = {
+ val writeFuture = channel.write(new Store(cacheName, key, lifespan, maxIdle, value))
+ writeFuture.awaitUninterruptibly
+ writeFuture.isSuccess
+ }
+
+}
+
+ at ChannelPipelineCoverage("all")
+private object ClientPipelineFactory extends ChannelPipelineFactory {
+
+ override def getPipeline() = {
+ val pipeline = Channels.pipeline
+ pipeline.addLast("decoder", Decoder)
+ pipeline.addLast("encoder", Encoder)
+// pipeline.addLast("handler", new FactorialClientHandler(count))
+ pipeline
+ }
+
+}
+
+ at ChannelPipelineCoverage("all")
+private object Encoder extends OneToOneEncoder {
+
+ override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
+ val ret =
+ msg match {
+ case s: Store => {
+ val buf = new NettyChannelBuffer(ChannelBuffers.dynamicBuffer)
+ buf.writeByte(0xA0.asInstanceOf[Byte]) // magic
+ buf.writeByte(41) // version
+ buf.writeByte(0x01) // opcode - put
+ VInt.write(buf, s.cacheName.length) // cache name length
+ buf.writeBytes(s.cacheName.getBytes()) // cache name
+ VLong.write(buf, 1) // message id
+ VInt.write(buf, 0) // flags
+ VInt.write(buf, s.key.length) // key length
+ buf.writeBytes(s.key) // key
+ VInt.write(buf, s.lifespan) // lifespan
+ VInt.write(buf, s.maxIdle) // maxIdle
+ VInt.write(buf, s.value.length) // value length
+ buf.writeBytes(s.value) // value
+ buf.getUnderlyingChannelBuffer
+ }
+ }
+ ret
+ }
+
+}
+
+private object Decoder extends ReplayingDecoder[NoState] with Logging {
+
+ override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState) = {
+ null
+ }
+
+ override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+ error("Error", e.getCause)
+ }
+}
+
+//private class ClientHandler(val command: Any) extends SimpleChannelUpstreamHandler {
+//
+// override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
+// sendCommand(e)
+// }
+//
+// override def channelInterestChanged(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
+// sendCommand(e)
+// }
+//
+// private def sendCommand(e: ChannelStateEvent) {
+// var channel = e.getChannel
+// channel.write(command)
+// }
+//}
+
+private class Store(val cacheName: String, val key: Array[Byte],
+ val lifespan: Int, val maxIdle: Int,
+ val value: Array[Byte])
\ No newline at end of file
Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala 2010-02-26 18:20:47 UTC (rev 1557)
@@ -0,0 +1,42 @@
+package org.infinispan.server.hotrod.test
+
+import java.util.concurrent.atomic.AtomicInteger
+import org.infinispan.manager.CacheManager
+import java.lang.reflect.Method
+import org.infinispan.server.hotrod.HotRodServer
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+
+trait Utils {
+ def host = "127.0.0.1"
+
+ def createHotRodServer(manager: CacheManager): HotRodServer = {
+ new HotRodServer(host, UniquePortThreadLocal.get.intValue, manager, 0, 0)
+ }
+
+ def k(m: Method, prefix: String): Array[Byte] = {
+ (prefix + m.getName) getBytes
+ }
+
+ def v(m: Method, prefix: String): Array[Byte] = {
+ k(m, prefix)
+ }
+
+ def k(m: Method): Array[Byte] = {
+ k(m, "k-")
+ }
+
+ def v(m: Method): Array[Byte] = {
+ v(m, "v-")
+ }
+
+ }
+
+object UniquePortThreadLocal extends ThreadLocal[Int] {
+ private val uniqueAddr = new AtomicInteger(21212)
+ override def initialValue: Int = uniqueAddr.getAndAdd(100)
+}
\ No newline at end of file
More information about the infinispan-commits
mailing list