[infinispan-commits] Infinispan SVN: r1750 - in trunk: client/hotrod-client/src/main/java/org/infinispan/client/hotrod and 10 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu May 6 18:19:32 EDT 2010


Author: mircea.markus
Date: 2010-05-06 18:19:31 -0400 (Thu, 06 May 2010)
New Revision: 1750

Added:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteAsyncAPITest.java
Removed:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
Modified:
   trunk/client/hotrod-client/pom.xml
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
   trunk/client/hotrod-client/src/main/resources/hotrod-client.properties
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TestHelper.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java
   trunk/core/src/main/java/org/infinispan/io/UnsignedNumeric.java
   trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java
Log:
ongoing work on HR client


Modified: trunk/client/hotrod-client/pom.xml
===================================================================
--- trunk/client/hotrod-client/pom.xml	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/pom.xml	2010-05-06 22:19:31 UTC (rev 1750)
@@ -11,6 +11,7 @@
    </parent>
    <properties>
       <version.netty>3.2.0.BETA1</version.netty>
+      <version.scala>2.8.0.RC1</version.scala>
    </properties>
 
 
@@ -61,5 +62,12 @@
          <version>1.5.4</version>
       </dependency>
 
+      <dependency>
+         <groupId>org.scala-lang</groupId>
+         <artifactId>scala-library</artifactId>
+         <version>${version.scala}</version>
+         <scope>test</scope>
+      </dependency>
+
    </dependencies>
 </project>

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -62,55 +62,55 @@
     * <pre>
     * VersionedEntry ve = remoteCache.getVersioned(key);
     * //some processing
-    * remoteCache.remove(key, ve.getVersion();
+    * remoteCache.removeWithVersion(key, ve.getVersion());
     * </pre>
-    * Lat call (remove) will make sure that the entry will only be removed if it hasn't been changed in between.
+    * Last call (removeWithVersion) will make sure that the entry will only be removed if it hasn't been changed in between.
     *
     * @return true if the entry has been removed
     * @see VersionedValue
     * @see #getVersioned(Object)
     */
-   boolean remove(K key, long version);
+   boolean removeWithVersion(K key, long version);
 
    /**
     * @see #remove(Object, Object)
     */
-   NotifyingFuture<Boolean> removeAsync(Object key, long version);
+   NotifyingFuture<Boolean> removeWithVersionAsync(K key, long version);
 
    /**
-    * Removes the given value only if its version matches the supplied version. See {@link #remove(Object, long)} for a
+    * Replaces the given value only if its version matches the supplied version. See {@link #removeWithVersion(Object, long)} for a
     * sample usage.
     *
     * @return true if the method has been replaced
     * @see #getVersioned(Object)
     * @see VersionedValue
     */
-   boolean replace(K key, V newValue, long version);
+   boolean replaceWithVersion(K key, V newValue, long version);
 
    /**
-    * @see #replace(Object, Object, long)
+    * @see #replaceWithVersion(Object, Object, long)
     */
-   boolean replace(K key, V newValue, long version, int lifespanSeconds);
+   boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds);
 
    /**
-    * @see #replace(Object, Object, long)
+    * @see #replaceWithVersion(Object, Object, long)
     */
-   boolean replace(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds);
+   boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds);
 
    /**
-    * @see #replace(Object, Object, long)
+    * @see #replaceWithVersion(Object, Object, long)
     */
-   NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version);
+   NotifyingFuture<Boolean> replaceWithVersionAsync(K key, V newValue, long version);
 
    /**
-    * @see #replace(Object, Object, long)
+    * @see #replaceWithVersion(Object, Object, long)
     */
-   NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds);
+   NotifyingFuture<Boolean> replaceWithVersionAsync(K key, V newValue, long version, int lifespanSeconds);
 
    /**
-    * @see #replace(Object, Object, long)
+    * @see #replaceWithVersion(Object, Object, long)
     */
-   NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds, int maxIdleSeconds);
+   NotifyingFuture<Boolean> replaceWithVersionAsync(K key, V newValue, long version, int lifespanSeconds, int maxIdleSeconds);
 
 
    /**
@@ -202,7 +202,7 @@
    void endBatch(boolean successful);
 
    /**
-    * This operation is not supported. Consider using {@link #remove(Object, long)} instead.
+    * This operation is not supported. Consider using {@link #removeWithVersion(Object, long)} instead.
     *
     * @throws UnsupportedOperationException
     */
@@ -210,7 +210,7 @@
    boolean remove(Object key, Object value);
 
    /**
-    * This operation is not supported. Consider using {@link #removeAsync(Object, long)} instead.
+    * This operation is not supported. Consider using {@link #removeWithVersionAsync(Object, long)} instead.
     *
     * @throws UnsupportedOperationException
     */
@@ -218,7 +218,7 @@
    NotifyingFuture<Boolean> removeAsync(Object key, Object value);
 
    /**
-    * This operation is not supported. Consider using {@link #replace(Object, Object, long)} instead.
+    * This operation is not supported. Consider using {@link #replaceWithVersion(Object, Object, long)} instead.
     *
     * @throws UnsupportedOperationException
     */
@@ -242,7 +242,7 @@
    boolean replace(K key, V oldValue, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit);
 
    /**
-    * This operation is not supported. Consider using {@link #replaceAsync(Object, Object, long)} instead.
+    * This operation is not supported. Consider using {@link #replaceWithVersionAsync(Object, Object, long)} instead.
     *
     * @throws UnsupportedOperationException
     */

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -1,14 +1,16 @@
 package org.infinispan.client.hotrod;
 
 import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+import org.infinispan.client.hotrod.impl.async.DefaultAsyncExecutorFactory;
 import org.infinispan.client.hotrod.impl.HotrodOperations;
 import org.infinispan.client.hotrod.impl.HotrodOperationsImpl;
 import org.infinispan.client.hotrod.impl.HotrodMarshaller;
 import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
 import org.infinispan.client.hotrod.impl.SerializationMarshaller;
-import org.infinispan.client.hotrod.impl.TransportFactory;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
 import org.infinispan.client.hotrod.impl.transport.VHelper;
 import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.executors.ExecutorFactory;
 import org.infinispan.lifecycle.Lifecycle;
 import org.infinispan.manager.CacheContainer;
 import org.infinispan.manager.DefaultCacheManager;
