[infinispan-commits] Infinispan SVN: r1642 - in trunk/client/hotrod-client: src/main/java/org/infinispan/client/hotrod and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Mar 31 03:34:15 EDT 2010


Author: mircea.markus
Date: 2010-03-31 03:34:14 -0400 (Wed, 31 Mar 2010)
New Revision: 1642

Added:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java
Modified:
   trunk/client/hotrod-client/pom.xml
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
Log:
ongoing work on hotrod client

Modified: trunk/client/hotrod-client/pom.xml
===================================================================
--- trunk/client/hotrod-client/pom.xml	2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/pom.xml	2010-03-31 07:34:14 UTC (rev 1642)
@@ -9,7 +9,11 @@
       <version>4.1.0-SNAPSHOT</version>
       <relativePath>../../parent/pom.xml</relativePath>
    </parent>
+   <properties>
+      <version.netty>3.2.0.BETA1</version.netty>
+   </properties>
 
+
    <artifactId>infinispan-client-hotrod</artifactId>
    <name>Infinispan Client Hotrod Module</name>
    <description>Infinispan client hotrod module</description>
@@ -45,5 +49,11 @@
          <scope>test</scope>
       </dependency>
 
+      <dependency>
+         <groupId>org.jboss.netty</groupId>
+         <artifactId>netty</artifactId>
+         <version>${version.netty}</version>
+      </dependency>
+
    </dependencies>
 </project>

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java	2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -7,7 +7,7 @@
  * @since 4.1
  */
 public enum Flag {
-   FORCE_RETURN_VALUE(0x0800);
+   FORCE_RETURN_VALUE(0x0001);
 
    private int flagInt;
 

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,65 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+
+import java.io.IOException;
+
+/**
+ * Base class for {@link Transport} implementations that factors out the handling of
+ * length-prefixed byte arrays and strings. Subclasses supply the raw buffer I/O through
+ * {@link #readBuffer(byte[])} and {@link #writeBuffer(byte[])}.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class AbstractTransport implements Transport {
+
+   /**
+    * Reads a vint holding the array length, then reads that many bytes.
+    *
+    * @return a newly allocated array containing the bytes read
+    */
+   @Override
+   public byte[] readByteArray() {
+      int responseLength = readVInt();
+      byte[] bufferToFill = new byte[responseLength];
+      readBuffer(bufferToFill);
+      return bufferToFill;
+   }
+
+   /**
+    * Reads a length-prefixed byte array and decodes it as a string.
+    *
+    * @return the decoded string
+    */
+   @Override
+   public String readString() {
+      byte[] strContent = readByteArray();
+      // Decode with an explicit charset instead of the platform default so the result does not
+      // depend on JVM settings; the Hot Rod protocol specifies UTF-8 for strings.
+      return new String(strContent, java.nio.charset.Charset.forName("UTF-8"));
+   }
+
+   /**
+    * Reads bytes from the underlying connection into the given array.
+    *
+    * @param bufferToFill destination array; implementations are expected to fill it completely
+    */
+   protected abstract void readBuffer(byte[] bufferToFill);
+
+   /**
+    * Writes the array length as a vint, followed by the array content.
+    *
+    * @param toAppend bytes to write; must not be {@code null}
+    */
+   @Override
+   public void writeByteArray(byte[] toAppend) {
+      writeVInt(toAppend.length);
+      writeBuffer(toAppend);
+   }
+
+   /**
+    * Writes the given bytes to the underlying connection without a length prefix.
+    */
+   protected abstract void writeBuffer(byte[] toAppend);
+}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java	2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -1,7 +1,5 @@
 package org.infinispan.client.hotrod.impl;
 
