[infinispan-commits] Infinispan SVN: r1634 - in trunk/client/hotrod-client/src/main/java: org and 7 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Mar 29 11:41:39 EDT 2010


Author: mircea.markus
Date: 2010-03-29 11:41:37 -0400 (Mon, 29 Mar 2010)
New Revision: 1634

Added:
   trunk/client/hotrod-client/src/main/java/org/
   trunk/client/hotrod-client/src/main/java/org/infinispan/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Version.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/ClientDisconnectedException.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodClientException.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodException.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/InvalidResponseException.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/TimeoutException.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/BinaryVersionedValue.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodMarshaller.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/VersionedValueImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportException.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
Log:
ongoing work on hotrod client

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Flag.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,21 @@
+package org.infinispan.client.hotrod;
+
+/**
+ * Per-invocation flags accepted by {@link RemoteCache#withFlags(Flag...)}; each constant carries an int code.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public enum Flag {
+   FORCE_RETURN_VALUE(0x0800);
+
+   private final int flagInt;
+
+   Flag(int flagInt) {
+      this.flagInt = flagInt;
+   }
+
+   public int getFlagInt() {
+      return flagInt;
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,350 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.util.concurrent.NotifyingFuture;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provides remote reference to a Hot Rod server/cluster. It implements {@link org.infinispan.Cache}, but given its
+ * nature (remote) some operations are not supported. All these unsupported operations are being overridden within this
+ * interface and documented as such.
+ * <p>
+ * <b>New operations</b>: besides the operations inherited from {@link org.infinispan.Cache}, RemoteCache also adds new
+ * operations to optimize/reduce network traffic: e.g. versioned put operation.
+ * <p>
+ * <b>Concurrency</b>: implementors of this interface will support multi-threaded access, similar to the way {@link
+ * org.infinispan.Cache} supports it.
+ * <p>
+ * <b>Return values</b>: previously existing values for certain {@link java.util.Map} operations are not returned,
+ * null is returned instead. E.g. {@link java.util.Map#put(Object, Object)} returns the previous value
+ * associated to the supplied key. In case of RemoteCache, this returns null.
+ * <p>
+ * <b>Synthetic operations</b>: aggregate operations are being implemented based on other Hot Rod operations.
+ * E.g. {@link java.util.Map#putAll(java.util.Map)} is implemented through multiple individual puts. This means
+ * that these operations are not atomic and that they are costly, as the number of network round-trips is not
+ * one, but the size of the added map. All these synthetic operations are documented as such.
+ * <p>
+ * <b>Changing default behavior through {@link org.infinispan.client.hotrod.Flag}s</b>: it is possible to change the default cache behaviour by using
+ * flags on a per-invocation basis.
+ * E.g.
+ * <pre>
+ *      RemoteCache cache = getRemoteCache();
+ *      Object oldValue = cache.withFlags(Flag.FORCE_RETURN_VALUE).put(aKey, aValue);
+ * </pre>
+ * In the previous example, using {@link org.infinispan.client.hotrod.Flag#FORCE_RETURN_VALUE} will make the client also return the previously
+ * existing value associated with <tt>aKey</tt>. If this flag were not present, Infinispan would return (by default)
+ * <tt>null</tt>. This is in order to avoid fetching a possibly large object from the remote server, which might not be
+ * needed. The flags as set by the {@link org.infinispan.client.hotrod.RemoteCache#withFlags(Flag...)} operation only apply for the very next
+ * operation executed <b>by the same thread</b> on the RemoteCache.
+ * <p>
+ * <b><a href="http://community.jboss.org/wiki/Eviction">Eviction and expiration</a></b>:
+ * Unlike local {@link org.infinispan.Cache} cache, which allows specifying time values with any granularity (as defined by {@link TimeUnit}),
+ * HotRod only supports seconds as time units. If a different time unit is used instead, HotRod will transparently convert it to
+ * seconds, using {@link java.util.concurrent.TimeUnit#toSeconds(long)} method. This might result in loss of precision for
+ * values specified as nanos or milliseconds. <br/>
+ * Another fundamental difference is in the case of lifespan (naturally does NOT apply for max idle): If number of seconds is bigger than 30 days,
+ * this number of seconds is treated as UNIX time and so, represents the number of seconds since 1/1/1970. <br/>
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface RemoteCache<K, V> extends Cache<K, V> {
+
+
+   /**
+    * Removes the given entry only if its version matches the supplied version. A typical use case looks like this:
+    * <pre>
+    * VersionedValue ve = remoteCache.getVersioned(key);
+    * //some processing
+    * remoteCache.remove(key, ve.getVersion());
+    * </pre>
+    * The last call (remove) will make sure that the entry will only be removed if it hasn't been changed in between.
+    *
+    * @return true if the entry has been removed
+    * @see org.infinispan.client.hotrod.RemoteCache.VersionedValue
+    * @see #getVersioned(Object)
+    */
+   boolean remove(K key, long version);
+
+   /**
+    * @see #remove(Object, long)
+    */
+   NotifyingFuture<Boolean> removeAsync(Object key, long version);
+
+   /**
+    * Replaces the given value only if its version matches the supplied version. See {@link #remove(Object, long)} for a
+    * sample usage.
+    *
+    * @return true if the value has been replaced
+    * @see #getVersioned(Object)
+    * @see org.infinispan.client.hotrod.RemoteCache.VersionedValue
+    */
+   boolean replace(K key, V newValue, long version);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   boolean replace(K key, V newValue, long version, int lifespanSeconds);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   boolean replace(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds);
+
+   /**
+    * @see #replace(Object, Object, long)
+    */
+   NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds, int maxIdleSeconds);
+
+
+   /**
+    * Returns the {@link org.infinispan.client.hotrod.RemoteCache.VersionedValue} associated to the supplied key param, or null if it doesn't exist.
+    */
+   VersionedValue<V> getVersioned(K key);
+
+
+   /**
+    * Operation might be supported for smart clients that will be able to register for topology changes.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void addListener(Object listener);
+
+   /**
+    * @throws UnsupportedOperationException
+    * @see #addListener(Object)
+    */
+   @Override
+   void removeListener(Object listener);
+
+   /**
+    * @throws UnsupportedOperationException
+    * @see #addListener(Object)
+    */
+   @Override
+   Set<Object> getListeners();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   int size();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean isEmpty();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean containsValue(Object value);
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   Set<K> keySet();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   Collection<V> values();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   Set<Entry<K, V>> entrySet();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void evict(K key);
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   Configuration getConfiguration();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean startBatch();
+
+   /**
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void endBatch(boolean successful);
+
+   /**
+    * This operation is not supported. Consider using {@link #remove(Object, long)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean remove(Object key, Object value);
+
+   /**
+    * This operation is not supported. Consider using {@link #removeAsync(Object, long)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   NotifyingFuture<Boolean> removeAsync(Object key, Object value);
+
+   /**
+    * This operation is not supported. Consider using {@link #replace(Object, Object, long)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean replace(K key, V oldValue, V newValue);
+
+   /**
+    * This operation is not supported. Consider using {@link #replace(Object, Object, long, int)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean replace(K key, V oldValue, V value, long lifespan, TimeUnit unit);
+
+   /**
+    * This operation is not supported. Consider using {@link #replace(Object, Object, long, int, int)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   boolean replace(K key, V oldValue, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit);
+
+   /**
+    * This operation is not supported. Consider using {@link #replaceAsync(Object, Object, long)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue);
+
+   /**
+    * This operation is not supported. Consider using {@link #replaceAsync(Object, Object, long, int)}
+    * instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit);
+
+   /**
+    * This operation is not supported. Consider using {@link #replaceAsync(Object, Object, long, int, int)} instead.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+
+   /**
+    * This operation is not supported.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   AdvancedCache<K, V> getAdvancedCache();
+
+   /**
+    * This operation is not supported.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void compact();
+
+   /**
+    * This operation is not supported.
+    *
+    * @throws UnsupportedOperationException
+    */
+   @Override
+   void putForExternalRead(K key, V value);
+
+   /**
+    * Synthetic operation. The client iterates over the set of keys and calls put for each one of them. This results in
+    * operation not being atomic (if a failure happens after few puts it is not rolled back) and costly (for each key in
+    * the parameter map a remote call is performed).
+    */
+   @Override
+   void putAll(Map<? extends K, ? extends V> map, long lifespan, TimeUnit unit);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   void putAll(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+
+   /**
+    * Synthetic operation.
+    *
+    * @see #putAll(java.util.Map, long, java.util.concurrent.TimeUnit)
+    */
+   @Override
+   void putAll(Map<? extends K, ? extends V> m);
+
+   /**
+    * Besides the value, also contains a version. To be used in versioned operations, e.g. {@link RemoteCache#remove(Object, long)}.
+    */
+   public static interface VersionedValue<V> {
+      public long getVersion();
+      public V getValue();
+   }
+
+   RemoteCache<K, V> withFlags(Flag... flags); // flags apply only to the next operation executed by the same thread
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,139 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+import org.infinispan.client.hotrod.impl.HotrodOperations;
+import org.infinispan.client.hotrod.impl.HotrodOperationsImpl;
+import org.infinispan.client.hotrod.impl.HotrodMarshaller;
+import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
+import org.infinispan.client.hotrod.impl.SerializationMarshaller;
+import org.infinispan.client.hotrod.impl.TransportFactory;
+import org.infinispan.lifecycle.Lifecycle;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.manager.DefaultCacheManager;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Properties;
+
+/**
+ * Creates {@link RemoteCache} instances, configured from supplied properties, the classpath or a URL.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class RemoteCacheManager implements CacheContainer, Lifecycle {
+
+   private Properties props;
+   private TransportFactory transportFactory;
+   private String hotrodMarshaller;
+
+
+   /**
+    * Builds a cache manager based on the supplied properties.
+    * TODO - add a list of all possible configuration parameters here
+    */
+   public RemoteCacheManager(Properties props, boolean start) {
+      this.props = props;
+      if (start) start();
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(java.util.Properties, boolean)} with start==true: stores the properties and starts the manager.
+    */
+   public RemoteCacheManager(Properties props) {
+      this(props, true);
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(java.util.Properties)}, but it will try to lookup the config properties in the
+    * classpath, in a file named <tt>hotrod-client.properties</tt>.
+    * @param start whether or not to start the RemoteCacheManager
+    * @throws HotRodClientException if such a file cannot be found in the classpath
+    */
+   public RemoteCacheManager(boolean start) {
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+      InputStream stream = loader.getResourceAsStream("hotrod-client.properties");
+      loadFromStream(stream);
+      if (start) start();
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(boolean)} and it also starts the cache.
+    */
+   public RemoteCacheManager() {
+      this(true);
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(java.util.Properties)}, but it will try to lookup the config properties in
+    * supplied URL.
+    * @param start whether or not to start the RemoteCacheManager
+    * @throws HotRodClientException if properties could not be loaded
+    */
+   public RemoteCacheManager(URL config, boolean start) {
+      try {
+         loadFromStream(config.openStream());
+      } catch (IOException e) {
+         throw new HotRodClientException("Could not read URL:" + config, e);
+      }
+      if (start)
+         start();
+   }
+
+   /**
+    * Same as {@link #RemoteCacheManager(java.net.URL, boolean)} and it also starts the cache (start==true).
+    * @param config URL from which the configuration properties are loaded
+    */
+   public RemoteCacheManager(URL config) {
+      this(config, true);
+   }
+   
+
+   private void loadFromStream(InputStream stream) {
+      props = new Properties();
+      try {
+         props.load(stream);
+      } catch (IOException e) {
+         throw new HotRodClientException("Issues configuring from client hotrod-client.properties",e);
+      }
+   }
+
+   public <K,V> RemoteCache<K,V> getCache(String cacheName) {
+      return createRemoteCache(cacheName);
+   }
+
+   public <K,V> RemoteCache<K,V> getCache() {
+      return createRemoteCache(DefaultCacheManager.DEFAULT_CACHE_NAME);
+   }
+
+   @Override
+   public void start() {
+      String factory = props.getProperty("transport-factory"); //todo switch to Netty transport by default (second param)
+      if (factory == null)
+         throw new IllegalStateException("Missing required property: transport-factory!");
+      transportFactory = (TransportFactory) newInstance(factory);
+      transportFactory.init(props);
+      hotrodMarshaller = props.getProperty("marshaller", SerializationMarshaller.class.getName());
+   }
+
+   @Override
+   public void stop() {
+      transportFactory.destroy();
+   }
+
+   private Object newInstance(String clazz) {
+      ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+      try {
+         return ((Class<?>) contextClassLoader.loadClass(clazz)).newInstance();
+      } catch (Exception e) {
+         throw new HotRodClientException("Could not instantiate class: " + clazz, e);
+      }
+   }
+
+   private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName) {
+      HotrodMarshaller marshaller = (HotrodMarshaller) newInstance(hotrodMarshaller);
+      HotrodOperations hotrodOperations = new HotrodOperationsImpl(cacheName, transportFactory);
+      return new RemoteCacheImpl<K, V>(hotrodOperations, marshaller, cacheName);
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Version.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Version.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/Version.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,16 @@
+package org.infinispan.client.hotrod;
+
+/**
+ * Holds the Hot Rod protocol version and formats it as a human-readable string.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class Version {
+
+   private static final String PROTOCOL_VERSION = "1.0"; 
+
+   public static String getProtocolVersion() {
+      return "HotRod client, protocol version :" + PROTOCOL_VERSION;
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/ClientDisconnectedException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/ClientDisconnectedException.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/ClientDisconnectedException.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,13 @@
+package org.infinispan.client.hotrod.exceptions;
+
+/**
+ * Signals that a remote cache method was called after the RemoteCacheManager had been stopped.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class ClientDisconnectedException extends HotRodException {
+   public ClientDisconnectedException() {
+      super("Cannot call a method on the remote cache after RemoteCacheManager.stop() has been called.");
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodClientException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodClientException.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodClientException.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,33 @@
+package org.infinispan.client.hotrod.exceptions;
+
+import org.infinispan.client.hotrod.exceptions.HotRodException;
+
+/**
+ * Hot Rod exception that may carry a remote message together with its message id and error status code.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class HotRodClientException extends HotRodException {
+   private long messageId;
+   private int errorStatusCode;
+
+   public HotRodClientException(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   public HotRodClientException(String remoteMessage, long messageId, int errorStatusCode) {
+      super(remoteMessage);
+      this.messageId = messageId;
+      this.errorStatusCode = errorStatusCode;
+   }
+
+   /** Prefixes the standard exception text with this class' name, the message id and the error status code. */
+   @Override
+   public String toString() {
+      return "HotRodClientException{" +
+            "messageId=" + messageId +
+            ", errorStatusCode=" + errorStatusCode +
+            "} " + super.toString();
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodException.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/HotRodException.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,24 @@
+package org.infinispan.client.hotrod.exceptions;
+
+/**
+ * Unchecked base class for the Hot Rod client exceptions declared in this package.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class HotRodException extends RuntimeException {
+   public HotRodException() {
+   }
+
+   public HotRodException(String message) {
+      super(message);
+   }
+
+   public HotRodException(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   public HotRodException(Throwable cause) {
+      super(cause);
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/InvalidResponseException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/InvalidResponseException.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/InvalidResponseException.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,26 @@
+package org.infinispan.client.hotrod.exceptions;
+
+import org.infinispan.client.hotrod.exceptions.HotRodException;
+
+/**
+ * Hot Rod exception type for invalid responses; offers the same constructors as {@link HotRodException}.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class InvalidResponseException extends HotRodException {
+   public InvalidResponseException() {
+   }
+
+   public InvalidResponseException(String message) {
+      super(message);
+   }
+
+   public InvalidResponseException(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   public InvalidResponseException(Throwable cause) {
+      super(cause);
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/TimeoutException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/TimeoutException.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/exceptions/TimeoutException.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,10 @@
+package org.infinispan.client.hotrod.exceptions;
+
+/**
+ * Hot Rod exception type for timeouts; it declares no constructors, so only the no-arg one is available.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class TimeoutException extends HotRodException {
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/BinaryVersionedValue.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/BinaryVersionedValue.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/BinaryVersionedValue.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,44 @@
+package org.infinispan.client.hotrod.impl;
+
+import java.util.Arrays;
+
+/**
+* Pairs a version number with a value held as a raw byte array; equals/hashCode compare the array contents.
+*
+* @author mmarkus
+* @since 4.1
+*/
+public class BinaryVersionedValue {
+   private final long version;
+   private final byte[] value;
+
+   public BinaryVersionedValue(long version, byte[] value) {
+      this.version = version;
+      this.value = value;
+   }
+
+   public long getVersion() {
+      return version;
+   }
+
+   public byte[] getValue() {
+      return value;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      BinaryVersionedValue that = (BinaryVersionedValue) o;
+
+      return version == that.version && Arrays.equals(value, that.value);
+   }
+
+   @Override
+   public int hashCode() {
+      int result = (int) (version ^ (version >>> 32));
+      result = 31 * result + (value != null ? Arrays.hashCode(value) : 0);
+      return result;
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,68 @@
+package org.infinispan.client.hotrod.impl;
+
+import java.util.EnumMap;
+
+/**
+ * Numeric constants for Hot Rod messages: magic values, version, request/response codes, status codes and client intelligence.
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public interface HotrodConstants {
+
+   public static final short REQUEST_MAGIC = 0xA0;
+   public static final short RESPONSE_MAGIC = 0xA1;
+
+   public static final byte HOTROD_VERSION = 10;
+
+   //requests
+   public static final byte PUT_REQUEST = 0x01;
+   public static final byte GET_REQUEST = 0x03;
+   public static final byte PUT_IF_ABSENT_REQUEST = 0x05;
+   public static final byte REPLACE_REQUEST = 0x07;
+   public static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
+   public static final byte REMOVE_REQUEST = 0x0B;
+   public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
+   public static final byte CONTAINS_KEY_REQUEST = 0x0F;
+   public static final byte PUT_FOR_EXTERNAL_READ_REQUEST = 0x11;
+   public static final byte GET_WITH_CAS_REQUEST = 0x13;
+   public static final byte EVICT_REQUEST = 0x15;
+   public static final byte CLEAR_REQUEST = 0x17;
+   public static final byte STATS_REQUEST = 0x19;
+   public static final byte QUIT_REQUEST = 0x1B;
+   public static final byte EVENT_REGISTRATION_REQUEST = 0x1D;
+
+   //responses
+   public static final byte PUT_RESPONSE = 0x02;
+   public static final byte GET_RESPONSE = 0x04;
+   public static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
+   public static final byte REPLACE_RESPONSE = 0x08;
+   public static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
+   public static final byte REMOVE_RESPONSE = 0x0C;
+   public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
+   public static final byte CONTAINS_KEY_RESPONSE = 0x10;
+   public static final byte PUT_FOR_EXTERNAL_READ_RESPONSE = 0x12;
+   public static final byte GET_WITH_CAS_RESPONSE = 0x14;
+   public static final byte EVICT_RESPONSE = 0x16;
+   public static final byte CLEAR_RESPONSE = 0x18;
+   public static final byte STATS_RESPONSE = 0x1A;
+   public static final byte QUIT_RESPONSE = 0x1C;
+   public static final byte EVENT_REGISTRATION_RESPONSE = 0x1E;
+   public static final byte ERROR_RESPONSE = 0x50;
+
+   //response status
+   public static final byte NO_ERROR_STATUS = 0x00;
+   public static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
+   public static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
+   public static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
+   public static final int UNKNOWN_COMMAND_STATUS = 0x82;
+   public static final int SERVER_ERROR_STATUS = 0x85;
+   public static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
+   public static final int UNKNOWN_VERSION_STATUS = 0x83;
+   public static final int COMMAND_TIMEOUT_STATUS = 0x86;
+
+   //client intelligence
+   public static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
+   public static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
+   public static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodMarshaller.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodMarshaller.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodMarshaller.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,14 @@
+package org.infinispan.client.hotrod.impl;
+
+/**
+ * Converts objects to and from the byte arrays handled by the Hot Rod client operations.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface HotrodMarshaller {
+
+   /** Turns the given object into its byte array form. */
+   byte[] marshallObject(Object toMarshall);
+
+   /** Rebuilds an object from a byte array; presumably one produced by marshallObject (TODO confirm). */
+   Object readObject(byte[] bytes);
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,82 @@
+package org.infinispan.client.hotrod.impl;
+
+
+import org.infinispan.client.hotrod.Flag;
+
+import java.util.Map;
+
+/**
+ * // TODO: Document this
+ *
+ * - TODO - add timeout support
+ * - TODO - enforce encoding and add such tests
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public interface HotrodOperations {
+
+   public enum VersionedOperationResponse {
+      SUCCESS(true), NO_SUCH_KEY(false), MODIFIED_KEY(false);
+      private boolean isModified;
+
+      VersionedOperationResponse(boolean modified) {
+         isModified = modified;
+      }
+
+      public boolean isUpdated() {
+         return isModified;
+      }
+   }
+
+   public byte[] get(byte[] key, Flag... flags);
+
+   public byte[] remove(byte[] key, Flag... flags);
+
+   public boolean containsKey(byte[] key, Flag... flags);
+
+   /**
+    * Returns null if the given key does not exist.
+    */
+   public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags);
+
+   /**
+    * @param lifespan number of seconds during which the entry is allowed to live.
+    * If the number of seconds is bigger than 30 days, it is treated as UNIX time and so represents
+    * the number of seconds since 1/1/1970. If set to 0, lifespan is unlimited.
+    * @param maxIdle number of seconds that an entry can be idle before it's evicted from the cache. If 0, no max idle time is applied.
+    * @param flags
+    */
+   public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+   /**
+    * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param flags
+    */
+   public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+   /**
+    * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param flags
+    */
+   public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+   /**
+    * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param flags
+    */
+   public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags);
+
+   public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags);
+
+   public void clear(Flag... flags);
+
+   public Map<String, String> stats();
+
+   public String stats(String paramName);
+
+   public boolean ping();
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,282 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.exceptions.TimeoutException;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * // TODO: Document this
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class HotrodOperationsImpl implements HotrodOperations, HotrodConstants {
+
+   /** Cache name encoded once up front; it is written into the header of every request. */
+   private final byte[] cacheNameBytes;
+   private static final AtomicLong MSG_ID = new AtomicLong();
+   private TransportFactory transportFactory;
+   /** Intelligence level written into each request header; only basic clients are supported so far. */
+   private byte clientIntelligence = CLIENT_INTELLIGENCE_BASIC;
+
+   /**
+    * @param cacheName name of the remote cache the operations are issued against
+    * @param transportFactory source of the transports used for each operation
+    */
+   public HotrodOperationsImpl(String cacheName, TransportFactory transportFactory) {
+      // encode with a fixed charset so the bytes on the wire do not depend on the platform default
+      cacheNameBytes = cacheName.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+      this.transportFactory = transportFactory;
+   }
+
+   public byte[] get(byte[] key, Flag[] flags) {
+      Transport transport = getTransport();
+      try {
+         short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
+         if (status == KEY_DOES_NOT_EXIST_STATUS) {
+            return null;
+         }
+         if (status == NO_ERROR_STATUS) {
+            return readValue(transport);
+         }
+      } finally {
+         transport.release();
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+   public byte[] remove(byte[] key, Flag[] flags) {
+      Transport transport = getTransport();
+      try {
+         short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
+         if (status == KEY_DOES_NOT_EXIST_STATUS) {
+            return null;
+         } else if (status == NO_ERROR_STATUS) {
+            return returnPossiblePrevValue(transport, flags);
+         }
+      } finally {
+         transport.release();
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+   public boolean containsKey(byte[] key, Flag... flags) {
+      Transport transport = getTransport();
+      try {
+         short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
+         if (status == KEY_DOES_NOT_EXIST_STATUS) {
+            return false;
+         } else if (status == NO_ERROR_STATUS) {
+            return true;
+         }
+      } finally {
+         transport.release();
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+   public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
+      Transport transport = getTransport();
+      try {
+         short status = sendKeyOperation(key, transport, GET_WITH_CAS_REQUEST, flags, GET_WITH_CAS_RESPONSE);
+         if (status == KEY_DOES_NOT_EXIST_STATUS) {
+            return null;
+         }
+         if (status == NO_ERROR_STATUS) {
+            long version = transport.readVLong();
+            byte[] value = readValue(transport);
+            return new BinaryVersionedValue(version, value);
+         }
+      } finally {
+         transport.release();
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+
+   public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+      Transport transport = getTransport();
+      try {
+         short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
+         if (status != NO_ERROR_STATUS) {
+            throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
+         }
+         return returnPossiblePrevValue(transport, flags);
+      } finally {
+         transport.release();
+      }
+   }
+
+   public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+      Transport transport = getTransport();
+      try {
+         short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
+         if (status == NO_ERROR_STATUS) {
+            return returnPossiblePrevValue(transport, flags);
+         } else if (status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+            return null;
+         }
+      } finally {
+         transport.release();
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+   public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+      return null;
+   }
+
+   public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   public void clear(Flag... flags) {
+      // TODO: Customise this generated block
+   }
+
+   public Map<String, String> stats() {
+      return null;  // TODO: Customise this generated block
+   }
+
+   public String stats(String paramName) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   public Transport getTransport() {
+      return transportFactory.getTransport();
+   }
+
+   private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
+      // 1) write [header][key length][key]
+      long messageId = writeHeader(transport, opCode, flags);
+      transport.writeBytesArray(key);
+      transport.flush();
+
+      // 2) now read the header
+      short status = readHeaderAndValidate(transport, messageId, opCode, opRespCode);
+
+      // 3) process possible error messages
+      checkForErrorsInResponseStatus(status, messageId, transport);
+
+      return status;
+   }
+
+   @Override
+   public boolean ping() {
+      return false;  // TODO: Customise this generated block
+   }
+
+   //[header][key length][key][lifespan][max idle][value length][value]
+
+   private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
+      // 1) write header
+      long messageId = writeHeader(transport, opCode, flags);
+
+      // 2) write key and value
+      transport.writeBytesArray(key);
+      transport.writeVInt(lifespan);
+      transport.writeVInt(maxIdle);
+      transport.writeBytesArray(value);
+      transport.flush();
+
+      // 3) now read header
+      short status = readHeaderAndValidate(transport, messageId, opCode, opRespCode);
+      checkForErrorsInResponseStatus(status, messageId, transport);
+
+      //return status (not error status for sure)
+      return status;
+   }
+
+   /*
+    * Magic	| MessageId	| Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
+    */
+
+   private long writeHeader(Transport transport, short operationCode, Flag... flags) {
+      transport.appendUnsignedByte(REQUEST_MAGIC);
+      long messageId = MSG_ID.incrementAndGet();
+      transport.writeVLong(messageId);
+      transport.writeByte(HOTROD_VERSION);
+      transport.writeByte(operationCode);
+      transport.writeBytesArray(cacheNameBytes);
+      int flagInt = 0;
+      if (flags != null) {
+         for (Flag flag : flags) {
+            flagInt = flag.getFlagInt() | flagInt;
+         }
+      }
+      transport.writeVInt(flagInt);
+      transport.writeByte(clientIntelligence);
+      transport.writeVInt(0);//this will be changed once smarter clients are supported
+      return messageId;
+   }
+
+   /**
+    * Magic	| Message Id | Op code | Status | Topology Change Marker
+    */
+   private short readHeaderAndValidate(Transport transport, long messageId, short opCode, short opRespCode) {
+      short magic = transport.readByte();
+      if (magic != RESPONSE_MAGIC) {
+         throw new InvalidResponseException("Invalid magic number. Expected " + Integer.toHexString(RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic));
+      }
+      long receivedMessageId = transport.readVLong();
+      if (receivedMessageId != messageId) {
+         throw new InvalidResponseException("Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId));
+      }
+      short receivedOpCode = transport.readByte();
+      if (receivedOpCode != opRespCode) {
+         if (receivedOpCode == ERROR_RESPONSE) {
+            checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
+            throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
+         }
+         throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opCode) + " and received " + Integer.toHexString(receivedOpCode));
+      }
+      short status = transport.readByte();
+      transport.readByte(); //todo - this will be changed once we support smarter than basic clients
+      return status; 
+   }
+
+   /**
+    * Inspects the status of a response and throws if it denotes a failure.
+    * Returns normally for the statuses that callers interpret themselves: no error, key does not exist and
+    * "not put/removed/replaced" (which putIfAbsent maps to a null return).
+    *
+    * @throws HotRodClientException for the server-reported error statuses, with the message read from the transport
+    * @throws TimeoutException if the status is the command timeout status
+    * @throws IllegalStateException if the status is none of the known values
+    */
+   private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
+      switch ((int) status) {
+         case INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
+         case REQUEST_PARSING_ERROR_STATUS:
+         case UNKNOWN_COMMAND_STATUS:
+         case SERVER_ERROR_STATUS:
+         case UNKNOWN_VERSION_STATUS: {
+            throw new HotRodClientException(transport.readString(), messageId, status);
+         }
+         case COMMAND_TIMEOUT_STATUS: {
+            throw new TimeoutException();
+         }
+         case NO_ERROR_STATUS:
+         case NOT_PUT_REMOVED_REPLACED_STATUS:
+         case KEY_DOES_NOT_EXIST_STATUS: {
+            //don't do anything, these are correct responses
+            break;
+         }
+         default: {
+            throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
+         }
+      }
+   }
+
+   /**
+    * Reads the length of the byte array and then the byte array from the transport.
+    */
+   private byte[] readValue(Transport transport) {
+      int responseLength = transport.readVInt();
+      byte[] result = new byte[responseLength];
+      return transport.readByteArray(result);
+   }
+
+   private boolean hasForceReturn(Flag[] flags) {
+      if (flags == null) return false;
+      for (Flag flag : flags) {
+         if (flag == Flag.FORCE_RETURN_VALUE) return true;
+      }
+      return false;
+   }
+
+   private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
+      return hasForceReturn(flags) ? readValue(transport) : null;
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,198 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.Version;
+import org.infinispan.util.concurrent.NotifyingFuture;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class RemoteCacheImpl<K, V> extends RemoteCacheSupport<K,V> {
+
+   private ThreadLocal<Flag[]> flagsMap = new ThreadLocal<Flag[]>();
+   private HotrodOperations operations;
+   private HotrodMarshaller marshaller;
+   private String name;
+
+
+   public RemoteCacheImpl(HotrodOperations operations, HotrodMarshaller marshaller, String name) {
+      this.operations = operations;
+      this.marshaller = marshaller;
+      this.name = name;
+   }
+
+   @Override
+   public boolean remove(K key, long version) {
+      HotrodOperations.VersionedOperationResponse response = operations.removeIfUnmodified(obj2bytes(key), version, flags());
+      return response.isUpdated();
+   }
+
+   @Override
+   public NotifyingFuture<Boolean> removeAsync(Object key, long version) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   @Override
+   public boolean replace(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds) {
+      HotrodOperations.VersionedOperationResponse response = operations.replaceIfUnmodified(obj2bytes(key), obj2bytes(newValue), lifespanSeconds, maxIdleTimeSeconds, version, flags());
+      return response.isUpdated();
+   }
+
+   @Override
+   public NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds, int maxIdleSeconds) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   @Override
+   public VersionedValue<V> getVersioned(K key) {
+      BinaryVersionedValue value = operations.getWithVersion(obj2bytes(key), flags());
+      return binary2VersionedValue(value);
+   }
+
+   @Override
+   public void putAll(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+      for (Entry<? extends K, ? extends V> entry : map.entrySet()) {
+         put(entry.getKey(), entry.getValue(), lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
+      }
+   }
+
+   @Override
+   public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   @Override
+   public String getName() {
+      return name;
+   }
+
+   @Override
+   public String getVersion() {
+      return Version.getProtocolVersion();  
+   }
+
+   @Override
+   public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+      int lifespanSecs = toSeconds(lifespan, lifespanUnit);
+      int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
+      byte[] result = operations.put(obj2bytes(key), obj2bytes(value), lifespanSecs, maxIdleSecs, flags());
+      return (V) bytes2obj(result);
+   }
+
+
+   @Override
+   public V putIfAbsent(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+      int lifespanSecs = toSeconds(lifespan, lifespanUnit);
+      int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
+      byte[] bytes = operations.putIfAbsent(obj2bytes(key), obj2bytes(value), lifespanSecs, maxIdleSecs, flags());
+      return (V) bytes2obj(bytes);
+   }
+
+   @Override
+   public V replace(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+      int lifespanSecs = toSeconds(lifespan, lifespanUnit);
+      int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
+      byte[] bytes = operations.replace(obj2bytes(key), obj2bytes(value), lifespanSecs, maxIdleSecs, flags());
+      return (V) bytes2obj(bytes);
+   }
+
+   @Override
+   public NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   @Override
+   public NotifyingFuture<Void> clearAsync() {
+      return null;  // TODO: Customise this generated block
+   }
+
+   @Override
+   public NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   @Override
+   public NotifyingFuture<V> removeAsync(Object key) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   @Override
+   public NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      return null;  // TODO: Customise this generated block
+   }
+
+   @Override
+   public boolean containsKey(Object key) {
+      return operations.containsKey(obj2bytes(key), flags());
+   }
+
+   @Override
+   public V get(Object key) {
+      byte[] bytes = operations.get(obj2bytes(key), flags());
+      return (V) bytes2obj(bytes);
+   }
+
+   @Override
+   public V remove(Object key) {
+      byte[] existingValue = operations.remove(obj2bytes(key), flags());
+      return (V) bytes2obj(existingValue);
+   }
+
+   @Override
+   public void clear() {
+      operations.clear(flags());
+   }
+
+   @Override
+   public void start() {
+      // TODO: Customise this generated block
+   }
+
+   @Override
+   public void stop() {
+      // TODO: Customise this generated block
+   }
+
+
+   @Override
+   public RemoteCache withFlags(Flag... flags) {
+      this.flagsMap.set(flags);
+      return this;
+   }
+
+   private Flag[] flags() {
+      Flag[] flags = this.flagsMap.get();
+      this.flagsMap.remove();
+      return flags;
+   }
+
+   private byte[] obj2bytes(Object obj) {
+      return this.marshaller.marshallObject(obj);
+   }
+
+   private Object bytes2obj(byte[] bytes) {
+      if (bytes == null) {
+         return null;
+      }
+      return this.marshaller.readObject(bytes);
+   }
+
+   private VersionedValue<V> binary2VersionedValue(BinaryVersionedValue value) {
+      if (value == null)
+         return null;
+      V valueObj = (V) bytes2obj(value.getValue());
+      return new VersionedValueImpl<V>(value.getVersion(), valueObj);
+   }
+
+   private int toSeconds(long durration, TimeUnit timeUnit) {
+      //todo make sure this fits in an int
+      return (int) timeUnit.toSeconds(durration);
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,154 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.CacheSupport;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.util.concurrent.NotifyingFuture;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class RemoteCacheSupport<K,V> extends CacheSupport<K,V> implements RemoteCache<K,V> {
+
+
+   @Override
+   public NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version) {
+      return replaceAsync(key, newValue, version, 0);
+   }
+
+   @Override
+   public NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds) {
+      // forward the caller's lifespan (max idle defaults to 0), like replace(key, newValue, version, lifespanSeconds)
+      return replaceAsync(key, newValue, version, lifespanSeconds, 0);
+   }
+
+   @Override
+   public boolean replace(K key, V newValue, long version) {
+      return replace(key, newValue, version, 0);
+   }
+
+   @Override
+   public CacheManager getCacheManager() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean replace(K key, V newValue, long version, int lifespanSeconds) {
+      return replace(key, newValue, version, lifespanSeconds, 0);
+   }
+
+   @Override
+   public void putForExternalRead(K key, V value) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void addListener(Object listener) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void removeListener(Object listener) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public Set<Object> getListeners() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public int size() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean isEmpty() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean containsValue(Object value) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public Set<K> keySet() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public Collection<V> values() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public Set<Entry<K, V>> entrySet() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void evict(K key) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public Configuration getConfiguration() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean startBatch() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void endBatch(boolean successful) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean remove(Object key, Object value) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public NotifyingFuture<Boolean> removeAsync(Object key, Object value) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public boolean replace(K key, V oldValue, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
+      throw new UnsupportedOperationException();
+   }
+
+
+   @Override
+   public NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public AdvancedCache<K, V> getAdvancedCache() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void compact() {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public ComponentStatus getStatus() {
+      throw new UnsupportedOperationException();
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,40 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class SerializationMarshaller implements HotrodMarshaller {
+
+   @Override
+   public byte[] marshallObject(Object toMarshall) {
+      ByteArrayOutputStream result = new ByteArrayOutputStream(1000);
+      try {
+         ObjectOutputStream oos = new ObjectOutputStream(result);
+         oos.writeObject(toMarshall);
+         return result.toByteArray();
+      } catch (IOException e) {
+         throw new HotRodClientException("Unexpected!", e);
+      }
+   }
+
+   @Override
+   public Object readObject(byte[] bytes) {
+      try {
+         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
+         return ois.readObject();
+      } catch (Exception e) {
+         throw new HotRodClientException("Unexpected!", e);
+      }
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,41 @@
+package org.infinispan.client.hotrod.impl;
+
+/**
+ * // TODO: Document this
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public interface Transport {
+
+   public void writeBytesArray(byte... toAppend);
+
+   public void writeByte(short toWrite);
+
+   /**
+    * Treats the tailing byte as an unsigned byte.
+    */
+   public void appendUnsignedByte(short requestMagic);
+
+   public void writeVInt(int length);
+
+   public void writeVLong(long l);
+
+   public long readVLong();
+
+   public int readVInt();
+
+   public void flush();
+
+   public short readByte();
+
+   public void release();
+
+   /**
+    * reads an vint which is size; then an array having that size.
+    * @param bufferToFill
+    */
+   public byte[] readByteArray(byte[] bufferToFill);
+
+   String readString();
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,17 @@
+package org.infinispan.client.hotrod.impl;
+
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface TransportFactory {
+   public Transport getTransport();
+
+   void init(Properties props);
+
+   void destroy();
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/VersionedValueImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/VersionedValueImpl.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/VersionedValueImpl.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,31 @@
+package org.infinispan.client.hotrod.impl;
+
+import org.infinispan.client.hotrod.RemoteCache;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class VersionedValueImpl<V> implements RemoteCache.VersionedValue<V> {
+
+   private long version;
+
+   private V value;
+
+   public VersionedValueImpl(long version, V value) {
+      this.version = version;
+      this.value = value;
+   }
+
+   @Override
+   public long getVersion() {
+      return version;
+   }
+
+   @Override
+   public V getValue() {
+      return value;
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportException.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportException.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportException.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,26 @@
+package org.infinispan.client.hotrod.impl.transport;
+
+import org.infinispan.client.hotrod.exceptions.HotRodException;
+
+/**
+ * // TODO: Document this
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class TransportException extends HotRodException {
+   public TransportException() {
+   }
+
+   public TransportException(String message) {
+      super(message);
+   }
+
+   public TransportException(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   public TransportException(Throwable cause) {
+      super(cause);
+   }
+}
\ No newline at end of file

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,102 @@
+package org.infinispan.client.hotrod.impl.transport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * // TODO: Document this
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class VHelper {
+   private static final int MAX_VINT_BYTES = 5;
+   private static final int MAX_VLONG_BYTES = 9;
+
+   public static int readVInt(InputStream is) {
+      int result = 0;
+      for (int i = 0; i < MAX_VINT_BYTES; i++) {
+         int aByte = nextByte(is);
+         boolean hasMore = (aByte & 128) > 0;
+         aByte &= 127; //remove leading byte
+         result = result | (aByte << (i*7));
+         if (!hasMore) break;
+      }
+      if (result < 0)
+         throw new TransportException("negative number read: " + result);
+      return result;
+   }
+
+   public static void writeVInt(int toWrite, OutputStream os) {
+      boolean hasMore;
+      do {
+         int currentByte = toWrite & 0x0000007F;
+         toWrite = toWrite >> 7;
+         hasMore = toWrite > 0;
+         if (hasMore) {
+            currentByte |= 128;
+         }
+         writeByte(os, currentByte);
+      } while (hasMore);
+   }
+
+   private static void writeByte(OutputStream os, int currentByte) {
+      try {
+         os.write(currentByte);
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   private static int nextByte(InputStream is) {
+      try {
+         int result = is.read();
+         if (result < 0) {
+            throw new TransportException("Unexpected end of stream " + result);
+         }
+         return result;
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   public static long readVLong(InputStream is) {
+      long result = 0;
+      for (int i = 0; i < MAX_VLONG_BYTES; i++) {
+         long aByte = nextByte(is);
+         boolean hasMore = (aByte & 128) > 0;
+         aByte &= 127; //remove leading byte
+         result = result | (aByte << (i*7));
+         if (!hasMore) break;
+      }
+      if (result < 0)
+         throw new TransportException("negative number read: " + result);
+      return result;
+   }
+
+
+   public static void writeVLong(long toWrite, OutputStream os) {
+      boolean hasMore;
+      do {
+         long currentByte = toWrite & 0x000000000000007F;
+         toWrite = toWrite >> 7;
+         hasMore = toWrite > 0;
+         if (hasMore) {
+            currentByte |= 128;
+         }
+         writeByte(os, (int)currentByte);
+      } while (hasMore);
+   }
+
+   public static void main(String[] args) {
+      long zero = 0;
+      long aByte = 1;
+      long shift = 35;
+      System.out.println((zero | (aByte << 35)));
+      long a = 1l<<18;
+      long b = 1l<<17;
+      long ab = a * b;
+      System.out.println(ab);
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,147 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.infinispan.client.hotrod.impl.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.logging.Logger;
+
+/**
+ * // TODO: Document this
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+public class TcpTransport implements Transport {
+
+   public static final Logger log = Logger.getLogger(TcpTransport.class.getName());
+
+   private String host;
+   private int port;
+   private Socket socket;
+
+   public void appendUnsignedByte(short requestMagic) {
+      try {
+         socket.getOutputStream().write(requestMagic);
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   public void writeVInt(int length) {
+      try {
+         VHelper.writeVInt(length, socket.getOutputStream());
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   public void writeVLong(long l) {
+      try {
+         VHelper.writeVLong(l, socket.getOutputStream());
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   public long readVLong() {
+      try {
+         return VHelper.readVLong(socket.getInputStream());
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   public int readVInt() {
+      try {
+         return VHelper.readVInt(socket.getInputStream());
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   public TcpTransport(String host, int port) {
+      this.host = host;
+      this.port = port;
+   }
+
+   public void connect() {
+      try {
+         socket = new Socket(host, port);
+      } catch (IOException e) {
+         throw new TransportException("Problems establishing initial connection", e);
+      }
+   }
+
+   public void writeBytesArray(byte... toAppend) {
+      try {
+         writeVInt(toAppend.length);
+         socket.getOutputStream().write(toAppend);
+      } catch (IOException e) {
+         throw new TransportException("Problems writing data to stream", e);
+      }
+   }
+
+   @Override
+   public void writeByte(short toWrite) {
+      try {
+         socket.getOutputStream().write(toWrite);
+      } catch (IOException e) {
+         throw new TransportException("Problems writing data to stream", e);
+      }
+   }
+
+   public void flush() {
+      try {
+         socket.getOutputStream().flush();
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+   }
+
+   public short readByte() {
+      int resultInt;
+      try {
+         resultInt = socket.getInputStream().read();
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+      if (resultInt == -1) {
+         throw new TransportException("End of stream reached!");
+      }
+      return (short) resultInt;
+   }
+
+   public void release() {
+      try {
+         socket.close();
+      } catch (IOException e) {
+         log.warning("Issues closing socket:" + e.getMessage());
+      }
+   }
+
+   public byte[] readByteArray(byte[] bufferToFill) {
+      int size;
+      try {
+         size = socket.getInputStream().read(bufferToFill);
+      } catch (IOException e) {
+         throw new TransportException(e);
+      }
+      if (size == -1) {
+         throw new RuntimeException("End of stream reached!");
+      }
+      if (size != bufferToFill.length) {
+         throw new TransportException("Expected " + bufferToFill.length + " bytes but only could read " + size + " bytes!");
+      }
+      return bufferToFill;
+   }
+
+   @Override
+   public String readString() {
+      long strLength = readVLong();
+      byte[] strContent = readByteArray(new byte[(int)strLength]);
+      return new String(strContent);//todo take care of encoding here
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-03-29 15:41:37 UTC (rev 1634)
@@ -0,0 +1,45 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.infinispan.client.hotrod.impl.Transport;
+import org.infinispan.client.hotrod.impl.TransportFactory;
+
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+/**
+ * {@link TransportFactory} that creates {@link TcpTransport} instances connected to a single server.
+ * <p>
+ * The server is taken from the {@code hotrod-servers} property, a {@code ;}-separated list of {@code host:port}
+ * entries. Only the first entry is used; any further entries are currently ignored.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class TcpTransportFactory implements TransportFactory {
+
+   private static final String SERVERS_PROPERTY = "hotrod-servers";
+
+   private String serverHost;
+   private int serverPort;
+
+   /**
+    * Reads the server address from the given properties.
+    *
+    * @param props configuration; must contain the {@code hotrod-servers} property
+    * @throws IllegalArgumentException if the property is missing, empty, or its first entry is not of the form
+    *                                  {@code host:port} (a non-numeric port raises {@link NumberFormatException})
+    */
+   public void init(Properties props) {
+      String servers = props.getProperty(SERVERS_PROPERTY);
+      if (servers == null) {
+         throw new IllegalArgumentException("Missing mandatory property '" + SERVERS_PROPERTY + "'");
+      }
+      StringTokenizer tokenizer = new StringTokenizer(servers, ";");
+      if (!tokenizer.hasMoreTokens()) {
+         throw new IllegalArgumentException("Property '" + SERVERS_PROPERTY + "' does not define any server: '"
+               + servers + "'");
+      }
+      String[] serverDef = tokenizeServer(tokenizer.nextToken());
+      // Parse the port before touching the fields so that a failure leaves the previous configuration intact.
+      int port = Integer.parseInt(serverDef[1]);
+      serverHost = serverDef[0];
+      serverPort = port;
+   }
+
+   /**
+    * Does nothing: this factory keeps no reference to the transports it hands out.
+    */
+   @Override
+   public void destroy() {
+      // TODO: Customise this generated block
+   }
+
+   /**
+    * Splits a {@code host:port} definition into its two parts, trimming surrounding whitespace.
+    *
+    * @return a two element array: host, port
+    * @throws IllegalArgumentException if the definition has fewer than two {@code :}-separated parts
+    */
+   private String[] tokenizeServer(String server) {
+      StringTokenizer t = new StringTokenizer(server, ":");
+      if (t.countTokens() < 2) {
+         throw new IllegalArgumentException("Invalid server definition '" + server + "', expected host:port");
+      }
+      return new String[] {t.nextToken().trim(), t.nextToken().trim()};
+   }
+
+   /**
+    * Creates a new {@link TcpTransport} for the configured server and connects it.
+    *
+    * @return a connected transport; a new instance on every call
+    */
+   @Override
+   public Transport getTransport() {
+      TcpTransport transport = new TcpTransport(serverHost, serverPort);
+      transport.connect();
+      return transport;
+   }
+}



More information about the infinispan-commits mailing list