@@ -23,6 +25,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
 
 /**
  * // TODO: Document this
@@ -45,6 +48,8 @@
    private TransportFactory transportFactory;
    private String hotrodMarshaller;
    private boolean started = false;
+   private boolean forceReturnValueDefault = false;
+   private ExecutorService asyncExecutorService;
 
 
    /**
@@ -151,12 +156,19 @@
 
 
    public <K, V> RemoteCache<K, V> getCache(String cacheName) {
-      return createRemoteCache(cacheName);
+      return this.getCache(cacheName, forceReturnValueDefault);
    }
 
+   public <K, V> RemoteCache<K, V> getCache(String cacheName, boolean forceReturnValue) {
+      return createRemoteCache(cacheName, forceReturnValue);
+   }
+
    public <K, V> RemoteCache<K, V> getCache() {
-      return createRemoteCache(DefaultCacheManager.DEFAULT_CACHE_NAME);
+      return this.getCache(forceReturnValueDefault);
    }
+   public <K, V> RemoteCache<K, V> getCache(boolean forceReturnValue) {
+      return createRemoteCache(DefaultCacheManager.DEFAULT_CACHE_NAME, forceReturnValue);
+   }
 
    @Override
    public void start() {
@@ -169,12 +181,21 @@
       String servers = props.getProperty(CONF_HOTROD_SERVERS);
       transportFactory.start(props, getStaticConfiguredServers(servers));
       hotrodMarshaller = props.getProperty("marshaller");
+
+      String asyncExecutorClass = DefaultAsyncExecutorFactory.class.getName();
+      if (props.contains("asyn-executor-factory")) {
+         asyncExecutorClass = props.getProperty("asyn-executor-factory");
+      }
+      ExecutorFactory executorFactory = (ExecutorFactory) VHelper.newInstance(asyncExecutorClass);
+      asyncExecutorService = executorFactory.getExecutor(props);
+      
+
       if (hotrodMarshaller == null) {
          hotrodMarshaller = SerializationMarshaller.class.getName();
          log.info("'marshaller' not specified, using " + hotrodMarshaller);
       }
       if (props.get("force-return-value") != null && props.get("force-return-value").equals("true")) {
-         throw new RuntimeException("force-return-value is not supported in Alpha1");
+          forceReturnValueDefault = true;
       }
       started = true;
    }
@@ -198,10 +219,10 @@
       }
    }
 
-   private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName) {
+   private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName, boolean forceReturnValue) {
       HotrodMarshaller marshaller = (HotrodMarshaller) VHelper.newInstance(hotrodMarshaller);
       HotrodOperations hotrodOperations = new HotrodOperationsImpl(cacheName, transportFactory);
-      return new RemoteCacheImpl<K, V>(hotrodOperations, marshaller, cacheName, this);
+      return new RemoteCacheImpl<K, V>(hotrodOperations, marshaller, cacheName, this, asyncExecutorService, forceReturnValue);
    }
 
    private Set<InetSocketAddress> getStaticConfiguredServers(String servers) {

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -1,69 +0,0 @@
-package org.infinispan.client.hotrod.impl;
-
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public abstract class AbstractTransport implements Transport {
-
-   private static Log log = LogFactory.getLog(AbstractTransport.class);
-
-   public byte[] readArray() {
-      int responseLength = readVInt();
-      return readByteArray(responseLength);
-   }
-
-   @Override
-   public String readString() {
-      byte[] strContent = readArray();
-      String readString = new String(strContent);
-      if (log.isTraceEnabled()) {
-         log.trace("Read string is: " + readString);
-      }
-      return readString;//todo take care of encoding here
-   }
-
-   @Override
-   public long readLong() {
-      //todo - optimize this not to create the longBytes on every call, but reuse it/cache it as class is NOT thread safe
-      byte[] longBytes = readByteArray(8);
-      long result = 0;
-      for (byte longByte : longBytes) {
-         result <<= 8;
-         result ^= (long) longByte & 0xFF;
-      }
-      return result;
-   }
-
-   @Override
-   public void writeLong(long longValue) {
-      byte[] b = new byte[8];
-      for (int i = 0; i < 8; i++) {
-         b[7 - i] = (byte) (longValue >>> (i * 8));
-      }
-      writeBytes(b);
-   }
-
-   @Override
-   public int readUnsignedShort() {
-      byte[] shortBytes = readByteArray(2);
-      int result = 0;
-      for (byte longByte : shortBytes) {
-         result <<= 8;
-         result ^= (long) longByte & 0xFF;
-      }
-      return result;
-   }
-
-   public void writeArray(byte[] toAppend) {
-      writeVInt(toAppend.length);
-      writeBytes(toAppend);
-   }
-
-   protected abstract void writeBytes(byte[] toAppend);
-}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -5,13 +5,15 @@
 import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
 import org.infinispan.client.hotrod.exceptions.TimeoutException;
 import org.infinispan.client.hotrod.exceptions.TransportException;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -30,7 +32,7 @@
    private static final AtomicLong MSG_ID = new AtomicLong();
    private static final AtomicInteger TOPOLOGY_ID = new AtomicInteger();
    private TransportFactory transportFactory;
-   private byte clientIntelligence = CLIENT_INTELLIGENCE_TOPOLOGY_AWARE;
+   private byte clientIntelligence = CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
 
    public HotrodOperationsImpl(String cacheName, TransportFactory transportFactory) {
       cacheNameBytes = cacheName.getBytes(); //todo add charset here
@@ -38,7 +40,7 @@
    }
 
    public byte[] get(byte[] key, Flag[] flags) {
-      Transport transport = transportFactory.getTransport();
+      Transport transport = transportFactory.getTransport(key);
       try {
          short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
          if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -54,7 +56,7 @@
    }
 
    public byte[] remove(byte[] key, Flag[] flags) {
-      Transport transport = transportFactory.getTransport();
+      Transport transport = transportFactory.getTransport(key);
       try {
          short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
          if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -69,7 +71,7 @@
    }
 
    public boolean containsKey(byte[] key, Flag... flags) {
-      Transport transport = transportFactory.getTransport();
+      Transport transport = transportFactory.getTransport(key);
       try {
          short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
          if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -84,7 +86,7 @@
    }
 
    public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
-      Transport transport = transportFactory.getTransport();
+      Transport transport = transportFactory.getTransport(key);
       try {
          short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
          if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -106,7 +108,7 @@
 
 
    public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = transportFactory.getTransport();
+      Transport transport = transportFactory.getTransport(key);
       try {
          short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
          if (status != NO_ERROR_STATUS) {
@@ -119,12 +121,16 @@
    }
 
    public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = transportFactory.getTransport();
+      Transport transport = transportFactory.getTransport(key);
       try {
          short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
          if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
-            return returnPossiblePrevValue(transport, flags);
-         } 
+            byte[] bytes = returnPossiblePrevValue(transport, flags);
+            if (log.isTraceEnabled()) {
+               log.trace("Returning from putIfAbsent: " + Arrays.toString(bytes));
+            }
+            return bytes;
+         }
       } finally {
          releaseTransport(transport);
       }
@@ -132,13 +138,11 @@
    }
 
    public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = transportFactory.getTransport();
+      Transport transport = transportFactory.getTransport(key);
       try {
          short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
-         if (status == NO_ERROR_STATUS) {
+         if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
             return returnPossiblePrevValue(transport, flags);
-         } else if (status == NOT_PUT_REMOVED_REPLACED_STATUS) {
-            return null;
          }
       } finally {
          releaseTransport(transport);
@@ -153,7 +157,7 @@
     * was sent, the response would be empty.
     */
    public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
-      Transport transport = transportFactory.getTransport();
+      Transport transport = transportFactory.getTransport(key);
       try {
          // 1) write header
          long messageId = writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, flags);
@@ -174,7 +178,7 @@
     * Request: [header][key length][key][entry_version]
     */
    public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
-      Transport transport = transportFactory.getTransport();
+      Transport transport = transportFactory.getTransport(key);
       try {
          // 1) write header
          long messageId = writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, flags);
@@ -209,7 +213,7 @@
          long messageId = writeHeader(transport, STATS_REQUEST);
          readHeaderAndValidate(transport, messageId, STATS_RESPONSE);
          int nrOfStats = transport.readVInt();
