[infinispan-commits] Infinispan SVN: r1805 - in trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl: async and 5 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue May 18 11:40:50 EDT 2010


Author: mircea.markus
Date: 2010-05-18 11:40:48 -0400 (Tue, 18 May 2010)
New Revision: 1805

Added:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashV1.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodConstants.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperations.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperationsImpl.java
Removed:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
Modified:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
Log:
updated test and javadocs, fixed various issues


Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,61 +0,0 @@
-package org.infinispan.client.hotrod.impl;
-
-/**
- * // TODO: Document this
- *
- * @author mmarkus
- * @since 4.1
- */
-public interface HotrodConstants {
-
-   public static final short REQUEST_MAGIC = 0xA0;
-   public static final short RESPONSE_MAGIC = 0xA1;
-
-   public static final byte HOTROD_VERSION = 10;
-
-   //requests
-   public static final byte PUT_REQUEST = 0x01;
-   public static final byte GET_REQUEST = 0x03;
-   public static final byte PUT_IF_ABSENT_REQUEST = 0x05;
-   public static final byte REPLACE_REQUEST = 0x07;
-   public static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
-   public static final byte REMOVE_REQUEST = 0x0B;
-   public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
-   public static final byte CONTAINS_KEY_REQUEST = 0x0F;
-   public static final byte GET_WITH_VERSION = 0x11;
-   public static final byte CLEAR_REQUEST = 0x13;
-   public static final byte STATS_REQUEST = 0x15;
-   public static final byte PING_REQUEST = 0x17;
-
-
-   //responses
-   public static final byte PUT_RESPONSE = 0x02;
-   public static final byte GET_RESPONSE = 0x04;
-   public static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
-   public static final byte REPLACE_RESPONSE = 0x08;
-   public static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
-   public static final byte REMOVE_RESPONSE = 0x0C;
-   public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
-   public static final byte CONTAINS_KEY_RESPONSE = 0x10;
-   public static final byte GET_WITH_VERSION_RESPONSE = 0x12;
-   public static final byte CLEAR_RESPONSE = 0x14;
-   public static final byte STATS_RESPONSE = 0x16;
-   public static final byte PING_RESPONSE = 0x18;
-   public static final byte ERROR_RESPONSE = 0x50;
-
-   //response status
-   public static final byte NO_ERROR_STATUS = 0x00;
-   public static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
-   public static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
-   public static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
-   public static final int UNKNOWN_COMMAND_STATUS = 0x82;
-   public static final int SERVER_ERROR_STATUS = 0x85;
-   public static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
-   public static final int UNKNOWN_VERSION_STATUS = 0x83;
-   public static final int COMMAND_TIMEOUT_STATUS = 0x86;
-
-
-   public static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
-   public static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
-   public static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
-}

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,67 +0,0 @@
-package org.infinispan.client.hotrod.impl;
-
-
-import org.infinispan.client.hotrod.Flag;
-
-import java.util.Map;
-
-/**
- * // TODO: Document this
- *
- * - TODO - add timeout support
- * - TODO - enforce encoding and add such tests
- *
- * @author mmarkus
- * @since 4.1
- */
-public interface HotrodOperations {
-
-   public byte[] get(byte[] key, Flag... flags);
-
-   public byte[] remove(byte[] key, Flag... flags);
-
-   public boolean containsKey(byte[] key, Flag... flags);
-
-   /**
-    * Returns null if the given key does not exist.
-    */
-   public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags);
-
-   /**
-    * @param lifespan number of seconds that a entry during which the entry is allowed to life.
-    * If number of seconds is bigger than 30 days, this number of seconds is treated as UNIX time and so, represents
-    * the number of seconds since 1/1/1970. If set to 0, lifespan is unlimited.
-    * @param maxIdle Number of seconds that a entry can be idle before it's evicted from the cache. If 0, no max
-    * @param flags
-    */
-   public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
-
-   /**
-    * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
-    * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
-    * @param flags
-    */
-   public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
-
-   /**
-    * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
-    * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
-    * @param flags
-    */
-   public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
-
-   /**
-    * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
-    * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
-    * @param flags
-    */
-   public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags);
-
-   public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags);
-
-   public void clear(Flag... flags);
-
-   public Map<String, String> stats();
-
-   public boolean ping();
-}

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,460 +0,0 @@
-package org.infinispan.client.hotrod.impl;
-
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.exceptions.HotRodClientException;
-import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
-import org.infinispan.client.hotrod.exceptions.TimeoutException;
-import org.infinispan.client.hotrod.exceptions.TransportException;
-import org.infinispan.client.hotrod.impl.transport.Transport;
-import org.infinispan.client.hotrod.impl.transport.TransportFactory;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * // TODO: Document this
- *
- * @author mmarkus
- * @since 4.1
- */
-public class HotrodOperationsImpl implements HotrodOperations, HotrodConstants {
-
-   private static Log log = LogFactory.getLog(HotrodOperationsImpl.class);
-
-   private final byte[] cacheNameBytes;
-   private static final AtomicLong MSG_ID = new AtomicLong();
-   private static final AtomicInteger TOPOLOGY_ID = new AtomicInteger();
-   private TransportFactory transportFactory;
-   private byte clientIntelligence = CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
-
-   public HotrodOperationsImpl(String cacheName, TransportFactory transportFactory) {
-      cacheNameBytes = cacheName.getBytes(); //todo add charset here
-      this.transportFactory = transportFactory;
-   }
-
-   public byte[] get(byte[] key, Flag[] flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
-         if (status == KEY_DOES_NOT_EXIST_STATUS) {
-            return null;
-         }
-         if (status == NO_ERROR_STATUS) {
-            return transport.readArray();
-         }
-      } finally {
-         releaseTransport(transport);
-      }
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   public byte[] remove(byte[] key, Flag[] flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
-         if (status == KEY_DOES_NOT_EXIST_STATUS) {
-            return null;
-         } else if (status == NO_ERROR_STATUS) {
-            return returnPossiblePrevValue(transport, flags);
-         }
-      } finally {
-         releaseTransport(transport);
-      }
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   public boolean containsKey(byte[] key, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
-         if (status == KEY_DOES_NOT_EXIST_STATUS) {
-            return false;
-         } else if (status == NO_ERROR_STATUS) {
-            return true;
-         }
-      } finally {
-         releaseTransport(transport);
-      }
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
-         if (status == KEY_DOES_NOT_EXIST_STATUS) {
-            return null;
-         }
-         if (status == NO_ERROR_STATUS) {
-            long version = transport.readLong();
-            if (log.isTraceEnabled()) {
-               log.trace("Received version: " + version);
-            }
-            byte[] value = transport.readArray();
-            return new BinaryVersionedValue(version, value);
-         }
-      } finally {
-         releaseTransport(transport);
-      }
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-
-   public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
-         if (status != NO_ERROR_STATUS) {
-            throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
-         }
-         return returnPossiblePrevValue(transport, flags);
-      } finally {
-         releaseTransport(transport);
-      }
-   }
-
-   public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
-         if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
-            byte[] bytes = returnPossiblePrevValue(transport, flags);
-            if (log.isTraceEnabled()) {
-               log.trace("Returning from putIfAbsent: " + Arrays.toString(bytes));
-            }
-            return bytes;
-         }
-      } finally {
-         releaseTransport(transport);
-      }
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
-         if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
-            return returnPossiblePrevValue(transport, flags);
-         }
-      } finally {
-         releaseTransport(transport);
-      }
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   /**
-    * request : [header][key length][key][lifespan][max idle][entry_version][value length][value] response: If
-    * ForceReturnPreviousValue has been passed, this responses will contain previous [value length][value] for that key.
-    * If the key does not exist or previous was null, value length would be 0. Otherwise, if no ForceReturnPreviousValue
-    * was sent, the response would be empty.
-    */
-   public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         // 1) write header
-         long messageId = writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, flags);
-
-         //2) write message body
-         transport.writeArray(key);
-         transport.writeVInt(lifespan);
-         transport.writeVInt(maxIdle);
-         transport.writeLong(version);
-         transport.writeArray(value);
-         return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
-      } finally {
-         releaseTransport(transport);
-      }
-   }
-
-   /**
-    * Request: [header][key length][key][entry_version]
-    */
-   public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         // 1) write header
-         long messageId = writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, flags);
-
-         //2) write message body
-         transport.writeArray(key);
-         transport.writeLong(version);
-
-         //process response and return
-         return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
-
-      } finally {
-         releaseTransport(transport);
-      }
-   }
-
-   public void clear(Flag... flags) {
-      Transport transport = transportFactory.getTransport();
-      try {
-         // 1) write header
-         long messageId = writeHeader(transport, CLEAR_REQUEST, flags);
-         readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE);
-      } finally {
-         releaseTransport(transport);
-      }
-   }
-
-   public Map<String, String> stats() {
-      Transport transport = transportFactory.getTransport();
-      try {
-         // 1) write header
-         long messageId = writeHeader(transport, STATS_REQUEST);
-         readHeaderAndValidate(transport, messageId, STATS_RESPONSE);
-         int nrOfStats = transport.readVInt();
-
-         Map<String, String> result = new HashMap<String, String>();
-         for (int i = 0; i < nrOfStats; i++) {
-            String statName = transport.readString();
-            String statValue = transport.readString();
-            result.put(statName, statValue);
-         }
-         return result;
-      } finally {
-         releaseTransport(transport);
-      }
-   }
-
-   @Override
-   public boolean ping() {
-      Transport transport = null;
-      try {
-         transport = transportFactory.getTransport();
-         // 1) write header
-         long messageId = writeHeader(transport, PING_REQUEST);
-         short respStatus = readHeaderAndValidate(transport, messageId, HotrodConstants.PING_RESPONSE);
-         if (respStatus == NO_ERROR_STATUS) {
-            return true;
-         }
-         throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
-      } catch (TransportException te) {
-         log.trace("Exception while ping", te);
-         return false;
-      }
-      finally {
-         releaseTransport(transport);
-      }
-   }
-
-   //[header][key length][key][lifespan][max idle][value length][value]
-
-   private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
-      // 1) write header
-      long messageId = writeHeader(transport, opCode, flags);
-
-      // 2) write key and value
-      transport.writeArray(key);
-      transport.writeVInt(lifespan);
-      transport.writeVInt(maxIdle);
-      transport.writeArray(value);
-      transport.flush();
-
-      // 3) now read header
-
-      //return status (not error status for sure)
-      return readHeaderAndValidate(transport, messageId, opRespCode);
-   }
-
-   /*
-    * Magic	| MessageId	| Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
-    */
-
-   private long writeHeader(Transport transport, short operationCode, Flag... flags) {
-      transport.writeByte(REQUEST_MAGIC);
-      long messageId = MSG_ID.incrementAndGet();
-      transport.writeVLong(messageId);
-      transport.writeByte(HOTROD_VERSION);
-      transport.writeByte(operationCode);
-      transport.writeArray(cacheNameBytes);
-
-      int flagInt = 0;
-      if (flags != null) {
-         for (Flag flag : flags) {
-            flagInt = flag.getFlagInt() | flagInt;
-         }
-      }
-      transport.writeVInt(flagInt);
-      transport.writeByte(clientIntelligence);
-      transport.writeVInt(TOPOLOGY_ID.get());
-      if (log.isTraceEnabled()) {
-         log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
-      }
-      return messageId;
-   }
-
-   /**
-    * Magic	| Message Id | Op code | Status | Topology Change Marker
-    */
-   private short readHeaderAndValidate(Transport transport, long messageId, short opRespCode) {
-      short magic = transport.readByte();
-      if (magic != RESPONSE_MAGIC) {
-         String message = "Invalid magic number. Expected " + Integer.toHexString(RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
-         log.error(message);
-         throw new InvalidResponseException(message);
-      }
-      long receivedMessageId = transport.readVLong();
-      if (receivedMessageId != messageId) {
-         String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
-         log.error(message);
-         throw new InvalidResponseException(message);
-      }
-      if (log.isTraceEnabled()) {
-         log.trace("Received response for message id: " + receivedMessageId);
-      }
-      short receivedOpCode = transport.readByte();
-      if (receivedOpCode != opRespCode) {
-         if (receivedOpCode == ERROR_RESPONSE) {
-            checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
-            throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
-         }
-         throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
-      }
-      if (log.isTraceEnabled()) {
-         log.trace("Received operation code is: " + receivedOpCode);
-      }
-      short status = transport.readByte();
-      checkForErrorsInResponseStatus(status, messageId, transport);
-      short topologyChangeByte = transport.readByte();
-      if (topologyChangeByte == 1) {
-         readNewTopologyAndHash(transport);
-      }
-      return status;
-   }
-
-   private void readNewTopologyAndHash(Transport transport) {
-      int newTopologyId = transport.readVInt();
-      TOPOLOGY_ID.set(newTopologyId);
-      int numKeyOwners = transport.readUnsignedShort();
-      short hashFunctionVersion = transport.readByte();
-      int hashSpace = transport.readVInt();
-      int clusterSize = transport.readVInt();
-
-      if (log.isTraceEnabled()) {
-         log.trace("Topology change request: newTopologyId=" + newTopologyId + ", numKeyOwners=" + numKeyOwners +
-               ", hashFunctionVersion=" + hashFunctionVersion + ", hashSpaceSize=" + hashSpace + ", clusterSize=" + clusterSize);
-      }
-
-      LinkedHashMap<InetSocketAddress, Integer> servers2HashCode = new LinkedHashMap<InetSocketAddress, Integer>();
-
-      for (int i = 0; i < clusterSize; i++) {
-         String host = transport.readString();
-         int port = transport.readUnsignedShort();
-         if (log.isTraceEnabled()) {
-            log.trace("Server read:" + host + ":" + port);
-         }
-         int hashCode = transport.read4ByteInt();
-         servers2HashCode.put(new InetSocketAddress(host, port), hashCode);
-         if (log.isTraceEnabled()) {
-            log.trace("Hash code is: " + hashCode);
-         }
-      }
-      if (log.isInfoEnabled()) {
-         log.info("New topology: " + servers2HashCode);
-      }
-      transportFactory.updateServers(servers2HashCode.keySet());
-      if (hashFunctionVersion == 0) {
-         if (log.isTraceEnabled())
-            log.trace("Not using a consistent hash function (hash function version == 0).");
-      } else {
-         transportFactory.updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
-      }
-   }
-
-   private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
-      if (log.isTraceEnabled()) {
-         log.trace("Received operation status: " + status);
-      }
-      switch ((int) status) {
-         case INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
-         case REQUEST_PARSING_ERROR_STATUS:
-         case UNKNOWN_COMMAND_STATUS:
-         case SERVER_ERROR_STATUS:
-         case UNKNOWN_VERSION_STATUS: {
-            String msgFromServer = transport.readString();
-            if (log.isWarnEnabled()) {
-               log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
-            }
-            throw new HotRodClientException(msgFromServer, messageId, status);
-         }
-         case COMMAND_TIMEOUT_STATUS: {
-            if (log.isTraceEnabled()) {
-               log.trace("timeout message received from the server");
-            }
-            throw new TimeoutException();
-         }
-         case NO_ERROR_STATUS:
-         case KEY_DOES_NOT_EXIST_STATUS:
-         case NOT_PUT_REMOVED_REPLACED_STATUS: {
-            //don't do anything, these are correct responses
-            break;
-         }
-         default: {
-            throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
-         }
-      }
-   }
-
-   private boolean hasForceReturn(Flag[] flags) {
-      if (flags == null) return false;
-      for (Flag flag : flags) {
-         if (flag == Flag.FORCE_RETURN_VALUE) return true;
-      }
-      return false;
-   }
-
-   private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
-      // 1) write [header][key length][key]
-      long messageId = writeHeader(transport, opCode, flags);
-      transport.writeArray(key);
-      transport.flush();
-
-      // 2) now read the header
-      return readHeaderAndValidate(transport, messageId, opRespCode);
-   }
-
-   private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
-      if (hasForceReturn(flags)) {
-         byte[] bytes = transport.readArray();
-         if (log.isTraceEnabled()) log.trace("Previous value bytes is: " + Arrays.toString(bytes));
-         //0-length response means null
-         return bytes.length == 0 ? null : bytes;
-      } else {
-         return null;
-      }
-   }
-
-   private void releaseTransport(Transport transport) {
-      if (transport != null)
-         transportFactory.releaseTransport(transport);
-   }
-
-   private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
-      //3) ...
-      short respStatus = readHeaderAndValidate(transport, messageId, response);
-
-      //4 ...
-      VersionedOperationResponse.RspCode code;
-      if (respStatus == NO_ERROR_STATUS) {
-         code = VersionedOperationResponse.RspCode.SUCCESS;
-      } else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS) {
-         code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
-      } else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
-         code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
-      } else {
-         throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
-      }
-      byte[] prevValue = returnPossiblePrevValue(transport, flags);
-      return new VersionedOperationResponse(prevValue, code);
-   }
-}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -4,16 +4,19 @@
 
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * // TODO: Document this
+ * Default implementation for {@link org.infinispan.executors.ExecutorFactory} based on a {@link ThreadPoolExecutor}.
+ * Accepts following configuration parameters:
+ * <ul>
+ *  <li> - default-executor-factory.poolSize = the fixed size of the pool</li>
+ *  <li> - default-executor-factory.queueSize = The size of the {@link LinkedBlockingQueue} backing up the executor</li>
+ * </ul>
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -10,7 +10,7 @@
 import java.util.concurrent.TimeoutException;
 
 /**
- * // TODO: Document this
+ * Notifying future implementation for async calls.
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -5,7 +5,7 @@
 import java.util.Map;
 
 /**
- * // TODO: Document this
+ * Abstraction for the used consistent hash.
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -4,29 +4,38 @@
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
 /**
- * // TODO: Document this
+ * Factory for {@link org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash} function. It will try to look
+ * into the configuration for consistent hash definitions as follows:
+ *   consistent-hash.[version]=[fully qualified class implementing ConsistentHash]
+ * e.g.
+ * consistent-hash.1=org.infinispan.client.hotrod.impl.consistenthash.ConsistentHashV1
+ * <p/>
+ *  If no CH function is defined for a certain version, then it will be defaulted to "org.infinispan.client.hotrod.impl.consistenthash.ConsistentHashV[version]".
+ * E.g. if the server indicates that the CH in use is version 1, and it is not defined within the configuration, it will be defaulted to
+ * org.infinispan.client.hotrod.impl.consistenthash.ConsistentHashV1.
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
 public class ConsistentHashFactory {
-   
+
    private static Log log = LogFactory.getLog(ConsistentHashFactory.class);
 
-   public final Map<Integer, String> version2ConsistentHash = new HashMap<Integer, String>();
+   private final Map<Integer, String> version2ConsistentHash = new HashMap<Integer, String>();
 
    public void init(Properties props) {
       for (String propName : props.stringPropertyNames()) {
-         if (propName.indexOf("consistent-hash") >=0) {
+         if (propName.indexOf("consistent-hash") >= 0) {
             if (log.isTraceEnabled()) log.trace("Processing consistent hash: " + propName);
             String versionString = propName.substring("consistent-hash.".length());
             int version = Integer.parseInt(versionString);
-            String hashFunction = props.getProperty(versionString);
+            String hashFunction = props.getProperty(propName);
             version2ConsistentHash.put(version, hashFunction);
             if (log.isTraceEnabled()) {
                log.trace("Added consistent hash version " + version + ": " + hashFunction);
@@ -38,8 +47,8 @@
    public ConsistentHash newConsistentHash(int version) {
       String hashFunctionClass = version2ConsistentHash.get(version);
       if (hashFunctionClass == null) {
-         log.trace("No hash function configured for version " + version);
-         hashFunctionClass = ConsistentHashFactory.class.getPackage().getName() + ".ConsitentHashV" + version;
+         if (log.isTraceEnabled()) log.trace("No hash function configured for version " + version);
+         hashFunctionClass = ConsistentHashFactory.class.getPackage().getName() + ".ConsistentHashV" + version;
          if (log.isTraceEnabled()) log.trace("Trying to use default value: " + hashFunctionClass);
       }
       ConsistentHash consistentHash = null;
@@ -50,4 +59,8 @@
       }
       return consistentHash;
    }
+
+   public Map<Integer, String> getVersion2ConsistentHash() {
+      return Collections.unmodifiableMap(version2ConsistentHash);
+   }
 }

Copied: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashV1.java (from rev 1800, trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java)
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashV1.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashV1.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,59 @@
+package org.infinispan.client.hotrod.impl.consistenthash;
+
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.infinispan.util.hash.MurmurHash2.hash;
+
+/**
+ * Version one consistent hash function based on {@link org.infinispan.util.hash.MurmurHash2}.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class ConsistentHashV1 implements ConsistentHash {
+
+   private static Log log = LogFactory.getLog(ConsistentHashV1.class);
+   
+   private final SortedMap<Integer, InetSocketAddress> positions = new TreeMap<Integer, InetSocketAddress>();
+
+   private volatile int hashSpace;
+
+   @Override
+   public void init(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, int hashSpace) {
+      for (InetSocketAddress addr :servers2HashCode.keySet()) {
+         positions.put(servers2HashCode.get(addr), addr);
+      }
+      if (log.isTraceEnabled())
+         log.trace("Positions are: " + positions);
+      this.hashSpace = hashSpace;
+   }
+
+   @Override
+   public InetSocketAddress getServer(byte[] key) {
+      int keyHashCode = hash(key);
+      if (keyHashCode == Integer.MIN_VALUE) keyHashCode += 1;
+      int hash = Math.abs(keyHashCode);
+
+      SortedMap<Integer, InetSocketAddress> candidates = positions.tailMap(hash % hashSpace);
+      if (log.isTraceEnabled()) {
+         log.trace("Found possible candidates: " + candidates);
+      }
+      if (candidates.isEmpty()) {
+         InetSocketAddress socketAddress = positions.get(positions.firstKey());
+         if (log.isTraceEnabled()) {
+            log.trace("Over the wheel, returning first member: " + socketAddress);
+         }
+         return socketAddress;
+      } else {
+         InetSocketAddress socketAddress = candidates.get(candidates.firstKey());
+         log.trace("Found candidate: " + socketAddress);
+         return socketAddress;
+      }
+   }
+}

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,59 +0,0 @@
-package org.infinispan.client.hotrod.impl.consistenthash;
-
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedHashMap;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import static org.infinispan.util.hash.MurmurHash2.hash;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class ConsitentHashV1 implements ConsistentHash {
-
-   private static Log log = LogFactory.getLog(ConsitentHashV1.class);
-   
-   private final SortedMap<Integer, InetSocketAddress> positions = new TreeMap<Integer, InetSocketAddress>();
-
-   private volatile int hashSpace;
-
-   @Override
-   public void init(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, int hashSpace) {
-      for (InetSocketAddress addr :servers2HashCode.keySet()) {
-         positions.put(servers2HashCode.get(addr), addr);
-      }
-      if (log.isTraceEnabled())
-         log.trace("Positions are: " + positions);
-      this.hashSpace = hashSpace;
-   }
-
-   @Override
-   public InetSocketAddress getServer(byte[] key) {
-      int keyHashCode = hash(key);
-      if (keyHashCode == Integer.MIN_VALUE) keyHashCode += 1;
-      int hash = Math.abs(keyHashCode);
-
-      SortedMap<Integer, InetSocketAddress> candidates = positions.tailMap(hash % hashSpace);
-      if (log.isTraceEnabled()) {
-         log.trace("Found possible candidates: " + candidates);
-      }
-      if (candidates.isEmpty()) {
-         InetSocketAddress socketAddress = positions.get(positions.firstKey());
-         if (log.isTraceEnabled()) {
-            log.trace("Over the wheel, returning first member: " + socketAddress);
-         }
-         return socketAddress;
-      } else {
-         InetSocketAddress socketAddress = candidates.get(candidates.firstKey());
-         log.trace("Found candidate: " + socketAddress);
-         return socketAddress;
-      }
-   }
-}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,162 @@
+package org.infinispan.client.hotrod.impl.protocol;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.exceptions.HotRodTimeoutException;
+import org.infinispan.client.hotrod.impl.protocol.HotrodConstants;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Helper class for factorizing common parts of read/write operations.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotRodOperationsHelper {
+   static Log log = LogFactory.getLog(HotrodOperationsImpl.class);
+   static final AtomicLong MSG_ID = new AtomicLong();
+   final static byte CLIENT_INTELLIGENCE = HotrodConstants.CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
+
+   public static long writeHeader(Transport transport, short operationCode, String cacheName, AtomicInteger topologyId, Flag... flags) {
+      transport.writeByte(HotrodConstants.REQUEST_MAGIC);
+      long messageId = MSG_ID.incrementAndGet();
+      transport.writeVLong(messageId);
+      transport.writeByte(HotrodConstants.HOTROD_VERSION);
+      transport.writeByte(operationCode);
+      transport.writeArray(cacheName.getBytes());
+
+      int flagInt = 0;
+      if (flags != null) {
+         for (Flag flag : flags) {
+            flagInt = flag.getFlagInt() | flagInt;
+         }
+      }
+      transport.writeVInt(flagInt);
+      transport.writeByte(CLIENT_INTELLIGENCE);
+      transport.writeVInt(topologyId.get());
+      if (log.isTraceEnabled()) {
+         log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
+      }
+      return messageId;
+   }
+
+   /**
+    * Magic	| Message Id | Op code | Status | Topology Change Marker
+    */
+   public static short readHeaderAndValidate(Transport transport, long messageId, short opRespCode, AtomicInteger topologyId) {
+      short magic = transport.readByte();
+      if (magic != HotrodConstants.RESPONSE_MAGIC) {
+         String message = "Invalid magic number. Expected " + Integer.toHexString(HotrodConstants.RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
+         log.error(message);
+         throw new InvalidResponseException(message);
+      }
+      long receivedMessageId = transport.readVLong();
+      if (receivedMessageId != messageId) {
+         String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
+         log.error(message);
+         throw new InvalidResponseException(message);
+      }
+      if (log.isTraceEnabled()) {
+         log.trace("Received response for message id: " + receivedMessageId);
+      }
+      short receivedOpCode = transport.readByte();
+      if (receivedOpCode != opRespCode) {
+         if (receivedOpCode == HotrodConstants.ERROR_RESPONSE) {
+            checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
+            throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
+         }
+         throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
+      }
+      if (log.isTraceEnabled()) {
+         log.trace("Received operation code is: " + receivedOpCode);
+      }
+      short status = transport.readByte();
+      checkForErrorsInResponseStatus(status, messageId, transport);
+      short topologyChangeByte = transport.readByte();
+      if (topologyChangeByte == 1) {
+         readNewTopologyAndHash(transport, topologyId);
+      }
+      return status;
+   }
+
+   static void readNewTopologyAndHash(Transport transport, AtomicInteger topologyId) {
+      int newTopologyId = transport.readVInt();
+      topologyId.set(newTopologyId);
+      int numKeyOwners = transport.readUnsignedShort();
+      short hashFunctionVersion = transport.readByte();
+      int hashSpace = transport.readVInt();
+      int clusterSize = transport.readVInt();
+
+      if (log.isTraceEnabled()) {
+         log.trace("Topology change request: newTopologyId=" + newTopologyId + ", numKeyOwners=" + numKeyOwners +
+               ", hashFunctionVersion=" + hashFunctionVersion + ", hashSpaceSize=" + hashSpace + ", clusterSize=" + clusterSize);
+      }
+
+      LinkedHashMap<InetSocketAddress, Integer> servers2HashCode = new LinkedHashMap<InetSocketAddress, Integer>();
+
+      for (int i = 0; i < clusterSize; i++) {
+         String host = transport.readString();
+         int port = transport.readUnsignedShort();
+         if (log.isTraceEnabled()) {
+            log.trace("Server read:" + host + ":" + port);
+         }
+         int hashCode = transport.read4ByteInt();
+         servers2HashCode.put(new InetSocketAddress(host, port), hashCode);
+         if (log.isTraceEnabled()) {
+            log.trace("Hash code is: " + hashCode);
+         }
+      }
+      if (log.isInfoEnabled()) {
+         log.info("New topology: " + servers2HashCode);
+      }
+      transport.getTransportFactory().updateServers(servers2HashCode.keySet());
+      if (hashFunctionVersion == 0) {
+         if (log.isTraceEnabled())
+            log.trace("Not using a consistent hash function (hash function version == 0).");
+      } else {
+         transport.getTransportFactory().updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
+      }
+   }
+
+   static void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
+      if (log.isTraceEnabled()) {
+         log.trace("Received operation status: " + status);
+      }
+      switch ((int) status) {
+         case HotrodConstants.INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
+         case HotrodConstants.REQUEST_PARSING_ERROR_STATUS:
+         case HotrodConstants.UNKNOWN_COMMAND_STATUS:
+         case HotrodConstants.SERVER_ERROR_STATUS:
+         case HotrodConstants.UNKNOWN_VERSION_STATUS: {
+            String msgFromServer = transport.readString();
+            if (log.isWarnEnabled()) {
+               log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
+            }
+            throw new HotRodClientException(msgFromServer, messageId, status);
+         }
+         case HotrodConstants.COMMAND_TIMEOUT_STATUS: {
+            if (log.isTraceEnabled()) {
+               log.trace("timeout message received from the server");
+            }
+            throw new HotRodTimeoutException();
+         }
+         case HotrodConstants.NO_ERROR_STATUS:
+         case HotrodConstants.KEY_DOES_NOT_EXIST_STATUS:
+         case HotrodConstants.NOT_PUT_REMOVED_REPLACED_STATUS: {
+            //don't do anything, these are correct responses
+            break;
+         }
+         default: {
+            throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
+         }
+      }
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodConstants.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodConstants.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,61 @@
+package org.infinispan.client.hotrod.impl.protocol;
+
+/**
+ * Defines the constants specified by the Hot Rod protocol.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface HotrodConstants {
+
+   public static final short REQUEST_MAGIC = 0xA0;
+   public static final short RESPONSE_MAGIC = 0xA1;
+
+   public static final byte HOTROD_VERSION = 10;
+
+   //requests
+   public static final byte PUT_REQUEST = 0x01;
+   public static final byte GET_REQUEST = 0x03;
+   public static final byte PUT_IF_ABSENT_REQUEST = 0x05;
+   public static final byte REPLACE_REQUEST = 0x07;
+   public static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
+   public static final byte REMOVE_REQUEST = 0x0B;
+   public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
+   public static final byte CONTAINS_KEY_REQUEST = 0x0F;
+   public static final byte GET_WITH_VERSION = 0x11;
+   public static final byte CLEAR_REQUEST = 0x13;
+   public static final byte STATS_REQUEST = 0x15;
+   public static final byte PING_REQUEST = 0x17;
+
+
+   //responses
+   public static final byte PUT_RESPONSE = 0x02;
+   public static final byte GET_RESPONSE = 0x04;
+   public static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
+   public static final byte REPLACE_RESPONSE = 0x08;
+   public static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
+   public static final byte REMOVE_RESPONSE = 0x0C;
+   public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
+   public static final byte CONTAINS_KEY_RESPONSE = 0x10;
+   public static final byte GET_WITH_VERSION_RESPONSE = 0x12;
+   public static final byte CLEAR_RESPONSE = 0x14;
+   public static final byte STATS_RESPONSE = 0x16;
+   public static final byte PING_RESPONSE = 0x18;
+   public static final byte ERROR_RESPONSE = 0x50;
+
+   //response status
+   public static final byte NO_ERROR_STATUS = 0x00;
+   public static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
+   public static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
+   public static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
+   public static final int UNKNOWN_COMMAND_STATUS = 0x82;
+   public static final int SERVER_ERROR_STATUS = 0x85;
+   public static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
+   public static final int UNKNOWN_VERSION_STATUS = 0x83;
+   public static final int COMMAND_TIMEOUT_STATUS = 0x86;
+
+
+   public static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
+   public static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
+   public static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperations.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperations.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperations.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,68 @@
+package org.infinispan.client.hotrod.impl.protocol;
+
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
+import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
+
+import java.util.Map;
+
+/**
+ * Defines the hotrod operations as described in the protocol spec: http://community.jboss.org/wiki/HotRodProtocol
+ *
+ * - TODO - enforce encoding and add such tests
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface HotrodOperations {
+
+   public byte[] get(byte[] key, Flag... flags);
+
+   public byte[] remove(byte[] key, Flag... flags);
+
+   public boolean containsKey(byte[] key, Flag... flags);
+
+   /**
+    * Returns null if the given key does not exist.
+    */
+   public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags);
+
+   /**
+    * @param lifespan number of seconds during which the entry is allowed to live.
+    * If number of seconds is bigger than 30 days, this number of seconds is treated as UNIX time and so, represents
+    * the number of seconds since 1/1/1970. If set to 0, lifespan is unlimited.
+    * @param maxIdle number of seconds that an entry can be idle before it's evicted from the cache. If 0, there is no maximum idle time.
+    * @param flags
+    */
+   public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+   /**
+    * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param flags
+    */
+   public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+   /**
+    * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param flags
+    */
+   public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+   /**
+    * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+    * @param flags
+    */
+   public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags);
+
+   public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags);
+
+   public void clear(Flag... flags);
+
+   public Map<String, String> stats();
+
+   public boolean ping();
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperationsImpl.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperationsImpl.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,320 @@
+package org.infinispan.client.hotrod.impl.protocol;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.exceptions.TransportException;
+import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
+import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
+import org.infinispan.client.hotrod.impl.protocol.HotrodConstants;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotrodOperationsImpl implements HotrodOperations, HotrodConstants {
+
+   private static Log log = LogFactory.getLog(HotrodOperationsImpl.class);
+
+   private final String cacheName;
+   private TransportFactory transportFactory;
+   private final AtomicInteger topologyId;
+
+   public HotrodOperationsImpl(String cacheName, TransportFactory transportFactory, AtomicInteger topologyId) {
+      this.cacheName = cacheName; //todo add charset here
+      this.transportFactory = transportFactory;
+      this.topologyId = topologyId;
+   }
+
+   public byte[] get(byte[] key, Flag[] flags) {
+      Transport transport = transportFactory.getTransport(key);
+      try {
+         short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
+         if (status == KEY_DOES_NOT_EXIST_STATUS) {
+            return null;
+         }
+         if (status == NO_ERROR_STATUS) {
+            return transport.readArray();
+         }
+      } finally {
+         releaseTransport(transport);
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+   public byte[] remove(byte[] key, Flag[] flags) {
+      Transport transport = transportFactory.getTransport(key);
+      try {
+         short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
+         if (status == KEY_DOES_NOT_EXIST_STATUS) {
+            return null;
+         } else if (status == NO_ERROR_STATUS) {
+            return returnPossiblePrevValue(transport, flags);
+         }
+      } finally {
+         releaseTransport(transport);
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+   public boolean containsKey(byte[] key, Flag... flags) {
+      Transport transport = transportFactory.getTransport(key);
+      try {
+         short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
+         if (status == KEY_DOES_NOT_EXIST_STATUS) {
+            return false;
+         } else if (status == NO_ERROR_STATUS) {
+            return true;
+         }
+      } finally {
+         releaseTransport(transport);
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+   public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
+      Transport transport = transportFactory.getTransport(key);
+      try {
+         short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
+         if (status == KEY_DOES_NOT_EXIST_STATUS) {
+            return null;
+         }
+         if (status == NO_ERROR_STATUS) {
+            long version = transport.readLong();
+            if (log.isTraceEnabled()) {
+               log.trace("Received version: " + version);
+            }
+            byte[] value = transport.readArray();
+            return new BinaryVersionedValue(version, value);
+         }
+      } finally {
+         releaseTransport(transport);
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+
+   public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+      Transport transport = transportFactory.getTransport(key);
+      try {
+         short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
+         if (status != NO_ERROR_STATUS) {
+            throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
+         }
+         return returnPossiblePrevValue(transport, flags);
+      } finally {
+         releaseTransport(transport);
+      }
+   }
+
+   public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+      Transport transport = transportFactory.getTransport(key);
+      try {
+         short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
+         if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+            byte[] bytes = returnPossiblePrevValue(transport, flags);
+            if (log.isTraceEnabled()) {
+               log.trace("Returning from putIfAbsent: " + Arrays.toString(bytes));
+            }
+            return bytes;
+         }
+      } finally {
+         releaseTransport(transport);
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+   public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+      Transport transport = transportFactory.getTransport(key);
+      try {
+         short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
+         if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+            return returnPossiblePrevValue(transport, flags);
+         }
+      } finally {
+         releaseTransport(transport);
+      }
+      throw new IllegalStateException("We should not reach here!");
+   }
+
+   /**
+    * Request: [header][key length][key][lifespan][max idle][entry_version][value length][value]. Response: if
+    * ForceReturnPreviousValue has been passed, the response will contain the previous [value length][value] for that
+    * key. If the key does not exist or the previous value was null, value length would be 0. Otherwise, if no
+    * ForceReturnPreviousValue was sent, the response would be empty.
+    */
+   public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
+      Transport transport = transportFactory.getTransport(key);
+      try {
+         // 1) write header
+         long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
+
+         //2) write message body
+         transport.writeArray(key);
+         transport.writeVInt(lifespan);
+         transport.writeVInt(maxIdle);
+         transport.writeLong(version);
+         transport.writeArray(value);
+         return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
+      } finally {
+         releaseTransport(transport);
+      }
+   }
+
+   /**
+    * Request: [header][key length][key][entry_version]
+    */
+   public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
+      Transport transport = transportFactory.getTransport(key);
+      try {
+         // 1) write header
+         long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
+
+         //2) write message body
+         transport.writeArray(key);
+         transport.writeLong(version);
+
+         //process response and return
+         return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
+
+      } finally {
+         releaseTransport(transport);
+      }
+   }
+
+   public void clear(Flag... flags) {
+      Transport transport = transportFactory.getTransport();
+      try {
+         // 1) write header
+         long messageId = HotRodOperationsHelper.writeHeader(transport, CLEAR_REQUEST, cacheName, topologyId, flags);
+         HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE, topologyId);
+      } finally {
+         releaseTransport(transport);
+      }
+   }
+
+   public Map<String, String> stats() {
+      Transport transport = transportFactory.getTransport();
+      try {
+         // 1) write header
+         long messageId = HotRodOperationsHelper.writeHeader(transport, STATS_REQUEST, cacheName, topologyId);
+         HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, STATS_RESPONSE, topologyId);
+         int nrOfStats = transport.readVInt();
+
+         Map<String, String> result = new HashMap<String, String>();
+         for (int i = 0; i < nrOfStats; i++) {
+            String statName = transport.readString();
+            String statValue = transport.readString();
+            result.put(statName, statValue);
+         }
+         return result;
+      } finally {
+         releaseTransport(transport);
+      }
+   }
+
+   @Override
+   public boolean ping() {
+      Transport transport = null;
+      try {
+         transport = transportFactory.getTransport();
+         // 1) write header
+         long messageId = HotRodOperationsHelper.writeHeader(transport, PING_REQUEST, cacheName, topologyId);
+         short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, HotrodConstants.PING_RESPONSE, topologyId);
+         if (respStatus == NO_ERROR_STATUS) {
+            return true;
+         }
+         throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
+      } catch (TransportException te) {
+         log.trace("Exception while ping", te);
+         return false;
+      }
+      finally {
+         releaseTransport(transport);
+      }
+   }
+
+   //[header][key length][key][lifespan][max idle][value length][value]
+
+   private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
+      // 1) write header
+      long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
+
+      // 2) write key and value
+      transport.writeArray(key);
+      transport.writeVInt(lifespan);
+      transport.writeVInt(maxIdle);
+      transport.writeArray(value);
+      transport.flush();
+
+      // 3) now read header
+
+      //return status (not error status for sure)
+      return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
+   }
+
+   /*
+    * Magic	| MessageId	| Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
+    */
+
+   private boolean hasForceReturn(Flag[] flags) {
+      if (flags == null) return false;
+      for (Flag flag : flags) {
+         if (flag == Flag.FORCE_RETURN_VALUE) return true;
+      }
+      return false;
+   }
+
+   private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
+      // 1) write [header][key length][key]
+      long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
+      transport.writeArray(key);
+      transport.flush();
+
+      // 2) now read the header
+      return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
+   }
+
+   private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
+      if (hasForceReturn(flags)) {
+         byte[] bytes = transport.readArray();
+         if (log.isTraceEnabled()) log.trace("Previous value bytes is: " + Arrays.toString(bytes));
+         //0-length response means null
+         return bytes.length == 0 ? null : bytes;
+      } else {
+         return null;
+      }
+   }
+
+   private void releaseTransport(Transport transport) {
+      if (transport != null)
+         transportFactory.releaseTransport(transport);
+   }
+
+   private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
+      //3) ...
+      short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, response, topologyId);
+
+      //4 ...
+      VersionedOperationResponse.RspCode code;
+      if (respStatus == NO_ERROR_STATUS) {
+         code = VersionedOperationResponse.RspCode.SUCCESS;
+      } else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS) {
+         code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
+      } else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
+         code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
+      } else {
+         throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
+      }
+      byte[] prevValue = returnPossiblePrevValue(transport, flags);
+      return new VersionedOperationResponse(prevValue, code);
+   }
+}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -3,8 +3,10 @@
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
+import java.nio.charset.Charset;
+
 /**
- * // TODO: Document this
+ * Support class for transport implementations.
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
@@ -13,6 +15,14 @@
 
    private static Log log = LogFactory.getLog(AbstractTransport.class);
 
+   private static final Charset CHARSET = Charset.forName("UTF-8");
+   
+   private final TransportFactory transportFactory;
+
+   protected AbstractTransport(TransportFactory transportFactory) {
+      this.transportFactory = transportFactory;
+   }
+
    public byte[] readArray() {
       int responseLength = readVInt();
       return readByteArray(responseLength);
@@ -21,16 +31,15 @@
    @Override
    public String readString() {
       byte[] strContent = readArray();
-      String readString = new String(strContent);
+      String readString = new String(strContent, CHARSET);
       if (log.isTraceEnabled()) {
          log.trace("Read string is: " + readString);
       }
-      return readString;//todo take care of encoding here
+      return readString;
    }
 
    @Override
    public long readLong() {
-      //todo - optimize this not to create the longBytes on every call, but reuse it/cache it as class is NOT thread safe
       byte[] longBytes = readByteArray(8);
       long result = 0;
       for (byte longByte : longBytes) {
@@ -71,6 +80,11 @@
       return value;
    }
 
+   @Override
+   public TransportFactory getTransportFactory() {
+      return transportFactory;
+   }
+
    public void writeArray(byte[] toAppend) {
       writeVInt(toAppend.length);
       writeBytes(toAppend);

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,16 +1,15 @@
 package org.infinispan.client.hotrod.impl.transport;
 
-import net.jcip.annotations.NotThreadSafe;
-
 /**
- * // TODO: Document this
+ * Transport abstraction.
  *
- * @author mmarkus
+ * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
-@NotThreadSafe
 public interface Transport {
 
+   public TransportFactory getTransportFactory();
+
    public void writeArray(byte[] toAppend);
 
    public void writeByte(short toWrite);

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -5,9 +5,10 @@
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * // TODO: Document this
+ * Transport factory for building and managing {@link org.infinispan.client.hotrod.impl.transport.Transport} objects.
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
@@ -20,7 +21,7 @@
 
    public void releaseTransport(Transport transport);
 
-   void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers);
+   void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers, AtomicInteger topologyId);
 
    void updateServers(Collection<InetSocketAddress> newServers);
 

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -10,9 +10,9 @@
 import java.io.OutputStream;
 
 /**
- * // TODO: Document this
+ * Helper for handling v-operations.
  *
- * @author mmarkus
+ * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
 public class VHelper {

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -9,8 +9,6 @@
 import org.jboss.netty.handler.codec.frame.FrameDecoder;
 
 /**
- * // TODO: Document this
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -11,8 +11,6 @@
 import static org.jboss.netty.buffer.ChannelBuffers.*;
 
 /**
- * // TODO: Document this
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -11,8 +11,6 @@
 import static org.jboss.netty.channel.Channels.*;
 
 /**
- * // TODO: Document this
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -7,8 +7,6 @@
 import java.io.InputStream;
 
 /**
- * // TODO: Document this
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -2,6 +2,7 @@
 
 import org.infinispan.client.hotrod.exceptions.TransportException;
 import org.infinispan.client.hotrod.impl.transport.AbstractTransport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 import org.jboss.netty.bootstrap.ClientBootstrap;
@@ -13,8 +14,6 @@
 import java.util.concurrent.Executors;
 
 /**
- * // TODO: Document this
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
@@ -28,7 +27,8 @@
 
    private HotrodClientDecoder decoder = new HotrodClientDecoder();
 
-   public NettyTransport(InetSocketAddress serverAddress) {
+   public NettyTransport(InetSocketAddress serverAddress, TransportFactory transportFactory) {
+      super(transportFactory);
       this.serverAddress = serverAddress;
       init();
    }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -9,10 +9,9 @@
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * // TODO: Document this
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
@@ -24,7 +23,7 @@
    private Collection<InetSocketAddress> serverAddresses;
 
    @Override
-   public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers) {
+   public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers, AtomicInteger topologyId) {
       this.serverAddresses = staticConfiguredServers;
       serverAddr = serverAddresses.iterator().next();
    }
@@ -37,7 +36,7 @@
    @Override
    public Transport getTransport() {
       log.info("Connecting to server on: " + serverAddr);
-      return new NettyTransport(serverAddr);
+      return new NettyTransport(serverAddr, this);
    }
 
    @Override

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -6,8 +6,6 @@
 import java.io.OutputStream;
 
 /**
- * // TODO: Document this
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -9,8 +9,6 @@
 import java.util.Properties;
 
 /**
- * // TODO: Document this
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
@@ -21,16 +19,15 @@
 
    public PropsKeyedObjectPoolFactory(KeyedPoolableObjectFactory factory, Properties props) {
       super(factory);
-      _maxActive = intProp(props, "maxActive", 1);
+      _maxActive = intProp(props, "maxActive", 2);
       _maxTotal = intProp(props, "maxTotal", -1);
-      _maxIdle = intProp(props, "maxIdle", 4);
-      int value = GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK;
-      _whenExhaustedAction = (byte) intProp(props, "whenExhaustedAction", value);
+      _maxIdle = intProp(props, "maxIdle", 2);
+      _whenExhaustedAction = (byte) intProp(props, "whenExhaustedAction", (int) GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK);
       _testOnBorrow = booleanProp(props, "testOnBorrow", false);
       _testOnReturn = booleanProp(props, "testOnReturn", false);
-      _timeBetweenEvictionRunsMillis = intProp(props, "timeBetweenEvictionRunsMillis", -1);
-      _minEvictableIdleTimeMillis = longProp(props, "minEvictableIdleTimeMillis", 1800000L);
-      _testWhileIdle = booleanProp(props, "testWhileIdle", false);
+      _timeBetweenEvictionRunsMillis = intProp(props, "timeBetweenEvictionRunsMillis", 5 * 60 * 1000);
+      _minEvictableIdleTimeMillis = longProp(props, "minEvictableIdleTimeMillis", 30 * 60 * 1000);
+      _testWhileIdle = booleanProp(props, "testWhileIdle", true);
       _minIdle = intProp(props, "minIdle", 0);
       _lifo = booleanProp(props, "lifo", true);
    }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,14 +1,18 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
+import net.jcip.annotations.ThreadSafe;
+
 import java.net.InetSocketAddress;
 import java.util.Collection;
 
 /**
- * // TODO: Document this
+ * Defines how request are distributed between the servers for replicated caches. This class must be thread safe: setServer
+ * and nextServer are called from multiple threads. 
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
+@ThreadSafe
 public interface RequestBalancingStrategy {
 
    void setServers(Collection<InetSocketAddress> servers);

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -13,10 +13,8 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * // TODO: Document this
+ * Round-robin implementation for {@link org.infinispan.client.hotrod.impl.transport.tcp.RequestBalancingStrategy}.
  *
- * todo - this can be called from several threads, synchronize!
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
@@ -30,7 +28,7 @@
    private final Lock writeLock = readWriteLock.writeLock();
    private final AtomicInteger index = new AtomicInteger(0);
 
-   private InetSocketAddress[] servers;
+   private volatile InetSocketAddress[] servers;
 
    @Override
    public void setServers(Collection<InetSocketAddress> servers) {
@@ -56,7 +54,7 @@
          int pos = index.getAndIncrement() % servers.length;
          InetSocketAddress server = servers[pos];
          if (log.isTraceEnabled()) {
-            log.trace("Retuning server: " + server);
+            log.trace("Returning server: " + server);
          }
          return server;
       } finally {

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,7 +1,9 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
+import net.jcip.annotations.ThreadSafe;
 import org.infinispan.client.hotrod.impl.transport.AbstractTransport;
 import org.infinispan.client.hotrod.exceptions.TransportException;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
 import org.infinispan.client.hotrod.impl.transport.VHelper;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -12,18 +14,32 @@
 import java.nio.channels.SocketChannel;
 
 /**
- * // TODO: Document this
+ * Transport implementation based on TCP.
  *
- * @author mmarkus
+ * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
+@ThreadSafe
 public class TcpTransport extends AbstractTransport {
 
    private static Log log = LogFactory.getLog(TcpTransport.class);
 
-   private Socket socket;
-   private InetSocketAddress serverAddress;
+   private final Socket socket;
+   private final InetSocketAddress serverAddress;
 
+   public TcpTransport(InetSocketAddress serverAddress, TransportFactory transportFactory) {
+      super(transportFactory);
+      this.serverAddress = serverAddress;
+      try {
+         SocketChannel socketChannel = SocketChannel.open(serverAddress);
+         socket = socketChannel.socket();
+      } catch (IOException e) {
+         String message = "Could not connect to server: " + serverAddress;
+         log.error(message, e);
+         throw new TransportException(message, e);
+      }
+   }
+
    public void writeVInt(int vInt) {
       try {
          VHelper.writeVInt(vInt, socket.getOutputStream());
@@ -68,18 +84,6 @@
       }
    }
 
-   public TcpTransport(InetSocketAddress serverAddress) {
-      this.serverAddress = serverAddress;
-      try {
-         SocketChannel socketChannel = SocketChannel.open(serverAddress);
-         socket = socketChannel.socket();
-      } catch (IOException e) {
-         String message = "Could not connect to server: " + serverAddress;
-         log.error(message, e);
-         throw new TransportException(message, e);
-      }
-   }
-
    protected void writeBytes(byte[] toAppend) {
       try {
          socket.getOutputStream().write(toAppend);

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,5 +1,6 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
+import net.jcip.annotations.ThreadSafe;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.infinispan.client.hotrod.exceptions.TransportException;
 import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
@@ -16,22 +17,20 @@
 import java.util.LinkedHashMap;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * // TODO: Document this
- * <p/>
- * todo - all methods but start and start can be called from multiple threads, add proper sync
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
+@ThreadSafe
 public class TcpTransportFactory implements TransportFactory {
 
    private static final Log log = LogFactory.getLog(TcpTransportFactory.class);
 
    /**
-    * These are declared volatile as the thread that calls {@link #start(java.util.Properties, java.util.Collection)}
-    * might(and likely will) be different from the thread that calls {@link #getTransport()} or other methods
+    * These are declared volatile as the thread that calls {@link org.infinispan.client.hotrod.impl.transport.TransportFactory#start(java.util.Properties, java.util.Collection, java.util.concurrent.atomic.AtomicInteger)}
+    * might(and likely will) be different from the thread(s) that calls {@link #getTransport()} or other methods
     */
    private volatile GenericKeyedObjectPool connectionPool;
    private volatile RequestBalancingStrategy balancer;
