[infinispan-commits] Infinispan SVN: r1642 - in trunk/client/hotrod-client: src/main/java/org/infinispan/client/hotrod and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Mar 31 03:34:15 EDT 2010
Author: mircea.markus
Date: 2010-03-31 03:34:14 -0400 (Wed, 31 Mar 2010)
New Revision: 1642
Added:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java
Modified:
trunk/client/hotrod-client/pom.xml
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
Log:
ongoing work on hotrod client
Modified: trunk/client/hotrod-client/pom.xml
===================================================================
--- trunk/client/hotrod-client/pom.xml 2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/pom.xml 2010-03-31 07:34:14 UTC (rev 1642)
@@ -9,7 +9,11 @@
<version>4.1.0-SNAPSHOT</version>
<relativePath>../../parent/pom.xml</relativePath>
</parent>
+ <properties>
+ <version.netty>3.2.0.BETA1</version.netty>
+ </properties>
+
<artifactId>infinispan-client-hotrod</artifactId>
<name>Infinispan Client Hotrod Module</name>
<description>Infinispan client hotrod module</description>
@@ -45,5 +49,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>${version.netty}</version>
+ </dependency>
+
</dependencies>
</project>
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java 2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -7,7 +7,7 @@
* @since 4.1
*/
public enum Flag {
- FORCE_RETURN_VALUE(0x0800);
+ FORCE_RETURN_VALUE(0x0001);
private int flagInt;
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,36 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+
+import java.io.IOException;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class AbstractTransport implements Transport {
+
+ public byte[] readByteArray() {
+ int responseLength = readVInt();
+ byte[] bufferToFill = new byte[responseLength];
+ readBuffer(bufferToFill);
+ return bufferToFill;
+ }
+
+ @Override
+ public String readString() {
+ byte[] strContent = readByteArray();
+ return new String(strContent);//todo take care of encoding here
+ }
+
+ protected abstract void readBuffer(byte[] bufferToFill);
+
+ public void writeByteArray(byte[] toAppend) {
+ writeVInt(toAppend.length);
+ writeBuffer(toAppend);
+ }
+
+ protected abstract void writeBuffer(byte[] toAppend);
+}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java 2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -1,7 +1,5 @@
package org.infinispan.client.hotrod.impl;
-import java.util.EnumMap;
-
/**
* // TODO: Document this
*
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -34,7 +34,7 @@
return null;
}
if (status == NO_ERROR_STATUS) {
- return readValue(transport);
+ return transport.readByteArray();
}
} finally {
transport.release();
@@ -81,7 +81,7 @@
}
if (status == NO_ERROR_STATUS) {
long version = transport.readVLong();
- byte[] value = readValue(transport);
+ byte[] value = transport.readByteArray();
return new BinaryVersionedValue(version, value);
}
} finally {
@@ -150,7 +150,7 @@
private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
// 1) write [header][key length][key]
long messageId = writeHeader(transport, opCode, flags);
- transport.writeBytesArray(key);
+ transport.writeByteArray(key);
transport.flush();
// 2) now read the header
@@ -174,10 +174,10 @@
long messageId = writeHeader(transport, opCode, flags);
// 2) write key and value
- transport.writeBytesArray(key);
+ transport.writeByteArray(key);
transport.writeVInt(lifespan);
transport.writeVInt(maxIdle);
- transport.writeBytesArray(value);
+ transport.writeByteArray(value);
transport.flush();
// 3) now read header
@@ -193,12 +193,12 @@
*/
private long writeHeader(Transport transport, short operationCode, Flag... flags) {
- transport.appendUnsignedByte(REQUEST_MAGIC);
+ transport.writeByte(REQUEST_MAGIC);
long messageId = MSG_ID.incrementAndGet();
transport.writeVLong(messageId);
transport.writeByte(HOTROD_VERSION);
transport.writeByte(operationCode);
- transport.writeBytesArray(cacheNameBytes);
+ transport.writeByteArray(cacheNameBytes);
int flagInt = 0;
if (flags != null) {
for (Flag flag : flags) {
@@ -259,15 +259,6 @@
}
}
- /**
- * Reads the length og the byte array and then the byte array from the transport.
- */
- private byte[] readValue(Transport transport) {
- int responseLength = transport.readVInt();
- byte[] result = new byte[responseLength];
- return transport.readByteArray(result);
- }
-
private boolean hasForceReturn(Flag[] flags) {
if (flags == null) return false;
for (Flag flag : flags) {
@@ -277,6 +268,6 @@
}
private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
- return hasForceReturn(flags) ? readValue(transport) : null;
+ return hasForceReturn(flags) ? transport.readByteArray() : null;
}
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java 2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -8,15 +8,10 @@
*/
public interface Transport {
- public void writeBytesArray(byte... toAppend);
+ public void writeByteArray(byte[] toAppend);
public void writeByte(short toWrite);
- /**
- * Treats the tailing byte as an unsigned byte.
- */
- public void appendUnsignedByte(short requestMagic);
-
public void writeVInt(int length);
public void writeVLong(long l);
@@ -33,9 +28,8 @@
/**
* reads an vint which is size; then an array having that size.
- * @param bufferToFill
*/
- public byte[] readByteArray(byte[] bufferToFill);
+ public byte[] readByteArray();
String readString();
}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,31 @@
+package org.infinispan.client.hotrod.impl.transport;
+
+import org.infinispan.client.hotrod.impl.TransportFactory;
+
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class AbstractTransportFactory implements TransportFactory {
+ protected String serverHost;
+ protected int serverPort;
+
+ public void init(Properties props) {
+ String servers = props.getProperty("hotrod-servers");
+ StringTokenizer tokenizer = new StringTokenizer(servers,";");
+ String server = tokenizer.nextToken();
+ String[] serverDef = tokenizeServer(server);
+ serverHost = serverDef[0];
+ serverPort = Integer.parseInt(serverDef[1]);
+ }
+
+ private String[] tokenizeServer(String server) {
+ StringTokenizer t = new StringTokenizer(server, ":");
+ return new String[] {t.nextToken(), t.nextToken()};
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,66 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotrodClientDecoder extends FrameDecoder {
+
+ private final ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
+ private final InputStreamAdapter isa = new InputStreamAdapter(buffer);
+
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
+ synchronized (this.buffer) {
+ this.buffer.writeBytes(buffer);
+ if (this.buffer.readableBytes() > 0) {
+ this.buffer.notify();
+ }
+ }
+ return null;
+ }
+
+ public long readVLong() {
+ return VHelper.readVLong(isa);
+ }
+
+ public int readVInt() {
+ return VHelper.readVInt(isa);
+ }
+
+ public void fillBuffer(byte[] bufferToFill) {
+ synchronized (buffer) {
+ if (buffer.readableBytes() < bufferToFill.length) {
+ try {
+ buffer.wait();
+ } catch (InterruptedException e) {
+ throw new TransportException(e);
+ }
+ }
+ buffer.readBytes(bufferToFill);
+ }
+ }
+
+ public short readByte() {
+ synchronized (buffer) {
+ if (!buffer.readable()) {
+ try {
+ buffer.wait();
+ } catch (InterruptedException e) {
+ throw new TransportException(e);
+ }
+ }
+ return buffer.readUnsignedByte();
+ }
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,54 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
+import org.infinispan.io.ExposedByteArrayOutputStream;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+import static org.jboss.netty.buffer.ChannelBuffers.*;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotrodClientEncoder extends OneToOneEncoder {
+
+ private OutputStreamAdapter osa = new OutputStreamAdapter();
+
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+ if (msg instanceof byte[]) {
+ return wrappedBuffer((byte[])msg);
+ } else if (msg instanceof Integer) {
+ int intMsg = (Integer) msg;
+ ChannelBuffer buffer = getBuffer(channel);
+ VHelper.writeVInt(intMsg, osa);
+ return buffer;
+ } else if (msg instanceof Long) {
+ ChannelBuffer buffer = getBuffer(channel);
+ long longMsg = (Long) msg;
+ VHelper.writeVLong(longMsg, osa);
+ return buffer;
+ } else if (msg instanceof Short) {
+ ChannelBuffer buffer = getBuffer(channel);
+ short byteMsg = (Short) msg;
+ buffer.writeByte(byteMsg);
+ return buffer;
+ } else {
+ throw new TransportException("Unknown msg type: " + msg.getClass());
+ }
+ }
+
+
+ private ChannelBuffer getBuffer(Channel channel) {
+ ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory());
+ osa.setBuffer(buffer);
+ return buffer;
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,42 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
+import static org.jboss.netty.channel.Channels.*;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotrodClientPipelaneFactory implements ChannelPipelineFactory {
+
+ private static Log log = LogFactory.getLog(HotrodClientPipelaneFactory.class);
+
+ private HotrodClientDecoder decoder;
+
+ public HotrodClientPipelaneFactory(HotrodClientDecoder decoder) {
+ this.decoder = decoder;
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = new DefaultChannelPipeline() {
+ @Override
+ protected void notifyHandlerException(ChannelEvent e, Throwable t) {
+ log.warn("Exception on event: " + e, t);
+ super.notifyHandlerException(e, t);
+ }
+ };
+ pipeline.addLast("decoder", decoder);
+ pipeline.addLast("encoder", new HotrodClientEncoder());
+ return pipeline;
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,36 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class InputStreamAdapter extends InputStream {
+
+ private final ChannelBuffer buffer;
+
+ public InputStreamAdapter(ChannelBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public int read() throws IOException {
+ synchronized (buffer) {
+ if (buffer.readableBytes() == 0) {
+ try {
+ buffer.wait();
+ } catch (InterruptedException e) {
+ throw new TransportException(e);
+ }
+ }
+ return buffer.readUnsignedByte();
+ }
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,106 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.AbstractTransport;
+import org.infinispan.client.hotrod.impl.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class NettyTransport extends AbstractTransport {
+
+ private InetSocketAddress serverAddress;
+ private Channel channel;
+ private ChannelFuture lastWrite;
+
+ private HotrodClientDecoder decoder = new HotrodClientDecoder();
+
+ public NettyTransport(InetSocketAddress serverAddress) {
+ this.serverAddress = serverAddress;
+ init();
+ }
+
+ private void init() {
+ // Configure the client.
+ ClientBootstrap bootstrap = new ClientBootstrap(
+ new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+
+ // Set up the event pipeline factory.
+ bootstrap.setPipelineFactory(new HotrodClientPipelaneFactory(decoder));
+
+ // Start the connection attempt.
+ ChannelFuture future = bootstrap.connect(serverAddress);
+
+ // Wait until the connection attempt succeeds or fails.
+ channel = future.awaitUninterruptibly().getChannel();
+ if (!future.isSuccess()) {
+ bootstrap.releaseExternalResources();
+ throw new TransportException("Coukd not create netty transport", future.getCause());
+ }
+ }
+
+ @Override
+ protected void writeBuffer(byte[] toAppend) {
+ channel.write(toAppend);
+ }
+
+ @Override
+ public void writeByte(short toWrite) {
+ lastWrite = channel.write(toWrite);
+ }
+
+ @Override
+ public void writeVInt(int length) {
+ lastWrite = channel.write(length);
+ }
+
+ @Override
+ public void writeVLong(long l) {
+ lastWrite = channel.write(l);
+ }
+
+
+ @Override
+ public void flush() {
+ try {
+ lastWrite.await();
+ } catch (InterruptedException e) {
+ throw new TransportException(e);
+ }
+ }
+
+ @Override
+ public long readVLong() {
+ return decoder.readVLong();
+ }
+
+ @Override
+ public int readVInt() {
+ return decoder.readVInt();
+ }
+
+ @Override
+ public short readByte() {
+ return decoder.readByte();
+ }
+
+ @Override
+ public void release() {
+ // TODO: Customise this generated block
+ }
+
+ @Override
+ protected void readBuffer(byte[] bufferToFill) {
+ decoder.fillBuffer(bufferToFill);
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,33 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.Transport;
+import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class NettyTransportFactory extends AbstractTransportFactory {
+ private InetSocketAddress serverAddr;
+
+ @Override
+ public void init(Properties props) {
+ super.init(props);
+ serverAddr = new InetSocketAddress(serverHost, serverPort);
+ }
+
+ @Override
+ public Transport getTransport() {
+ return new NettyTransport(serverAddr);
+ }
+
+ @Override
+ public void destroy() {
+ // TODO: Customise this generated block
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,27 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class OutputStreamAdapter extends OutputStream {
+
+ ChannelBuffer buffer;
+
+
+ public void setBuffer(ChannelBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ buffer.writeByte(b);
+ }
+}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -1,5 +1,6 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
+import org.infinispan.client.hotrod.impl.AbstractTransport;
import org.infinispan.client.hotrod.impl.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportException;
import org.infinispan.client.hotrod.impl.transport.VHelper;
@@ -14,7 +15,7 @@
* @author mmarkus
* @since 4.1
*/
-public class TcpTransport implements Transport {
+public class TcpTransport extends AbstractTransport {
public static final Logger log = Logger.getLogger(TcpTransport.class.getName());
@@ -22,14 +23,6 @@
private int port;
private Socket socket;
- public void appendUnsignedByte(short requestMagic) {
- try {
- socket.getOutputStream().write(requestMagic);
- } catch (IOException e) {
- throw new TransportException(e);
- }
- }
-
public void writeVInt(int length) {
try {
VHelper.writeVInt(length, socket.getOutputStream());
@@ -75,9 +68,8 @@
}
}
- public void writeBytesArray(byte... toAppend) {
+ protected void writeBuffer(byte[] toAppend) {
try {
- writeVInt(toAppend.length);
socket.getOutputStream().write(toAppend);
} catch (IOException e) {
throw new TransportException("Problems writing data to stream", e);
@@ -122,7 +114,7 @@
}
}
- public byte[] readByteArray(byte[] bufferToFill) {
+ protected void readBuffer(byte[] bufferToFill) {
int size;
try {
size = socket.getInputStream().read(bufferToFill);
@@ -135,13 +127,5 @@
if (size != bufferToFill.length) {
throw new TransportException("Expected " + bufferToFill.length + " bytes but only could read " + size + " bytes!");
}
- return bufferToFill;
}
-
- @Override
- public String readString() {
- long strLength = readVLong();
- byte[] strContent = readByteArray(new byte[(int)strLength]);
- return new String(strContent);//todo take care of encoding here
- }
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-03-31 07:34:14 UTC (rev 1642)
@@ -2,6 +2,7 @@
import org.infinispan.client.hotrod.impl.Transport;
import org.infinispan.client.hotrod.impl.TransportFactory;
+import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
import java.util.Properties;
import java.util.StringTokenizer;
@@ -12,30 +13,13 @@
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
-public class TcpTransportFactory implements TransportFactory {
+public class TcpTransportFactory extends AbstractTransportFactory {
- private String serverHost;
- private int serverPort;
-
- public void init(Properties props) {
- String servers = props.getProperty("hotrod-servers");
- StringTokenizer tokenizer = new StringTokenizer(servers,";");
- String server = tokenizer.nextToken();
- String[] serverDef = tokenizeServer(server);
- serverHost = serverDef[0];
- serverPort = Integer.parseInt(serverDef[1]);
- }
-
@Override
public void destroy() {
// TODO: Customise this generated block
}
- private String[] tokenizeServer(String server) {
- StringTokenizer t = new StringTokenizer(server, ":");
- return new String[] {t.nextToken(), t.nextToken()};
- }
-
@Override
public Transport getTransport() {
TcpTransport transport = new TcpTransport(serverHost, serverPort);
More information about the infinispan-commits
mailing list