-         
+
          Map<String, String> result = new HashMap<String, String>();
          for (int i = 0; i < nrOfStats; i++) {
             String statName = transport.readString();
@@ -265,6 +269,7 @@
    /*
     * Magic	| MessageId	| Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
     */
+
    private long writeHeader(Transport transport, short operationCode, Flag... flags) {
       transport.writeByte(REQUEST_MAGIC);
       long messageId = MSG_ID.incrementAndGet();
@@ -322,21 +327,43 @@
       checkForErrorsInResponseStatus(status, messageId, transport);
       short topologyChangeByte = transport.readByte();
       if (topologyChangeByte == 1) {
-         int newTopology = transport.readVInt();
-         TOPOLOGY_ID.set(newTopology);
-         int clusterSize = transport.readVInt();
-         List<InetSocketAddress> hotRodServers = new ArrayList<InetSocketAddress>(clusterSize);
-         for (int i = 0; i < clusterSize; i++) {
-            String host = transport.readString();
-            int port = transport.readUnsignedShort();
-            hotRodServers.add(new InetSocketAddress(host, port));
+         readNewTopologyAndHash(transport);
+      }
+      return status;
+   }
+
+   private void readNewTopologyAndHash(Transport transport) {
+      int newTopologyId = transport.readVInt();
+      TOPOLOGY_ID.set(newTopologyId);
+      int numKeyOwners = transport.readUnsignedShort();
+      short hashFunctionVersion = transport.readByte();
+      int hashSpace = transport.readVInt();
+      int clusterSize = transport.readVInt();
+
+      if (log.isTraceEnabled()) {
+         log.trace("Topology change request: newTopologyId=" + newTopologyId + ", numKeyOwners=" + numKeyOwners +
+               ", hashFunctionVersion=" + hashFunctionVersion + ", hashSpaceSize=" + hashSpace + ", clusterSize=" + clusterSize);
+      }
+
+      LinkedHashMap<InetSocketAddress, Integer> servers2HashCode = new LinkedHashMap<InetSocketAddress, Integer>();
+
+      for (int i = 0; i < clusterSize; i++) {
+         String host = transport.readString();
+         int port = transport.readUnsignedShort();
+         if (log.isTraceEnabled()) {
+            log.trace("Server read:" + host + ":" + port);
          }
-         if (log.isInfoEnabled()) {
-            log.info("Received topology change response. New cluster size = " + clusterSize +", new topology id = " + newTopology  + ", new topology " + hotRodServers);
+         int hashCode = transport.readVInt();
+         servers2HashCode.put(new InetSocketAddress(host, port), hashCode);
+         if (log.isTraceEnabled()) {
+            log.trace("Hash code is: " + hashCode);
          }
-         transportFactory.updateServers(hotRodServers);
       }
-      return status;
+      if (log.isInfoEnabled()) {
+         log.info("New topology: " + servers2HashCode);
+      }
+      transportFactory.updateServers(servers2HashCode.keySet());
+      transportFactory.updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
    }
 
    private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
@@ -392,12 +419,19 @@
    }
 
    private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
-      return hasForceReturn(flags) ? transport.readArray() : null;
+      if (hasForceReturn(flags)) {
+         byte[] bytes = transport.readArray();
+         if (log.isTraceEnabled()) log.trace("Previous value bytes is: " + Arrays.toString(bytes));
+         //0-length response means null
+         return bytes.length == 0 ? null : bytes;
+      } else {
+         return null;
+      }
    }
 
    private void releaseTransport(Transport transport) {
       if (transport != null)
-        transportFactory.releaseTransport(transport);
+         transportFactory.releaseTransport(transport);
    }
 
    private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -6,59 +6,88 @@
 import org.infinispan.client.hotrod.ServerStatistics;
 import org.infinispan.client.hotrod.Version;
 import org.infinispan.client.hotrod.VersionedValue;
-import org.infinispan.manager.CacheManager;
+import org.infinispan.client.hotrod.impl.async.NotifyingFutureImpl;
 import org.infinispan.util.concurrent.NotifyingFuture;
 
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 /**
  * // TODO: Document this
+ * //todo - consider the return values
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
-public class RemoteCacheImpl<K, V> extends RemoteCacheSupport<K,V> {
+public class RemoteCacheImpl<K, V> extends RemoteCacheSupport<K, V> {
 
+   private static final Flag[] FORCE_RETURN_VALUE = {Flag.FORCE_RETURN_VALUE};
+
    private ThreadLocal<Flag[]> flagsMap = new ThreadLocal<Flag[]>();
    private HotrodOperations operations;
    private HotrodMarshaller marshaller;
    private String name;
    private RemoteCacheManager remoteCacheManager;
+   private final ExecutorService executorService;
+   private final boolean forceReturnValue;
 
 
-   public RemoteCacheImpl(HotrodOperations operations, HotrodMarshaller marshaller, String name, RemoteCacheManager rcm) {
+   public RemoteCacheImpl(HotrodOperations operations, HotrodMarshaller marshaller, String name, RemoteCacheManager rcm, ExecutorService executorService, boolean forceReturnValue) {
       this.operations = operations;
       this.marshaller = marshaller;
       this.name = name;
       this.remoteCacheManager = rcm;
+      this.executorService = executorService;
+      this.forceReturnValue = forceReturnValue;
    }
 
    public RemoteCacheManager getRemoteCacheManager() {
       return remoteCacheManager;
    }
 
-
    @Override
-   public boolean remove(K key, long version) {
+   public boolean removeWithVersion(K key, long version) {
       VersionedOperationResponse response = operations.removeIfUnmodified(obj2bytes(key), version, flags());
       return response.getCode().isUpdated();
    }
 
    @Override
-   public NotifyingFuture<Boolean> removeAsync(Object key, long version) {
-      return null;  // TODO: Customise this generated block
+   public NotifyingFuture<Boolean> removeWithVersionAsync(final K key, final long version) {
+      final NotifyingFutureImpl<Boolean> result = new NotifyingFutureImpl<Boolean>();
+      Future future = executorService.submit(new Callable() {
+         @Override
+         public Object call() throws Exception {
+            boolean removed = removeWithVersion(key, version);
+            result.notifyFutureCompletion();
+            return removed;
+         }
+      });
+      result.setExecuting(future);
+      return result;
    }
 
    @Override
-   public boolean replace(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds) {
+   public boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds) {
       VersionedOperationResponse response = operations.replaceIfUnmodified(obj2bytes(key), obj2bytes(newValue), lifespanSeconds, maxIdleTimeSeconds, version, flags());
       return response.getCode().isUpdated();
    }
 
    @Override
-   public NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds, int maxIdleSeconds) {
-      return null;  // TODO: Customise this generated block
+   public NotifyingFuture<Boolean> replaceWithVersionAsync(final K key, final V newValue, final long version, final int lifespanSeconds, final int maxIdleSeconds) {
+      final NotifyingFutureImpl<Boolean> result = new NotifyingFutureImpl<Boolean>();
+      Future future = executorService.submit(new Callable() {
+         @Override
+         public Object call() throws Exception {
+            boolean removed = replaceWithVersion(key, newValue, version, lifespanSeconds, maxIdleSeconds);
+            result.notifyFutureCompletion();
+            return removed;
+         }
+      });
+      result.setExecuting(future);
+      return result;
    }
 
    @Override
@@ -75,8 +104,19 @@
    }
 
    @Override
-   public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
-      return null;  // TODO: Customise this generated block
+   public NotifyingFuture<Void> putAllAsync(final Map<? extends K, ? extends V> data, final long lifespan, final TimeUnit lifespanUnit, final long maxIdle, final TimeUnit maxIdleUnit) {
+      final NotifyingFutureImpl<Void> result = new NotifyingFutureImpl<Void>();
+      Future future = executorService.submit(new Callable() {
+         @Override
+         public Object call() throws Exception {
+            putAll(data, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+            result.notifyFutureCompletion();
+            return null;
+         }
+      });
+      result.setExecuting(future);
+      return result;
+
    }
 
    @Override
@@ -96,7 +136,7 @@
 
    @Override
    public String getVersion() {
-      return Version.getProtocolVersion();  
+      return Version.getProtocolVersion();
    }
 
    @Override
@@ -125,28 +165,78 @@
    }
 
    @Override
-   public NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
-      return null;  // TODO: Customise this generated block
+   public NotifyingFuture<V> putAsync(final K key, final V value, final long lifespan, final TimeUnit lifespanUnit, final long maxIdle, final TimeUnit maxIdleUnit) {
+      final NotifyingFutureImpl<V> result = new NotifyingFutureImpl<V>();
+      Future future = executorService.submit(new Callable() {
+         @Override
+         public Object call() throws Exception {
+            V prevValue = put(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+            result.notifyFutureCompletion();
+            return prevValue;
+         }
+      });
+      result.setExecuting(future);
+      return result;
    }
 
    @Override
    public NotifyingFuture<Void> clearAsync() {
-      return null;  // TODO: Customise this generated block
+      final NotifyingFutureImpl<Void> result = new NotifyingFutureImpl<Void>();
+      Future future = executorService.submit(new Callable() {
+         @Override
+         public Object call() throws Exception {
+            clear();
+            result.notifyFutureCompletion();
+            return null;
+         }
+      });
+      result.setExecuting(future);
+      return result;
    }
 
    @Override
-   public NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
-      return null;  // TODO: Customise this generated block
+   public NotifyingFuture<V> putIfAbsentAsync(final K key,final V value,final long lifespan,final TimeUnit lifespanUnit,final long maxIdle,final TimeUnit maxIdleUnit) {
+      final NotifyingFutureImpl<V> result = new NotifyingFutureImpl<V>();
+      Future future = executorService.submit(new Callable() {
+         @Override
+         public Object call() throws Exception {
+            V prevValue = putIfAbsent(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+            result.notifyFutureCompletion();
+            return prevValue;
+         }
+      });
+      result.setExecuting(future);
+      return result;
    }
 
    @Override
-   public NotifyingFuture<V> removeAsync(Object key) {
-      return null;  // TODO: Customise this generated block
+   public NotifyingFuture<V> removeAsync(final Object key) {
+      final NotifyingFutureImpl<V> result = new NotifyingFutureImpl<V>();
+      Future future = executorService.submit(new Callable() {
+         @Override
+         public Object call() throws Exception {
+            V toReturn = remove(key);
+            result.notifyFutureCompletion();
+            return toReturn;
+         }
+      });
+      result.setExecuting(future);
+      return result;      
    }
 
    @Override
-   public NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
-      return null;  // TODO: Customise this generated block
+   public NotifyingFuture<V> replaceAsync(final K key,final V value,final long lifespan,final TimeUnit lifespanUnit,final long maxIdle,final TimeUnit maxIdleUnit) {
+      final NotifyingFutureImpl<V> result = new NotifyingFutureImpl<V>();
+      Future future = executorService.submit(new Callable() {
+         @Override
+         public Object call() throws Exception {
+            V v = replace(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+            result.notifyFutureCompletion();
+            return v;
+         }
+      });
+      result.setExecuting(future);
+      return result;
    }
 
    @Override
@@ -196,6 +286,9 @@
    private Flag[] flags() {
       Flag[] flags = this.flagsMap.get();
       this.flagsMap.remove();
+      if (flags == null && forceReturnValue) {
+         return FORCE_RETURN_VALUE;
+      }
       return flags;
    }
 
@@ -217,8 +310,8 @@
       return new VersionedValueImpl<V>(value.getVersion(), valueObj);
    }
 
-   private int toSeconds(long durration, TimeUnit timeUnit) {
+   private int toSeconds(long duration, TimeUnit timeUnit) {
       //todo make sure this can pe enveloped on an int
-      return (int) timeUnit.toSeconds(durration);
+      return (int) timeUnit.toSeconds(duration);
    }
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheSupport.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -22,18 +22,18 @@
 
 
    @Override
-   public NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version) {
-      return replaceAsync(key, newValue, version, 0);
+   public NotifyingFuture<Boolean> replaceWithVersionAsync(K key, V newValue, long version) {
+      return replaceWithVersionAsync(key, newValue, version, 0);
    }
 
    @Override
-   public NotifyingFuture<Boolean> replaceAsync(K key, V newValue, long version, int lifespanSeconds) {
-      return replaceAsync(key, newValue, version, 0, 0);
+   public NotifyingFuture<Boolean> replaceWithVersionAsync(K key, V newValue, long version, int lifespanSeconds) {
+      return replaceWithVersionAsync(key, newValue, version, lifespanSeconds, 0); // forward the lifespan; max idle defaults to 0
    }
 
    @Override
-   public boolean replace(K key, V newValue, long version) {
-      return replace(key, newValue, version, 0);
+   public boolean replaceWithVersion(K key, V newValue, long version) {
+      return replaceWithVersion(key, newValue, version, 0);
    }
 
    @Override
@@ -42,8 +42,8 @@
    }
 
    @Override
-   public boolean replace(K key, V newValue, long version, int lifespanSeconds) {
-      return replace(key, newValue, version, lifespanSeconds, 0);
+   public boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds) {
+      return replaceWithVersion(key, newValue, version, lifespanSeconds, 0);
    }
 
    @Override

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -1,12 +1,15 @@
 package org.infinispan.client.hotrod.impl;
 
 import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.Arrays;
 
 /**
  * // TODO: Document this
@@ -16,6 +19,8 @@
  */
 public class SerializationMarshaller implements HotrodMarshaller {
 
+   private static Log log = LogFactory.getLog(SerializationMarshaller.class);
+
    @Override
    public byte[] marshallObject(Object toMarshall) {
       ByteArrayOutputStream result = new ByteArrayOutputStream(1000);
@@ -32,7 +37,11 @@
    public Object readObject(byte[] bytes) {
       try {
          ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
-         return ois.readObject();
+         Object o = ois.readObject();
+         if (log.isTraceEnabled()) {
+            log.trace("Unmarshalled bytes: " + Arrays.toString(bytes) + " and returning object: " + o);
+         }
+         return o;
       } catch (Exception e) {
          throw new HotRodClientException("Unexpected!", e);
       }

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -1,46 +0,0 @@
-package org.infinispan.client.hotrod.impl;
-
-import net.jcip.annotations.NotThreadSafe;
-
-/**
- * // TODO: Document this
- *
- * @author mmarkus
- * @since 4.1
- */
- at NotThreadSafe
-public interface Transport {
-
-   public void writeArray(byte[] toAppend);
-
-   public void writeByte(short toWrite);
-
-   public void writeVInt(int vint);
-
-   public void writeVLong(long l);
-
-   public long readVLong();
-
-   public int readVInt();
-
-   public void flush();
-
-   public short readByte();
-
-   public void release();
-
-   /**
-    * reads an vint which is size; then an array having that size.
-    */
-   public byte[] readArray();
-
-   String readString();
-
-   byte[] readByteArray(int size);
-
-   long readLong();
-
-   void writeLong(long longValue);
-
-   int readUnsignedShort();
-}

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -1,27 +0,0 @@
-package org.infinispan.client.hotrod.impl;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public interface TransportFactory {
-
-   public static final String CONF_HOTROD_SERVERS = "hotrod-servers";
-
-   public Transport getTransport();
-
-   public void releaseTransport(Transport transport);
-
-   void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers);
-
-   void updateServers(Collection<InetSocketAddress> newServers);
-
-   void destroy();
-}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,52 @@
+package org.infinispan.client.hotrod.impl.async;
+
+import org.infinispan.executors.ExecutorFactory;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class DefaultAsyncExecutorFactory implements ExecutorFactory {
+   public static final String THREAD_NAME = "Hotrod-client-async-pool";
+   public static final AtomicInteger counter = new AtomicInteger(0);
+   private int poolSize = 1;
+   private int queueSize = 100000;
+
+   @Override
+   public ExecutorService getExecutor(Properties p) {
+      readParams(p);
+      ThreadFactory tf = new ThreadFactory() {
+         public Thread newThread(Runnable r) {
+            Thread th = new Thread(r, THREAD_NAME + "-" + counter.getAndIncrement());
+            th.setDaemon(true);
+            return th;
+         }
+      };
+
+      return new ThreadPoolExecutor(poolSize, poolSize,
+                                    0L, TimeUnit.MILLISECONDS,
+                                    new LinkedBlockingQueue<Runnable>(queueSize),
+                                    tf);
+   }
+
+   private void readParams(Properties props) {
+      // Look the keys up with getProperty(): Properties.contains() tests values, not keys,
+      // so configured sizes were never picked up. Absent keys keep the field defaults.
+      String pool = props.getProperty("default-executor-factory.poolSize");
+      if (pool != null) poolSize = Integer.parseInt(pool.trim());
+      String queue = props.getProperty("default-executor-factory.queueSize");
+      if (queue != null) queueSize = Integer.parseInt(queue.trim());
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,72 @@
+package org.infinispan.client.hotrod.impl.async;
+
+import org.infinispan.util.concurrent.FutureListener;
+import org.infinispan.util.concurrent.NotifyingFuture;
+
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * {@link NotifyingFuture} that delegates to the {@link Future} set via {@link #setExecuting(Future)}.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class NotifyingFutureImpl<T> implements NotifyingFuture<T> {
+
+   private volatile Future<T> executing;
+   private final CopyOnWriteArraySet<FutureListener> listeners = new CopyOnWriteArraySet<FutureListener>();
+
+   public void setExecuting(Future<T> executing) {
+      this.executing = executing;
+   }
+
+   @Override
+   public NotifyingFuture<T> attachListener(FutureListener futureListener) {
+      // The listener set is created eagerly (see the field): the former lazy
+      // check-then-create let two concurrent callers each install their own
+      // set, silently dropping one of the listeners.
+      listeners.add(futureListener);
+      return this;
+   }
+
+   @Override
+   public boolean cancel(boolean mayInterruptIfRunning) {
+      try {
+         return executing.cancel(mayInterruptIfRunning);
+      } finally {
+         notifyFutureCompletion();
+      }
+   }
+
+   /** Invokes {@code futureDone} on every attached listener. */
+   public void notifyFutureCompletion() {
+      // Iterating the copy-on-write set is safe while listeners are being attached.
+      for (FutureListener listener : listeners) {
+         listener.futureDone(this);
+      }
+   }
+
+   @Override
+   public boolean isCancelled() {
+      return executing.isCancelled();
+   }
+
+   @Override
+   public boolean isDone() {
+      return executing.isDone();
+   }
+
+   @Override
+   public T get() throws InterruptedException, ExecutionException {
+      return executing.get();
+   }
+
+   @Override
+   public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+      return executing.get(timeout, unit);
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,18 @@
+package org.infinispan.client.hotrod.impl.consistenthash;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface ConsistentHash {
+   
+   void init(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, int hashSpace);
+
+   InetSocketAddress getServer(byte[] key);
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,53 @@
+package org.infinispan.client.hotrod.impl.consistenthash;
+
+import org.infinispan.client.hotrod.impl.transport.VHelper;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class ConsistentHashFactory {
+   
+   private static Log log = LogFactory.getLog(ConsistentHashFactory.class);
+
+   public final Map<Integer, String> version2ConsistentHash = new HashMap<Integer, String>();
+
+   public void init(Properties props) {
+      // Keys look like "consistent-hash.<version>"; the value is the implementation class name.
+      for (String propName : props.stringPropertyNames()) {
+         if (propName.startsWith("consistent-hash.")) {
+            if (log.isTraceEnabled()) log.trace("Processing consistent hash: " + propName);
+            String versionString = propName.substring("consistent-hash.".length());
+            int version = Integer.parseInt(versionString);
+            // Look the class name up under the full property name, not the bare version suffix.
+            String hashFunction = props.getProperty(propName);
+            version2ConsistentHash.put(version, hashFunction);
+            if (log.isTraceEnabled()) log.trace("Added consistent hash version " + version + ": " + hashFunction);
+         }
+      }
+   }
+
+   public ConsistentHash newConsistentHash(int version) {
+      String hashFunctionClass = version2ConsistentHash.get(version);
+      if (hashFunctionClass == null) {
+         log.trace("No hash function configured for version " + version);
+         hashFunctionClass = ConsistentHashFactory.class.getPackage().getName() + ".ConsitentHashV" + version;
+         if (log.isTraceEnabled()) log.trace("Trying to use default value: " + hashFunctionClass);
+      }
+      ConsistentHash consistentHash = null;
+      try {
+         consistentHash = (ConsistentHash) VHelper.newInstance(hashFunctionClass);
+      } catch (RuntimeException re) {
+         log.warn("Could not instantiate consistent hash for version " + version + ": " + hashFunctionClass, re);
+      }
+      return consistentHash;
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,48 @@
+package org.infinispan.client.hotrod.impl.consistenthash;
+
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class ConsitentHashV1 implements ConsistentHash {
+
+   private static Log log = LogFactory.getLog(ConsitentHashV1.class);
+   
+   private volatile Map<InetSocketAddress, Integer> servers2HashCode;
+
+   private volatile int numKeyOwners;
+
+   private volatile int hashSpace;
+
+   private Random random = new Random();
+   List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+
+   @Override
+   public void init(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, int hashSpace) {
+      this.servers2HashCode = servers2HashCode;
+      this.numKeyOwners = numKeyOwners;
+      this.hashSpace = hashSpace;
+      addresses.addAll(servers2HashCode.keySet());
+   }
+
+   @Override
+   public InetSocketAddress getServer(byte[] key) {
+      InetSocketAddress addr = addresses.get(random.nextInt(addresses.size()));
+      if (log.isTraceEnabled()) {
+         log.trace("Randomly returning an address: " + addr);
+      }
+      return addr;
+   }
+}

Copied: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java (from rev 1749, trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java)
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,80 @@
+package org.infinispan.client.hotrod.impl.transport;
+
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class AbstractTransport implements Transport {
+
+   private static Log log = LogFactory.getLog(AbstractTransport.class);
+
+   public byte[] readArray() {
+      int responseLength = readVInt();
+      return readByteArray(responseLength);
+   }
+
+   @Override
+   public String readString() {
+      byte[] strContent = readArray();
+      String readString = new String(strContent);
+      if (log.isTraceEnabled()) {
+         log.trace("Read string is: " + readString);
+      }
+      return readString;//todo take care of encoding here
+   }
+
+   @Override
+   public long readLong() {
+      //todo - optimize this not to create the longBytes on every call, but reuse it/cache it as class is NOT thread safe
+      byte[] longBytes = readByteArray(8);
+      long result = 0;
+      for (byte longByte : longBytes) {
+         result <<= 8;
+         result ^= (long) longByte & 0xFF;
+      }
+      return result;
+   }
+
+   @Override
+   public void writeLong(long longValue) {
+      byte[] b = new byte[8];
+      for (int i = 0; i < 8; i++) {
+         b[7 - i] = (byte) (longValue >>> (i * 8));
+      }
+      writeBytes(b);
+   }
+
+   @Override
+   public int readUnsignedShort() {
+      byte[] shortBytes = readByteArray(2);
+      int result = 0;
+      for (byte longByte : shortBytes) {
+         result <<= 8;
+         result ^= (long) longByte & 0xFF;
+      }
+      return result;
+   }
+
+   @Override
+   public int read4ByteInt() {
+      byte[] b = readByteArray(4);
+      int value = 0;
+      for (int i = 0; i < 4; i++) {
+         int shift = (4 - 1 - i) * 8;
+         value += (b[i] & 0x000000FF) << shift;
+      }
+      return value;
+   }
+
+   public void writeArray(byte[] toAppend) {
+      writeVInt(toAppend.length);
+      writeBytes(toAppend);
+   }
+
+   protected abstract void writeBytes(byte[] toAppend);
+}

Copied: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java (from rev 1749, trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java)
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,48 @@
+package org.infinispan.client.hotrod.impl.transport;
+
+import net.jcip.annotations.NotThreadSafe;
+
+/**
+ * // TODO: Document this
+ *
+ * @author mmarkus
+ * @since 4.1
+ */
+ at NotThreadSafe
+public interface Transport {
+
+   public void writeArray(byte[] toAppend);
+
+   public void writeByte(short toWrite);
+
+   public void writeVInt(int vint);
+
+   public void writeVLong(long l);
+
+   public long readVLong();
+
+   public int readVInt();
+
+   public void flush();
+
+   public short readByte();
+
+   public void release();
+
+   /**
+    * reads an vint which is size; then an array having that size.
+    */
+   public byte[] readArray();
+
+   String readString();
+
+   byte[] readByteArray(int size);
+
+   long readLong();
+
+   void writeLong(long longValue);
+
+   int readUnsignedShort();
+
+   int read4ByteInt();
+}

Copied: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java (from rev 1749, trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java)
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,32 @@
+package org.infinispan.client.hotrod.impl.transport;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface TransportFactory {
+
+   public static final String CONF_HOTROD_SERVERS = "hotrod-servers";
+
+   public Transport getTransport();
+
+   public void releaseTransport(Transport transport);
+
+   void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers);
+
+   void updateServers(Collection<InetSocketAddress> newServers);
+
+   void destroy();
+
+   void updateHashFunction(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, short hashFunctionVersion, int hashSpace);
+
+   Transport getTransport(byte[] key);
+}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -1,7 +1,7 @@
 package org.infinispan.client.hotrod.impl.transport.netty;
 
 import org.infinispan.client.hotrod.exceptions.TransportException;
-import org.infinispan.client.hotrod.impl.AbstractTransport;
+import org.infinispan.client.hotrod.impl.transport.AbstractTransport;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 import org.jboss.netty.bootstrap.ClientBootstrap;

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -1,12 +1,13 @@
 package org.infinispan.client.hotrod.impl.transport.netty;
 
-import org.infinispan.client.hotrod.impl.Transport;
-import org.infinispan.client.hotrod.impl.TransportFactory;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.Properties;
 
 /**
@@ -48,4 +49,14 @@
    public void releaseTransport(Transport transport) {
       transport.release();
    }
+
+   @Override
+   public void updateHashFunction(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, short hashFunctionVersion, int hashSpace) {
+      // TODO: Customise this generated block
+   }
+
+   @Override
+   public Transport getTransport(byte[] key) {
+      return getTransport();  // TODO: Customise this generated block
+   }
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -1,6 +1,6 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
-import org.infinispan.client.hotrod.impl.AbstractTransport;
+import org.infinispan.client.hotrod.impl.transport.AbstractTransport;
 import org.infinispan.client.hotrod.exceptions.TransportException;
 import org.infinispan.client.hotrod.impl.transport.VHelper;
 import org.infinispan.util.logging.Log;

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -2,8 +2,10 @@
 
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.infinispan.client.hotrod.exceptions.TransportException;
-import org.infinispan.client.hotrod.impl.Transport;
-import org.infinispan.client.hotrod.impl.TransportFactory;
+import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
+import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHashFactory;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
 import org.infinispan.client.hotrod.impl.transport.VHelper;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -11,6 +13,7 @@
 import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Properties;
 import java.util.Set;
 
@@ -33,9 +36,12 @@
    private volatile GenericKeyedObjectPool connectionPool;
    private volatile RequestBalancingStrategy balancer;
    private volatile Collection<InetSocketAddress> servers;
+   private volatile ConsistentHash consistentHash;
+   private final ConsistentHashFactory hashFactory = new ConsistentHashFactory();
 
    @Override
    public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers) {
+      hashFactory.init(props);
       servers = staticConfiguredServers;
       String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
       balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
@@ -55,17 +61,36 @@
    }
 
    @Override
+   public void updateHashFunction(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, short hashFunctionVersion, int hashSpace) {
+      ConsistentHash hash = hashFactory.newConsistentHash(hashFunctionVersion);
+      if (hash == null) {
+         log.warn("No hash function configured for version: " + hashFunctionVersion);
+      } else {
+         hash.init(servers2HashCode, numKeyOwners, hashSpace);
+      }
+      consistentHash = hash;
+   }
+
+   @Override
    public Transport getTransport() {
       InetSocketAddress server = balancer.nextServer();
-      try {
-         return (Transport) connectionPool.borrowObject(server);
-      } catch (Exception e) {
-         String message = "Could not fetch transport";
-         log.error(message, e);
-         throw new TransportException(message, e);
-      } finally {
-         logConnectionInfo(server);
+      return borrowTransportFromPool(server);
+   }
+
+   public Transport getTransport(byte[] key) {
+      InetSocketAddress server;
+      if (consistentHash != null) {
+         server = consistentHash.getServer(key);
+         if (log.isTraceEnabled()) {
+            log.trace("Using consistent hash for determining the server: " + server);
+         }
+      } else {
+         server = balancer.nextServer();
+         if (log.isTraceEnabled()) {
+            log.trace("Using the balancer for determining the server: " + server);
+         }
       }
+      return borrowTransportFromPool(server);
    }
 
    @Override
@@ -134,4 +159,16 @@
          log.trace("For server " + server + ": active = " + connectionPool.getNumActive(server) + "; idle = " + connectionPool.getNumIdle(server));
       }
    }
+
+   private Transport borrowTransportFromPool(InetSocketAddress server) {
+      try {
+         return (Transport) connectionPool.borrowObject(server);
+      } catch (Exception e) {
+         String message = "Could not fetch transport";
+         log.error(message, e);
+         throw new TransportException(message, e);
+      } finally {
+         logConnectionInfo(server);
+      }
+   }
 }

Modified: trunk/client/hotrod-client/src/main/resources/hotrod-client.properties
===================================================================
--- trunk/client/hotrod-client/src/main/resources/hotrod-client.properties	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/main/resources/hotrod-client.properties	2010-05-06 22:19:31 UTC (rev 1750)
@@ -66,3 +66,17 @@
 #behave as FIFO queues - connections are taken from idle connections pools in the order that they are returned.
 #The default setting for this parameter is true.
 lifo = true
+
+## below is the async executor factory config
+
+# async-executor-factory must implement org.infinispan.executors.ExecutorFactory. If not specified, defaults to org.infinispan.client.hotrod.impl.async.DefaultAsyncExecutorFactory
+asyn-executor-factory=org.infinispan.client.hotrod.DefaultAsyncExecutorFactory
+
+# used as a configuration for DefaultAsyncExecutorFactory, and defines the number of threads to keep in the pool. If not specified defaults to 1.
+default-executor-factory.poolSize = 1;
+
+# queue to use for holding async requests before they are executed. Defaults to 
+default-executor-factory.queueSize = 100000
+
+consistent-hash.1=org.infinispan.client.hotrod.impl.consistenthash.ConsitentHashV1
+

Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,69 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "client.hotrod.CSAIntegrationTest")
+public class CSAIntegrationTest extends MultipleCacheManagersTest {
+
+   private HotRodServer hotRodServer1;
+   private HotRodServer hotRodServer2;
+   private RemoteCacheManager remoteCacheManager;
+   private RemoteCache<Object, Object> remoteCache;
+   private TcpTransportFactory tcpConnectionFactory;
+
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      cleanup = CleanupPhase.AFTER_METHOD;
+      Configuration config = TestHelper.getMultiNodeConfig();
+      CacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      CacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      registerCacheManager(cm1);
+      registerCacheManager(cm2);
+
+      hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+      hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+
+      manager(0).getCache();
+      manager(1).getCache();
+
+      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
+      TestingUtil.blockUntilViewReceived(manager(1).getCache(), 2, 10000);
+
+
+      manager(0).getCache().put("k","v");
+      manager(0).getCache().get("k").equals("v");
+      manager(1).getCache().get("k").equals("v");
+
+      log.info("Local replication test passed!");
+
+      //Important: this only connects to one of the two servers!
+      remoteCacheManager = new RemoteCacheManager("localhost:" + hotRodServer2.getPort() + ";localhost:" + hotRodServer2.getPort());
+      remoteCache = remoteCacheManager.getCache();
+
+      tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+   }
+
+   public void testPing() {
+      assert tcpConnectionFactory.getServers().size() == 1;
+      remoteCache.ping();
+      assert tcpConnectionFactory.getServers().size() == 2;
+   }
+
+   public void testHashInfoRetrieved() {
+      remoteCache.put("k", "v");
+   }
+}

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -19,7 +19,6 @@
 
 import static junit.framework.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
-import static org.testng.AssertJUnit.assertTrue;
 
 
 /**
@@ -144,21 +143,21 @@
 
       remoteCache.put("aKey", "aValue");
       VersionedValue valueBinary = remoteCache.getVersioned("aKey");
-      assert remoteCache.replace("aKey", "aNewValue", valueBinary.getVersion());
+      assert remoteCache.replaceWithVersion("aKey", "aNewValue", valueBinary.getVersion());
 
       VersionedValue entry2 = remoteCache.getVersioned("aKey");
       assert entry2.getVersion() != valueBinary.getVersion();
       assertEquals(entry2.getValue(), "aNewValue");
 
-      assert !remoteCache.replace("aKey", "aNewValue", valueBinary.getVersion());
+      assert !remoteCache.replaceWithVersion("aKey", "aNewValue", valueBinary.getVersion());
    }
 
    public void testRemoveIfUnmodified() {
-      assert !remoteCache.remove("aKey", 12321212l);
+      assert !remoteCache.removeWithVersion("aKey", 12321212l);
 
       remoteCache.put("aKey", "aValue");
       VersionedValue valueBinary = remoteCache.getVersioned("aKey");
-      assert remoteCache.remove("aKey", valueBinary.getVersion());
+      assert remoteCache.removeWithVersion("aKey", valueBinary.getVersion());
       assert !cache.containsKey("aKey");
 
       remoteCache.put("aKey", "aNewValue");
@@ -167,7 +166,7 @@
       assert entry2.getVersion() != valueBinary.getVersion();
       assertEquals(entry2.getValue(), "aNewValue");
 
-      assert  !remoteCache.remove("aKey", valueBinary.getVersion());
+      assert  !remoteCache.removeWithVersion("aKey", valueBinary.getVersion());
    }
 
    public void testPutIfAbsent() {
@@ -199,7 +198,7 @@
       if (value == null) {
          assert cacheValue == null : "Expected null value but received: " + cacheValue;
       } else {
-         assert Arrays.equals(valueBytes, cacheValue.data());
+         assert Arrays.equals(valueBytes, (byte[])cacheValue.data());
       }
    }
 }

Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteAsyncAPITest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteAsyncAPITest.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteAsyncAPITest.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -0,0 +1,160 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.concurrent.NotifyingFuture;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "client.hotrod.RemoteAsyncAPITest")
+public class RemoteAsyncAPITest extends SingleCacheManagerTest {
+   private HotRodServer hotrodServer;
+   private RemoteCacheManager rcm;
+   private RemoteCache<String, String> c;
+
+   @Override
+   protected CacheManager createCacheManager() throws Exception {
+      CacheManager cm = TestCacheManagerFactory.createLocalCacheManager();
+      cache = cm.getCache();
+      hotrodServer = TestHelper.startHotRodServer(cm);
+      Properties props = new Properties();
+      props.put("hotrod-servers", "127.0.0.1:" + hotrodServer.getPort());
+      props.put("force-return-value","true");
+      props.put("testOnBorrow", "false");
+      rcm = new RemoteCacheManager(props);
+      c = rcm.getCache(true);
+      return cm;
+   }
+
+   public void testAsyncPut() throws Exception {
+      // put
+      Future<String> f = c.putAsync("k", "v");
+      assert f != null;
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert c.get("k").equals("v");
+
+
+      f = c.putAsync("k", "v2");
+      assert f != null;
+      assert !f.isCancelled();
+      assert f.get().equals("v") : "Obtained " + f.get();
+      assert c.get("k").equals("v2");
+   }
+
+   public void testAsyncPutAll() throws Exception {
+      Future<Void> f2 = c.putAllAsync(Collections.singletonMap("k", "v3"));
+      assert f2 != null;
+      assert !f2.isCancelled();
+      assert f2.get() == null;
+      assert f2.isDone();
+      assert c.get("k").equals("v3");
+   }
+
+
+   public void testAsyncPutIfAbsent() throws Exception {
+      Future<Void> f2 = c.putAllAsync(Collections.singletonMap("k", "v3"));
+      assert f2.get() == null;
+      assert f2.isDone();
+      assert c.get("k").equals("v3");
+      Future f = c.putIfAbsentAsync("k", "v4");
+      assert f != null;
+      assert !f.isCancelled();
+      assert "v3".equals(f.get()) : "Obtained " + f.get();
+      assert f.isDone();
+      assert c.get("k").equals("v3");
+   }
+
+   public void testRemoveAsync() throws Exception {
+      c.put("k","v3");
+      Future<String> f = c.removeAsync("k");
+      assert f != null;
+      assert !f.isCancelled();
+      assert f.get().equals("v3");
+      assert f.isDone();
+      assert c.get("k") == null;
+   }
+
+
+   public void testPutIfAbsentAsync() throws Exception {
+      Future f = c.putIfAbsentAsync("k", "v4");
+      assert f != null;
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert f.isDone();
+      assert c.get("k").equals("v4");
+   }
+
+
+   public void testVersionedRemove() throws Exception {
+
+      c.put("k","v4");
+      VersionedValue value = c.getVersioned("k");
+
+      Future<Boolean> f3 = c.removeWithVersionAsync("k", value.getVersion() + 1);
+      assert f3 != null;
+      assert !f3.isCancelled();
+      assert f3.get().equals(false);
+      assert f3.isDone();
+
+      assert c.get("k").equals("v4");
+
+      f3 = c.removeWithVersionAsync("k", value.getVersion());
+      assert f3 != null;
+      assert !f3.isCancelled();
+      assert f3.get().equals(true);
+      assert f3.isDone();
+      assert c.get("k") == null;
+   }
+
+
+
+   public void testReplaceAsync() throws Exception {
+      Future f = c.replaceAsync("k", "v5");
+      assert f != null;
+      assert !f.isCancelled();
+      assert f.get() == null;
+      assert c.get("k") == null;
+      assert f.isDone();
+
+      c.put("k", "v");
+      f = c.replaceAsync("k", "v5");
+      assert f != null;
+      assert !f.isCancelled();
+      assert f.get().equals("v");
+      assert c.get("k").equals("v5");
+      assert f.isDone();
+   }
+
+   public void testVersionedReplace() throws Exception {
+      assert null == c.replace("aKey", "aValue");
+
+
+      c.put("aKey", "aValue");
+      VersionedValue valueBinary = c.getVersioned("aKey");
+      NotifyingFuture<Boolean> future = c.replaceWithVersionAsync("aKey", "aNewValue", valueBinary.getVersion());
+      assert !future.isCancelled();
+      assert future.get();
+      assert future.isDone();
+
+      VersionedValue entry2 = c.getVersioned("aKey");
+      assert entry2.getVersion() != valueBinary.getVersion();
+      assertEquals(entry2.getValue(), "aNewValue");
+
+      assert !c.replaceWithVersion("aKey", "aNewValue", valueBinary.getVersion());
+
+   }
+}

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TestHelper.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TestHelper.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TestHelper.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -1,5 +1,6 @@
 package org.infinispan.client.hotrod;
 
+import org.infinispan.config.Configuration;
 import org.infinispan.manager.CacheManager;
 import org.infinispan.server.hotrod.HotRodServer;
 import org.infinispan.server.hotrod.test.HotRodTestingUtil;
@@ -30,4 +31,14 @@
       }
       return builder.toString();
    }
+
+   public static Configuration getMultiNodeConfig() {
+      Configuration result = new Configuration();
+      result.setCacheMode(Configuration.CacheMode.DIST_SYNC);
+      result.setSyncReplTimeout(10000);
+//      result.setFetchInMemoryState(true);
+      result.setSyncCommitPhase(true);
+      result.setSyncRollbackPhase(true);
+      return result;      
+   }
 }

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -3,6 +3,7 @@
 import org.infinispan.Cache;
 import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
 import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
 import org.infinispan.manager.CacheManager;
 import org.infinispan.server.hotrod.HotRodServer;
 import org.infinispan.test.MultipleCacheManagersTest;
@@ -30,6 +31,7 @@
    RemoteCache remoteCache;
    private RemoteCacheManager remoteCacheManager;
    private TcpTransportFactory tcpConnectionFactory;
+   private Configuration config;
 
    @Override
    protected void assertSupportedConfig() {      
@@ -37,7 +39,7 @@
 
    @Override
    protected void createCacheManagers() throws Throwable {
-      Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
+      config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
       CacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
       CacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
       registerCacheManager(cm1);
@@ -50,7 +52,8 @@
       manager(1).getCache();
 
       TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
-      TestingUtil.blockUntilViewReceived(manager(1).getCache(), 2, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
 
 
       manager(0).getCache().put("k","v");
@@ -75,10 +78,16 @@
 
    @Test(dependsOnMethods = "testTwoMembers")
    public void testAddNewServer() {
-      addClusterEnabledCacheManager();
+      CacheManager cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      registerCacheManager(cm3);
+      hotRodServer3 = TestHelper.startHotRodServer(manager(2));
       manager(2).getCache();
+
       TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
-      hotRodServer3 = TestHelper.startHotRodServer(manager(2));
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
+
       expectTopologyChange(new InetSocketAddress("localhost",hotRodServer3.getPort()), true);
       assertEquals(3, tcpConnectionFactory.getServers().size());
    }
@@ -87,6 +96,10 @@
    public void testDropServer() {
       manager(2).stop();
       TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1));
+      TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+      
       InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());
       hotRodServer3.stop();
       expectTopologyChange(server3Address, false);
@@ -96,7 +109,8 @@
    private void expectTopologyChange(InetSocketAddress server1Address, boolean added) {
       for (int i = 0; i < 10; i++) {
          try {
-            remoteCache.put("k" + i, "v" + i);
+//            remoteCache.put("k" + i, "v" + i);
+            remoteCache.ping();
          } catch (Exception e) {
             if (added) {
                throw new IllegalStateException(e);

Modified: trunk/core/src/main/java/org/infinispan/io/UnsignedNumeric.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/io/UnsignedNumeric.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/core/src/main/java/org/infinispan/io/UnsignedNumeric.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -5,6 +5,7 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.OutputStream;
+import java.nio.Buffer;
 
 /**
  * Helper to read and write unsigned numerics
@@ -36,6 +37,16 @@
       }
       return i;
    }
+   
+   public static int readUnsignedInt(java.nio.ByteBuffer in) throws IOException {
+      int b = in.get();
+      int i = b & 0x7F;
+      for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+         b = in.get();
+         i |= (b & 0x7FL) << shift;
+      }
+      return i;
+   }
 
    /**
     * Writes an int in a variable-length format.  Writes between one and five bytes.  Smaller values take fewer bytes.
@@ -58,6 +69,14 @@
       }
       out.write((byte) i);
    }
+   
+   public static void writeUnsignedInt(java.nio.ByteBuffer out, int i) throws IOException {
+      while ((i & ~0x7F) != 0) {
+         out.put((byte) ((i & 0x7f) | 0x80));
+         i >>>= 7;
+      }
+      out.put((byte) i);
+   }
 
 
    /**
@@ -83,6 +102,15 @@
       }
       return i;
    }
+   public static long readUnsignedLong(java.nio.ByteBuffer in) throws IOException {
+      int b = in.get();
+      long i = b & 0x7F;
+      for (int shift = 7; (b & 0x80) != 0; shift += 7) {
+         b = in.get();
+         i |= (b & 0x7FL) << shift;
+      }
+      return i;
+   }
 
    /**
     * Writes an int in a variable-length format.  Writes between one and nine bytes.  Smaller values take fewer bytes.
@@ -106,6 +134,14 @@
       out.write((byte) i);
    }
 
+   public static void writeUnsignedLong(java.nio.ByteBuffer out, long i) throws IOException {
+      while ((i & ~0x7F) != 0) {
+         out.put((byte) ((i & 0x7f) | 0x80));
+         i >>>= 7;
+      }
+      out.put((byte) i);
+   }
+
      /**
     * Reads an int stored in variable-length format.  Reads between one and five bytes.  Smaller values take fewer
     * bytes.  Negative numbers are not supported.

Modified: trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java	2010-05-06 14:52:22 UTC (rev 1749)
+++ trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java	2010-05-06 22:19:31 UTC (rev 1750)
@@ -116,13 +116,7 @@
     * Creates an cache manager that does support clustering.
     */
    public static CacheManager createClusteredCacheManager() {
-      GlobalConfiguration globalConfiguration = GlobalConfiguration.getClusteredDefault();
-      amendMarshaller(globalConfiguration);
-      minimizeThreads(globalConfiguration);
-      Properties newTransportProps = new Properties();
-      newTransportProps.put(JGroupsTransport.CONFIGURATION_STRING, JGroupsConfigBuilder.getJGroupsConfig());
-      globalConfiguration.setTransportProperties(newTransportProps);
-      return newDefaultCacheManager(globalConfiguration, new Configuration(), false);
+      return createClusteredCacheManager(new Configuration());
    }
 
    /**



More information about the infinispan-commits mailing list