-import java.util.EnumMap;
-
 /**
  * // TODO: Document this
  *

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -34,7 +34,7 @@
             return null;
          }
          if (status == NO_ERROR_STATUS) {
-            return readValue(transport);
+            return transport.readByteArray();
          }
       } finally {
          transport.release();
@@ -81,7 +81,7 @@
          }
          if (status == NO_ERROR_STATUS) {
             long version = transport.readVLong();
-            byte[] value = readValue(transport);
+            byte[] value = transport.readByteArray();
             return new BinaryVersionedValue(version, value);
          }
       } finally {
@@ -150,7 +150,7 @@
    private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
       // 1) write [header][key length][key]
       long messageId = writeHeader(transport, opCode, flags);
-      transport.writeBytesArray(key);
+      transport.writeByteArray(key);
       transport.flush();
 
       // 2) now read the header
@@ -174,10 +174,10 @@
       long messageId = writeHeader(transport, opCode, flags);
 
       // 2) write key and value
-      transport.writeBytesArray(key);
+      transport.writeByteArray(key);
       transport.writeVInt(lifespan);
       transport.writeVInt(maxIdle);
-      transport.writeBytesArray(value);
+      transport.writeByteArray(value);
       transport.flush();
 
       // 3) now read header
@@ -193,12 +193,12 @@
     */
 
    private long writeHeader(Transport transport, short operationCode, Flag... flags) {
-      transport.appendUnsignedByte(REQUEST_MAGIC);
+      transport.writeByte(REQUEST_MAGIC);
       long messageId = MSG_ID.incrementAndGet();
       transport.writeVLong(messageId);
       transport.writeByte(HOTROD_VERSION);
       transport.writeByte(operationCode);
-      transport.writeBytesArray(cacheNameBytes);
+      transport.writeByteArray(cacheNameBytes);
       int flagInt = 0;
       if (flags != null) {
          for (Flag flag : flags) {
@@ -259,15 +259,6 @@
       }
    }
 
-   /**
-    * Reads the length og the byte array and then the byte array from the transport.
-    */
-   private byte[] readValue(Transport transport) {
-      int responseLength = transport.readVInt();
-      byte[] result = new byte[responseLength];
-      return transport.readByteArray(result);
-   }
-
    private boolean hasForceReturn(Flag[] flags) {
       if (flags == null) return false;
       for (Flag flag : flags) {
@@ -277,6 +268,6 @@
    }
 
    private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
-      return hasForceReturn(flags) ? readValue(transport) : null;
+      return hasForceReturn(flags) ? transport.readByteArray() : null;
    }
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -8,15 +8,10 @@
  */
 public interface Transport {
 
-   public void writeBytesArray(byte... toAppend);
+   public void writeByteArray(byte[] toAppend);
 
    public void writeByte(short toWrite);
 
-   /**
-    * Treats the tailing byte as an unsigned byte.
-    */
-   public void appendUnsignedByte(short requestMagic);
-
    public void writeVInt(int length);
 
    public void writeVLong(long l);
@@ -33,9 +28,8 @@
 
    /**
     * reads an vint which is size; then an array having that size.
-    * @param bufferToFill
     */
-   public byte[] readByteArray(byte[] bufferToFill);
+   public byte[] readByteArray();
 
    String readString();
 }

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,53 @@
+package org.infinispan.client.hotrod.impl.transport;
+
+import org.infinispan.client.hotrod.impl.TransportFactory;
+
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+/**
+ * Base class for {@link TransportFactory} implementations that reads the server address from the
+ * {@code hotrod-servers} property. The property holds a {@code ;}-separated list of
+ * {@code host:port} entries; only the first entry is used.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class AbstractTransportFactory implements TransportFactory {
+   protected String serverHost;
+   protected int serverPort;
+
+   /**
+    * Parses the first {@code host:port} entry of the {@code hotrod-servers} property into
+    * {@link #serverHost} and {@link #serverPort}.
+    *
+    * @param props configuration; must contain {@code hotrod-servers}
+    * @throws IllegalArgumentException if the property is missing, defines no server, lacks a
+    *                                  port, or the port is not a number
+    */
+   public void init(Properties props) {
+      String servers = props.getProperty("hotrod-servers");
+      if (servers == null) {
+         throw new IllegalArgumentException("Missing required property 'hotrod-servers'");
+      }
+      StringTokenizer tokenizer = new StringTokenizer(servers, ";");
+      if (!tokenizer.hasMoreTokens()) {
+         throw new IllegalArgumentException("No server defined in 'hotrod-servers': '" + servers + "'");
+      }
+      String[] serverDef = tokenizeServer(tokenizer.nextToken());
+      serverHost = serverDef[0];
+      // NumberFormatException is itself an IllegalArgumentException.
+      serverPort = Integer.parseInt(serverDef[1]);
+   }
+
+   /**
+    * Splits a {@code host:port} definition into its two trimmed parts.
+    */
+   private String[] tokenizeServer(String server) {
+      StringTokenizer t = new StringTokenizer(server, ":");
+      if (t.countTokens() < 2) {
+         throw new IllegalArgumentException("Expected <host>:<port> but got '" + server + "'");
+      }
+      return new String[] {t.nextToken().trim(), t.nextToken().trim()};
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,91 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * Accumulates the bytes received on the channel into an internal buffer and lets callers read
+ * from that buffer, blocking until enough bytes have arrived. The internal buffer doubles as the
+ * monitor used to hand data over between the code that receives it and the callers that read it.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotrodClientDecoder extends FrameDecoder {
+
+   private final ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
+   private final InputStreamAdapter isa = new InputStreamAdapter(buffer);
+
+   /**
+    * Appends the received bytes to the internal buffer and wakes up blocked readers.
+    *
+    * @return always {@code null}, so no decoded frame is emitted by this handler
+    */
+   @Override
+   protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
+      synchronized (this.buffer) {
+         this.buffer.writeBytes(buffer);
+         if (this.buffer.readableBytes() > 0) {
+            // Wake all waiters; each re-checks its own condition before proceeding.
+            this.buffer.notifyAll();
+         }
+      }
+      return null;
+   }
+
+   public long readVLong() {
+      return VHelper.readVLong(isa);
+   }
+
+   public int readVInt() {
+      return VHelper.readVInt(isa);
+   }
+
+   /**
+    * Blocks until enough bytes are available, then fills the whole array.
+    *
+    * @param bufferToFill destination array
+    * @throws TransportException if the calling thread is interrupted while waiting
+    */
+   public void fillBuffer(byte[] bufferToFill) {
+      synchronized (buffer) {
+         awaitReadableBytes(bufferToFill.length);
+         buffer.readBytes(bufferToFill);
+      }
+   }
+
+   /**
+    * Blocks until one byte is available and returns it as an unsigned value.
+    *
+    * @throws TransportException if the calling thread is interrupted while waiting
+    */
+   public short readByte() {
+      synchronized (buffer) {
+         awaitReadableBytes(1);
+         return buffer.readUnsignedByte();
+      }
+   }
+
+   /**
+    * Waits until at least {@code required} bytes are readable. Must be called while holding the
+    * monitor of {@link #buffer}.
+    */
+   private void awaitReadableBytes(int required) {
+      // Loop rather than test once: a wake-up may be spurious, or may signal fewer bytes than
+      // this reader needs.
+      while (buffer.readableBytes() < required) {
+         try {
+            buffer.wait();
+         } catch (InterruptedException e) {
+            // Preserve the interrupt status for callers further up the stack.
+            Thread.currentThread().interrupt();
+            throw new TransportException(e);
+         }
+      }
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,65 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
+import org.infinispan.io.ExposedByteArrayOutputStream;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+import static org.jboss.netty.buffer.ChannelBuffers.*;
+
+/**
+ * Turns the objects written to the channel into buffers: byte arrays are wrapped as they are,
+ * {@code Integer} and {@code Long} values are written through {@link VHelper}, and {@code Short}
+ * values are written as a single byte.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotrodClientEncoder extends OneToOneEncoder {
+
+   private OutputStreamAdapter osa = new OutputStreamAdapter();
+
+   /**
+    * Encodes one outgoing message.
+    *
+    * @throws TransportException if the message type is not one of the supported ones
+    */
+   @Override
+   protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+      if (msg instanceof byte[]) {
+         return wrappedBuffer((byte[]) msg);
+      }
+      if (msg instanceof Integer) {
+         int value = (Integer) msg;
+         ChannelBuffer target = getBuffer(channel);
+         VHelper.writeVInt(value, osa);
+         return target;
+      }
+      if (msg instanceof Long) {
+         long value = (Long) msg;
+         ChannelBuffer target = getBuffer(channel);
+         VHelper.writeVLong(value, osa);
+         return target;
+      }
+      if (msg instanceof Short) {
+         short value = (Short) msg;
+         ChannelBuffer target = getBuffer(channel);
+         target.writeByte(value);
+         return target;
+      }
+      throw new TransportException("Unknown msg type: " + msg.getClass());
+   }
+
+   /**
+    * Creates a fresh dynamic buffer for the channel and points the stream adapter at it.
+    */
+   private ChannelBuffer getBuffer(Channel channel) {
+      ChannelBuffer created = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory());
+      osa.setBuffer(created);
+      return created;
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,52 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
+import static org.jboss.netty.channel.Channels.*;
+
+/**
+ * Builds the client pipeline: the decoder handed to the constructor followed by a new
+ * {@link HotrodClientEncoder}. Handler exceptions are logged before the default handling runs.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotrodClientPipelaneFactory implements ChannelPipelineFactory {
+
+   private static Log log = LogFactory.getLog(HotrodClientPipelaneFactory.class);
+
+   private HotrodClientDecoder decoder;
+
+   /**
+    * @param decoder decoder instance installed in every pipeline this factory creates
+    */
+   public HotrodClientPipelaneFactory(HotrodClientDecoder decoder) {
+      this.decoder = decoder;
+   }
+
+   @Override
+   public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline result = new LoggingPipeline();
+      result.addLast("decoder", decoder);
+      result.addLast("encoder", new HotrodClientEncoder());
+      return result;
+   }
+
+   /**
+    * Pipeline that logs a warning for every handler exception before delegating to the default
+    * behaviour.
+    */
+   private static class LoggingPipeline extends DefaultChannelPipeline {
+      @Override
+      protected void notifyHandlerException(ChannelEvent e, Throwable t) {
+         log.warn("Exception on event: " + e, t);
+         super.notifyHandlerException(e, t);
+      }
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,47 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link InputStream} view over a {@link ChannelBuffer} whose {@link #read()} blocks until the
+ * buffer has a readable byte. The buffer itself is used as the monitor, so whoever appends data
+ * to it is expected to notify on the buffer.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class InputStreamAdapter extends InputStream {
+
+   private final ChannelBuffer buffer;
+
+   public InputStreamAdapter(ChannelBuffer buffer) {
+      this.buffer = buffer;
+   }
+
+   /**
+    * Blocks until a byte is readable and returns it as a value in the range 0-255.
+    *
+    * @throws TransportException if the calling thread is interrupted while waiting
+    */
+   @Override
+   public int read() throws IOException {
+      synchronized (buffer) {
+         // Re-check after every wake-up: it may be spurious, or another reader may already have
+         // consumed the bytes that triggered the notification.
+         while (buffer.readableBytes() == 0) {
+            try {
+               buffer.wait();
+            } catch (InterruptedException e) {
+               // Preserve the interrupt status for callers further up the stack.
+               Thread.currentThread().interrupt();
+               throw new TransportException(e);
+            }
+         }
+         return buffer.readUnsignedByte();
+      }
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,128 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.AbstractTransport;
+import org.infinispan.client.hotrod.impl.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+/**
+ * {@link Transport} backed by a Netty channel. Writes are handed to the channel and complete
+ * asynchronously; {@link #flush()} waits for the most recent one. Reads are delegated to a
+ * {@link HotrodClientDecoder}, which blocks until the requested data has been received.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class NettyTransport extends AbstractTransport {
+
+   private InetSocketAddress serverAddress;
+   private Channel channel;
+   private ChannelFuture lastWrite;
+
+   private HotrodClientDecoder decoder = new HotrodClientDecoder();
+
+   /**
+    * Creates the transport and connects it to the given server.
+    *
+    * @throws TransportException if the connection attempt fails
+    */
+   public NettyTransport(InetSocketAddress serverAddress) {
+      this.serverAddress = serverAddress;
+      init();
+   }
+
+   private void init() {
+      // Configure the client.
+      ClientBootstrap bootstrap = new ClientBootstrap(
+            new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
+
+      // Set up the event pipeline factory.
+      bootstrap.setPipelineFactory(new HotrodClientPipelaneFactory(decoder));
+
+      // Start the connection attempt.
+      ChannelFuture future = bootstrap.connect(serverAddress);
+
+      // Wait until the connection attempt succeeds or fails.
+      channel = future.awaitUninterruptibly().getChannel();
+      if (!future.isSuccess()) {
+         bootstrap.releaseExternalResources();
+         throw new TransportException("Could not create netty transport", future.getCause());
+      }
+   }
+
+   @Override
+   protected void writeBuffer(byte[] toAppend) {
+      // Track this write as well; otherwise flush() would wait on an earlier write and could
+      // return before the array has been sent.
+      lastWrite = channel.write(toAppend);
+   }
+
+   @Override
+   public void writeByte(short toWrite) {
+      lastWrite = channel.write(toWrite);
+   }
+
+   @Override
+   public void writeVInt(int length) {
+      lastWrite = channel.write(length);
+   }
+
+   @Override
+   public void writeVLong(long l) {
+      lastWrite = channel.write(l);
+   }
+
+   /**
+    * Waits until the most recent write has completed.
+    *
+    * @throws TransportException if the thread is interrupted while waiting or the write failed
+    */
+   @Override
+   public void flush() {
+      if (lastWrite == null) {
+         // Nothing has been written yet, so there is nothing to wait for.
+         return;
+      }
+      try {
+         lastWrite.await();
+      } catch (InterruptedException e) {
+         // Preserve the interrupt status for callers further up the stack.
+         Thread.currentThread().interrupt();
+         throw new TransportException(e);
+      }
+      if (!lastWrite.isSuccess()) {
+         throw new TransportException("Could not write to " + serverAddress, lastWrite.getCause());
+      }
+   }
+
+   @Override
+   public long readVLong() {
+      return decoder.readVLong();
+   }
+
+   @Override
+   public int readVInt() {
+      return decoder.readVInt();
+   }
+
+   @Override
+   public short readByte() {
+      return decoder.readByte();
+   }
+
+   @Override
+   public void release() {
+      // TODO: Customise this generated block
+   }
+
+   @Override
+   protected void readBuffer(byte[] bufferToFill) {
+      decoder.fillBuffer(bufferToFill);
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,38 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.infinispan.client.hotrod.impl.Transport;
+import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
+/**
+ * Transport factory that hands out {@link NettyTransport} instances for the server address
+ * built during {@link #init(Properties)}.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class NettyTransportFactory extends AbstractTransportFactory {
+   private InetSocketAddress serverAddr;
+
+   @Override
+   public void init(Properties props) {
+      super.init(props);
+      this.serverAddr = new InetSocketAddress(this.serverHost, this.serverPort);
+   }
+
+   /**
+    * Creates a new transport on every call.
+    */
+   @Override
+   public Transport getTransport() {
+      Transport created = new NettyTransport(this.serverAddr);
+      return created;
+   }
+
+   @Override
+   public void destroy() {
+      // TODO: Customise this generated block
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -0,0 +1,30 @@
+package org.infinispan.client.hotrod.impl.transport.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * {@link OutputStream} that forwards every written byte to the {@link ChannelBuffer} most
+ * recently supplied through {@link #setBuffer(ChannelBuffer)}.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class OutputStreamAdapter extends OutputStream {
+
+   ChannelBuffer buffer;
+
+   /**
+    * Replaces the buffer that subsequent writes go to.
+    */
+   public void setBuffer(ChannelBuffer buffer) {
+      this.buffer = buffer;
+   }
+
+   @Override
+   public void write(int b) throws IOException {
+      this.buffer.writeByte(b);
+   }
+}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -1,5 +1,6 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
+import org.infinispan.client.hotrod.impl.AbstractTransport;
 import org.infinispan.client.hotrod.impl.Transport;
 import org.infinispan.client.hotrod.impl.transport.TransportException;
 import org.infinispan.client.hotrod.impl.transport.VHelper;
@@ -14,7 +15,7 @@
  * @author mmarkus
  * @since 4.1
  */
-public class TcpTransport implements Transport {
+public class TcpTransport extends AbstractTransport {
 
    public static final Logger log = Logger.getLogger(TcpTransport.class.getName());
 
@@ -22,14 +23,6 @@
    private int port;
    private Socket socket;
 
-   public void appendUnsignedByte(short requestMagic) {
-      try {
-         socket.getOutputStream().write(requestMagic);
-      } catch (IOException e) {
-         throw new TransportException(e);
-      }
-   }
-
    public void writeVInt(int length) {
       try {
          VHelper.writeVInt(length, socket.getOutputStream());
@@ -75,9 +68,8 @@
       }
    }
 
-   public void writeBytesArray(byte... toAppend) {
+   protected void writeBuffer(byte[] toAppend) {
       try {
-         writeVInt(toAppend.length);
          socket.getOutputStream().write(toAppend);
       } catch (IOException e) {
          throw new TransportException("Problems writing data to stream", e);
@@ -122,7 +114,7 @@
       }
    }
 
-   public byte[] readByteArray(byte[] bufferToFill) {
+   protected void readBuffer(byte[] bufferToFill)  {
       int size;
       try {
          size = socket.getInputStream().read(bufferToFill);
@@ -135,13 +127,5 @@
       if (size != bufferToFill.length) {
          throw new TransportException("Expected " + bufferToFill.length + " bytes but only could read " + size + " bytes!");
       }
-      return bufferToFill;
    }
-
-   @Override
-   public String readString() {
-      long strLength = readVLong();
-      byte[] strContent = readByteArray(new byte[(int)strLength]);
-      return new String(strContent);//todo take care of encoding here
-   }
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-03-30 13:38:37 UTC (rev 1641)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-03-31 07:34:14 UTC (rev 1642)
@@ -2,6 +2,7 @@
 
 import org.infinispan.client.hotrod.impl.Transport;
 import org.infinispan.client.hotrod.impl.TransportFactory;
+import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
 
 import java.util.Properties;
 import java.util.StringTokenizer;
@@ -12,30 +13,13 @@
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
-public class TcpTransportFactory implements TransportFactory {
+public class TcpTransportFactory extends AbstractTransportFactory {
 
-   private String serverHost;
-   private int serverPort;
-
-   public void init(Properties props) {
-      String servers = props.getProperty("hotrod-servers");
-      StringTokenizer tokenizer = new StringTokenizer(servers,";");
-      String server = tokenizer.nextToken();
-      String[] serverDef = tokenizeServer(server);
-      serverHost = serverDef[0];
-      serverPort = Integer.parseInt(serverDef[1]);
-   }
-
    @Override
    public void destroy() {
       // TODO: Customise this generated block
    }
 
-   private String[] tokenizeServer(String server) {
-      StringTokenizer t = new StringTokenizer(server, ":");
-      return new String[] {t.nextToken(), t.nextToken()};
-   }
-
    @Override
    public Transport getTransport() {
       TcpTransport transport = new TcpTransport(serverHost, serverPort);



More information about the infinispan-commits mailing list