@@ -40,12 +39,12 @@
    private final ConsistentHashFactory hashFactory = new ConsistentHashFactory();
 
    @Override
-   public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers) {
+   public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers, AtomicInteger topologyId) {
       hashFactory.init(props);
       servers = staticConfiguredServers;
       String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
       balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
-      PropsKeyedObjectPoolFactory poolFactory = new PropsKeyedObjectPoolFactory(new TransportObjectFactory(), props);
+      PropsKeyedObjectPoolFactory poolFactory = new PropsKeyedObjectPoolFactory(new TransportObjectFactory(this, topologyId), props);
       connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
       balancer.setServers(servers);
    }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java	2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java	2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,39 +1,66 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
 import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
+import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsHelper;
+import org.infinispan.client.hotrod.impl.protocol.HotrodConstants;
+import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * // TODO: Document this
- *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
 public class TransportObjectFactory extends BaseKeyedPoolableObjectFactory {
 
-   private static Log log = LogFactory.getLog(TransportObjectFactory.class);
+   private static final Log log = LogFactory.getLog(TransportObjectFactory.class);
+   private final TcpTransportFactory tcpTransportFactory;
+   private final AtomicInteger topologyId;
 
+   public TransportObjectFactory(TcpTransportFactory tcpTransportFactory, AtomicInteger topologyId) {
+      this.tcpTransportFactory = tcpTransportFactory;
+      this.topologyId = topologyId;
+   }
+
    @Override
    public Object makeObject(Object key) throws Exception {
       InetSocketAddress serverAddress = (InetSocketAddress) key;
-      TcpTransport tcpTransport = new TcpTransport(serverAddress);
+      TcpTransport tcpTransport = new TcpTransport(serverAddress, tcpTransportFactory);
       if (log.isTraceEnabled()) {
          log.trace("Created tcp transport: " + tcpTransport);
       }
       return tcpTransport;
    }
 
+   /**
+    * This will be called by the test thread when testWhileIdle==true.
+    */
    @Override
    public boolean validateObject(Object key, Object obj) {
       TcpTransport transport = (TcpTransport) obj;
-      if (log.isTraceEnabled()) {
-         log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
+      try {
+         if (log.isTraceEnabled()) {
+            log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
+         }
+         long messageId = HotRodOperationsHelper.writeHeader(transport, HotrodConstants.PING_REQUEST, DefaultCacheManager.DEFAULT_CACHE_NAME, topologyId);
+         short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, HotrodConstants.PING_RESPONSE, topologyId);
+         if (respStatus == HotrodConstants.NO_ERROR_STATUS) {
+            if (log.isTraceEnabled())
+               log.trace("Successfully validated transport: " + transport);
+            return true;
+         } else {
+            if (log.isTraceEnabled())
+               log.trace("Unknown response status: " + respStatus);
+            return false;
+         }
+      } catch (Exception e) {
+         if (log.isTraceEnabled())
+            log.trace("Failed to validate transport: " + transport, e);
+         return false;
       }
-      //todo implement
-      return true;
    }
 
    @Override



More information about the infinispan-commits mailing list