[infinispan-commits] Infinispan SVN: r2034 - in trunk/client/hotrod-client: src/main/java/org/infinispan/client/hotrod and 5 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Jul 14 12:50:14 EDT 2010


Author: mircea.markus
Date: 2010-07-14 12:50:13 -0400 (Wed, 14 Jul 2010)
New Revision: 2034

Added:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/
Removed:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java
Modified:
   trunk/client/hotrod-client/pom.xml
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
Log:
migrated 2033 to trunk

Modified: trunk/client/hotrod-client/pom.xml
===================================================================
--- trunk/client/hotrod-client/pom.xml	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/pom.xml	2010-07-14 16:50:13 UTC (rev 2034)
@@ -10,10 +10,6 @@
       <relativePath>../../parent/pom.xml</relativePath>
    </parent>
 
-   <properties>
-      <version.netty>3.2.1.Final</version.netty>
-   </properties>
-
    <artifactId>infinispan-client-hotrod</artifactId>
    <name>Infinispan Client Hotrod Module</name>
    <description>Infinispan client hotrod module</description>
@@ -50,13 +46,6 @@
       </dependency>
 
       <dependency>
-         <groupId>org.jboss.netty</groupId>
-         <artifactId>netty</artifactId>
-         <version>${version.netty}</version>
-         <scope>compile</scope>
-      </dependency>
-
-      <dependency>
          <groupId>commons-pool</groupId>
          <artifactId>commons-pool</artifactId>
          <version>1.5.4</version>

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -56,7 +56,6 @@
  */
 public interface RemoteCache<K, V> extends Cache<K, V> {
 
-
    /**
     * Removes the given entry only if its version matches the supplied version. A typical use case looks like this:
     * <pre>

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -2,19 +2,15 @@
 
 import org.infinispan.client.hotrod.exceptions.HotRodClientException;
 import org.infinispan.client.hotrod.impl.async.DefaultAsyncExecutorFactory;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperations;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsImpl;
+import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
 import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
 import org.infinispan.client.hotrod.impl.transport.TransportFactory;
-
 import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
-import org.infinispan.config.ConfigurationException;
 import org.infinispan.executors.ExecutorFactory;
 import org.infinispan.manager.CacheContainer;
 import org.infinispan.marshall.Marshaller;
 import org.infinispan.marshall.jboss.GenericJBossMarshaller;
 import org.infinispan.util.TypedProperties;
-import org.infinispan.util.Util;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
@@ -30,7 +26,6 @@
 import java.util.StringTokenizer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.infinispan.util.Util.getInstance;
 
 /**
@@ -144,8 +139,13 @@
    public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod.client.servers";
 
    private static final String KEY_SIZE = "marshaller.default-array-size.key";
+
    private static final String VALUE_SIZE = "marshaller.default-array-size.value";
 
+   private static final int DEFAULT_KEY_SIZE = 64;
+   private static final int DEFAULT_VALUE_SIZE = 512;
+
+
    private TypedProperties props;
    private TransportFactory transportFactory;
    private Marshaller marshaller;
@@ -154,8 +154,6 @@
    private ExecutorService asyncExecutorService;
    private final Map<String, RemoteCacheImpl> cacheName2RemoteCache = new HashMap<String, RemoteCacheImpl>();
    private AtomicInteger topologyId = new AtomicInteger();
-   private static final int DEFAULT_KEY_SIZE = 64;
-   private static final int DEFAULT_VALUE_SIZE = 512;
 
 
    /**
@@ -175,10 +173,10 @@
    }
 
    /**
-    * Same as {@link org.infinispan.client.hotrod.RemoteCacheManager#RemoteCacheManager(HotRodMarshaller, java.util.Properties, boolean)} with start = true.
+    * Same as {@link org.infinispan.client.hotrod.RemoteCacheManager#RemoteCacheManager(Marshaller, java.util.Properties, boolean)} with start = true.
     */
-   public RemoteCacheManager(Marshaller marshaller, Properties props) {
-      this(marshaller, props, false);
+   public RemoteCacheManager(Marshaller hotRodMarshaller, Properties props) {
+      this(hotRodMarshaller, props, false);
    }
 
    /**
@@ -325,8 +323,7 @@
       if (props.contains("asyn-executor-factory")) {
          asyncExecutorClass = props.getProperty("asyn-executor-factory");
       }
-      ExecutorFactory executorFactory = null;
-      executorFactory = (ExecutorFactory) getInstance(asyncExecutorClass);
+      ExecutorFactory executorFactory = (ExecutorFactory) getInstance(asyncExecutorClass);
       asyncExecutorService = executorFactory.getExecutor(props);
 
 
@@ -343,7 +340,9 @@
 
    @Override
    public void stop() {
-      transportFactory.destroy();
+      if (isStarted()) {
+         transportFactory.destroy();
+      }
       started = false;
    }
 
@@ -363,10 +362,8 @@
    private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName, boolean forceReturnValue) {
       synchronized (cacheName2RemoteCache) {
          if (!cacheName2RemoteCache.containsKey(cacheName)) {
-            RemoteCacheImpl<K, V> result = new RemoteCacheImpl<K, V>(this, cacheName, forceReturnValue);
-            if (isStarted()) {
-               startRemoteCache(result);
-            }
+            RemoteCacheImpl<K, V> result = new RemoteCacheImpl<K, V>(this, cacheName);
+            startRemoteCache(result);
             cacheName2RemoteCache.put(cacheName, result);
             return result;
          } else {
@@ -376,9 +373,8 @@
    }
 
    private <K, V> void startRemoteCache(RemoteCacheImpl<K, V> result) {
-      HotRodOperations hotRodOperations = new HotRodOperationsImpl(result.getName(), transportFactory, topologyId);
-      result.init(hotRodOperations, marshaller, asyncExecutorService,
-              props.getIntProperty(KEY_SIZE, DEFAULT_KEY_SIZE),
+      OperationsFactory operationsFactory = new OperationsFactory(transportFactory, result.getName(), topologyId, forceReturnValueDefault);
+      result.init(marshaller, asyncExecutorService, operationsFactory, props.getIntProperty(KEY_SIZE, DEFAULT_KEY_SIZE),
               props.getIntProperty(VALUE_SIZE, DEFAULT_VALUE_SIZE));
    }
 

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -9,7 +9,7 @@
 import org.infinispan.client.hotrod.exceptions.RemoteCacheManagerNotStartedException;
 import org.infinispan.client.hotrod.exceptions.TransportException;
 import org.infinispan.client.hotrod.impl.async.NotifyingFutureImpl;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperations;
+import org.infinispan.client.hotrod.impl.operations.*;
 import org.infinispan.marshall.Marshaller;
 import org.infinispan.util.concurrent.NotifyingFuture;
 import org.infinispan.util.logging.Log;
@@ -30,31 +30,27 @@
 
    private static Log log = LogFactory.getLog(RemoteCacheImpl.class);
 
-   private static final Flag[] FORCE_RETURN_VALUE = {Flag.FORCE_RETURN_VALUE};
-
-   private ThreadLocal<Flag[]> flagsMap = new ThreadLocal<Flag[]>();
-   private HotRodOperations operations;
    private Marshaller marshaller;
    private final String name;
    private final RemoteCacheManager remoteCacheManager;
    private volatile ExecutorService executorService;
-   private volatile boolean forceReturnValue;
+   private OperationsFactory operationsFactory;
    private int estimateKeySize;
    private int estimateValueSize;
 
-   public RemoteCacheImpl(RemoteCacheManager rcm, String name, boolean forceReturnValue) {
+
+   public RemoteCacheImpl(RemoteCacheManager rcm, String name) {
       if (log.isTraceEnabled()) {
          log.trace("Creating remote cache: " + name);
       }
       this.name = name;
-      this.forceReturnValue = forceReturnValue;
       this.remoteCacheManager = rcm;
    }
 
-   public void init(HotRodOperations operations, Marshaller marshaller, ExecutorService executorService, int estimateKeySize, int estimateValueSize) {
-      this.operations = operations;
+   public void init(Marshaller marshaller, ExecutorService executorService, OperationsFactory operationsFactory, int estimateKeySize, int estimateValueSize) {
       this.marshaller = marshaller;
       this.executorService = executorService;
+      this.operationsFactory = operationsFactory;
       this.estimateKeySize = estimateKeySize;
       this.estimateValueSize = estimateValueSize;
    }
@@ -63,18 +59,11 @@
       return remoteCacheManager;
    }
 
-   private byte[] obj2bytes(Object o, boolean isKey) {
-      try {
-         return marshaller.objectToByteBuffer(o, isKey ? estimateKeySize : estimateValueSize);
-      } catch (IOException ioe) {
-         throw new TransportException("Unable to marshall object of type [" + o.getClass().getName() + "]", ioe);
-      }
-   }
-
    @Override
    public boolean removeWithVersion(K key, long version) {
       assertRemoteCacheManagerIsStarted();
-      VersionedOperationResponse response = operations.removeIfUnmodified(obj2bytes(key, true), version, flags());
+      RemoveIfUnmodifiedOperation op = operationsFactory.newRemoveIfUnmodifiedOperation(obj2bytes(key, true), version);
+      VersionedOperationResponse response = (VersionedOperationResponse) op.execute();
       return response.getCode().isUpdated();
    }
 
@@ -97,7 +86,8 @@
    @Override
    public boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds) {
       assertRemoteCacheManagerIsStarted();
-      VersionedOperationResponse response = operations.replaceIfUnmodified(obj2bytes(key, true), obj2bytes(newValue, false), lifespanSeconds, maxIdleTimeSeconds, version, flags());
+      ReplaceIfUnmodifiedOperation op = operationsFactory.newReplaceIfUnmodifiedOperation(obj2bytes(key, true), obj2bytes(newValue, false), lifespanSeconds, maxIdleTimeSeconds, version);
+      VersionedOperationResponse response = (VersionedOperationResponse) op.execute();
       return response.getCode().isUpdated();
    }
 
@@ -120,7 +110,8 @@
    @Override
    public VersionedValue<V> getVersioned(K key) {
       assertRemoteCacheManagerIsStarted();
-      BinaryVersionedValue value = operations.getWithVersion(obj2bytes(key, true), flags());
+      GetWithVersionOperation op = operationsFactory.newGetWithVersionOperation(obj2bytes(key, true));
+      BinaryVersionedValue value = (BinaryVersionedValue) op.execute();
       return binary2VersionedValue(value);
    }
 
@@ -152,7 +143,8 @@
    @Override
    public ServerStatistics stats() {
       assertRemoteCacheManagerIsStarted();
-      Map<String, String> statsMap = operations.stats();
+      StatsOperation op = operationsFactory.newStatsOperation();
+      Map<String, String> statsMap = (Map<String, String>) op.execute();
       ServerStatisticsImpl stats = new ServerStatisticsImpl();
       for (Map.Entry<String, String> entry : statsMap.entrySet()) {
          stats.addStats(entry.getKey(), entry.getValue());
@@ -161,16 +153,6 @@
    }
 
    @Override
-   public String getName() {
-      return name;
-   }
-
-   @Override
-   public String getVersion() {
-      return Version.getProtocolVersion();
-   }
-
-   @Override
    public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
       assertRemoteCacheManagerIsStarted();
       int lifespanSecs = toSeconds(lifespan, lifespanUnit);
@@ -178,7 +160,8 @@
       if (log.isTraceEnabled()) {
          log.trace("About to add (K,V): (" + key + ", " + value + ") lifespanSecs:" + lifespanSecs + ", maxIdleSecs:" + maxIdleSecs);
       }
-      byte[] result = operations.put(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs, flags());
+      PutOperation op = operationsFactory.newPutKeyValueOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
+      byte[] result = (byte[]) op.execute();
       return (V) bytes2obj(result);
    }
 
@@ -188,7 +171,8 @@
       assertRemoteCacheManagerIsStarted();
       int lifespanSecs = toSeconds(lifespan, lifespanUnit);
       int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
-      byte[] bytes = operations.putIfAbsent(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs, flags());
+      PutIfAbsentOperation op = operationsFactory.newPutIfAbsentOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
+      byte[] bytes = (byte[]) op.execute();
       return (V) bytes2obj(bytes);
    }
 
@@ -197,7 +181,8 @@
       assertRemoteCacheManagerIsStarted();
       int lifespanSecs = toSeconds(lifespan, lifespanUnit);
       int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
-      byte[] bytes = operations.replace(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs, flags());
+      ReplaceOperation op = operationsFactory.newReplaceOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
+      byte[] bytes = (byte[]) op.execute();
       return (V) bytes2obj(bytes);
    }
 
@@ -284,27 +269,32 @@
    @Override
    public boolean containsKey(Object key) {
       assertRemoteCacheManagerIsStarted();
-      return operations.containsKey(obj2bytes(key, true), flags());
+      ContainsKeyOperation op = operationsFactory.newContainsKeyOperation(obj2bytes(key, true));
+      return (Boolean)op.execute();
    }
 
    @Override
    public V get(Object key) {
       assertRemoteCacheManagerIsStarted();
-      byte[] bytes = operations.get(obj2bytes(key, true), flags());
+      byte[] keyBytes = obj2bytes(key, true);
+      GetOperation gco = operationsFactory.newGetKeyOperation(keyBytes);
+      byte[] bytes = (byte[]) gco.execute();
       return (V) bytes2obj(bytes);
    }
 
    @Override
    public V remove(Object key) {
       assertRemoteCacheManagerIsStarted();
-      byte[] existingValue = operations.remove(obj2bytes(key, true), flags());
+      RemoveOperation removeOperation = operationsFactory.newRemoveOperation(obj2bytes(key, true));
+      byte[] existingValue = (byte[]) removeOperation.execute();
       return (V) bytes2obj(existingValue);
    }
 
    @Override
    public void clear() {
       assertRemoteCacheManagerIsStarted();
-      operations.clear(flags());
+      ClearOperation op = operationsFactory.newClearOperation() ;
+      op.execute();
    }
 
    @Override
@@ -321,20 +311,29 @@
       }
    }
 
+   @Override
+   public String getName() {
+      return name;
+   }
 
    @Override
-   public RemoteCache withFlags(Flag... flags) {
-      this.flagsMap.set(flags);
+   public String getVersion() {
+      return Version.getProtocolVersion();
+   }
+
+   @Override
+   public RemoteCache<K, V> withFlags(Flag... flags) {
+      operationsFactory.setFlags(flags);
       return this;
    }
 
-   private Flag[] flags() {
-      Flag[] flags = this.flagsMap.get();
-      this.flagsMap.remove();
-      if (flags == null && forceReturnValue) {
-         return FORCE_RETURN_VALUE;
+
+   private byte[] obj2bytes(Object o, boolean isKey) {
+      try {
+         return marshaller.objectToByteBuffer(o, isKey ? estimateKeySize : estimateValueSize);
+      } catch (IOException ioe) {
+         throw new TransportException("Unable to marshall object of type [" + o.getClass().getName() + "]", ioe);
       }
-      return flags;
    }
 
    private Object bytes2obj(byte[] bytes) {

Copied: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations (from rev 2033, branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations)

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -10,55 +10,57 @@
  */
 public interface HotRodConstants {
 
-   public static final short REQUEST_MAGIC = 0xA0;
-   public static final short RESPONSE_MAGIC = 0xA1;
+   static final short REQUEST_MAGIC = 0xA0;
+   static final short RESPONSE_MAGIC = 0xA1;
 
-   public static final byte HOTROD_VERSION = 10;
+   static final byte HOTROD_VERSION = 10;
 
    //requests
-   public static final byte PUT_REQUEST = 0x01;
-   public static final byte GET_REQUEST = 0x03;
-   public static final byte PUT_IF_ABSENT_REQUEST = 0x05;
-   public static final byte REPLACE_REQUEST = 0x07;
-   public static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
-   public static final byte REMOVE_REQUEST = 0x0B;
-   public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
-   public static final byte CONTAINS_KEY_REQUEST = 0x0F;
-   public static final byte GET_WITH_VERSION = 0x11;
-   public static final byte CLEAR_REQUEST = 0x13;
-   public static final byte STATS_REQUEST = 0x15;
-   public static final byte PING_REQUEST = 0x17;
+   static final byte PUT_REQUEST = 0x01;
+   static final byte GET_REQUEST = 0x03;
+   static final byte PUT_IF_ABSENT_REQUEST = 0x05;
+   static final byte REPLACE_REQUEST = 0x07;
+   static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
+   static final byte REMOVE_REQUEST = 0x0B;
+   static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
+   static final byte CONTAINS_KEY_REQUEST = 0x0F;
+   static final byte GET_WITH_VERSION = 0x11;
+   static final byte CLEAR_REQUEST = 0x13;
+   static final byte STATS_REQUEST = 0x15;
+   static final byte PING_REQUEST = 0x17;
 
 
    //responses
-   public static final byte PUT_RESPONSE = 0x02;
-   public static final byte GET_RESPONSE = 0x04;
-   public static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
-   public static final byte REPLACE_RESPONSE = 0x08;
-   public static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
-   public static final byte REMOVE_RESPONSE = 0x0C;
-   public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
-   public static final byte CONTAINS_KEY_RESPONSE = 0x10;
-   public static final byte GET_WITH_VERSION_RESPONSE = 0x12;
-   public static final byte CLEAR_RESPONSE = 0x14;
-   public static final byte STATS_RESPONSE = 0x16;
-   public static final byte PING_RESPONSE = 0x18;
-   public static final byte ERROR_RESPONSE = 0x50;
+   static final byte PUT_RESPONSE = 0x02;
+   static final byte GET_RESPONSE = 0x04;
+   static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
+   static final byte REPLACE_RESPONSE = 0x08;
+   static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
+   static final byte REMOVE_RESPONSE = 0x0C;
+   static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
+   static final byte CONTAINS_KEY_RESPONSE = 0x10;
+   static final byte GET_WITH_VERSION_RESPONSE = 0x12;
+   static final byte CLEAR_RESPONSE = 0x14;
+   static final byte STATS_RESPONSE = 0x16;
+   static final byte PING_RESPONSE = 0x18;
+   static final byte ERROR_RESPONSE = 0x50;
 
    //response status
-   public static final byte NO_ERROR_STATUS = 0x00;
-   public static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
-   public static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
-   public static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
-   public static final int UNKNOWN_COMMAND_STATUS = 0x82;
-   public static final int SERVER_ERROR_STATUS = 0x85;
-   public static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
-   public static final int UNKNOWN_VERSION_STATUS = 0x83;
-   public static final int COMMAND_TIMEOUT_STATUS = 0x86;
+   static final byte NO_ERROR_STATUS = 0x00;
+   static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
+   static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
+   static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
+   static final int UNKNOWN_COMMAND_STATUS = 0x82;
+   static final int SERVER_ERROR_STATUS = 0x85;
+   static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
+   static final int UNKNOWN_VERSION_STATUS = 0x83;
+   static final int COMMAND_TIMEOUT_STATUS = 0x86;
 
 
-   public static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
-   public static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
-   public static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
-   Charset STRING_CHARSET = Charset.forName("UTF-8");
+   static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
+   static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
+   static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
+   Charset HOTROD_STRING_CHARSET = Charset.forName("UTF-8");
+   
+   static final byte[] DEFAULT_CACHE_NAME_BYTES = new byte[]{};
 }

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -1,183 +0,0 @@
-package org.infinispan.client.hotrod.impl.protocol;
-
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.exceptions.HotRodClientException;
-import org.infinispan.client.hotrod.exceptions.HotRodTimeoutException;
-import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
-import org.infinispan.client.hotrod.impl.transport.Transport;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Helper class for factorizing common parts of read/write operations.
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class HotRodOperationsHelper {
-   static Log log = LogFactory.getLog(HotRodOperationsHelper.class);
-   static final AtomicLong MSG_ID = new AtomicLong();
-   final static byte CLIENT_INTELLIGENCE = HotRodConstants.CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
-
-   public static final byte[] DEFAULT_CACHE_NAME_BYTES = new byte[]{};
-
-   public static long writeHeader(Transport transport, short operationCode, byte[] cacheName, AtomicInteger topologyId, Flag... flags) {
-      transport.writeByte(HotRodConstants.REQUEST_MAGIC);
-      long messageId = MSG_ID.incrementAndGet();
-      transport.writeVLong(messageId);
-      transport.writeByte(HotRodConstants.HOTROD_VERSION);
-      transport.writeByte(operationCode);
-      transport.writeArray(cacheName);
-
-      int flagInt = 0;
-      if (flags != null) {
-         for (Flag flag : flags) {
-            flagInt = flag.getFlagInt() | flagInt;
-         }
-      }
-      transport.writeVInt(flagInt);
-      transport.writeByte(CLIENT_INTELLIGENCE);
-      transport.writeVInt(topologyId.get());
-      if (log.isTraceEnabled()) {
-         log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
-      }
-      return messageId;
-   }
-
-   /**
-    * Magic	| Message Id | Op code | Status | Topology Change Marker
-    */
-   public static short readHeaderAndValidate(Transport transport, long messageId, short opRespCode, AtomicInteger topologyId) {
-      short magic = transport.readByte();
-      if (magic != HotRodConstants.RESPONSE_MAGIC) {
-         String message = "Invalid magic number. Expected " + Integer.toHexString(HotRodConstants.RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
-         log.error(message);
-         throw new InvalidResponseException(message);
-      }
-      long receivedMessageId = transport.readVLong();
-      if (receivedMessageId != messageId) {
-         String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
-         log.error(message);
-         throw new InvalidResponseException(message);
-      }
-      if (log.isTraceEnabled()) {
-         log.trace("Received response for message id: " + receivedMessageId);
-      }
-      short receivedOpCode = transport.readByte();
-      if (receivedOpCode != opRespCode) {
-         if (receivedOpCode == HotRodConstants.ERROR_RESPONSE) {
-            checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
-            throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
-         }
-         throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
-      }
-      if (log.isTraceEnabled()) {
-         log.trace("Received operation code is: " + receivedOpCode);
-      }
-      short status = transport.readByte();
-      checkForErrorsInResponseStatus(status, messageId, transport);
-      short topologyChangeByte = transport.readByte();
-      if (topologyChangeByte == 1) {
-         readNewTopologyAndHash(transport, topologyId);
-      }
-      return status;
-   }
-
-   static void readNewTopologyAndHash(Transport transport, AtomicInteger topologyId) {
-      int newTopologyId = transport.readVInt();
-      topologyId.set(newTopologyId);
-      int numKeyOwners = transport.readUnsignedShort();
-      short hashFunctionVersion = transport.readByte();
-      int hashSpace = transport.readVInt();
-      int clusterSize = transport.readVInt();
-
-      if (log.isTraceEnabled()) {
-         log.trace("Topology change request: newTopologyId=" + newTopologyId + ", numKeyOwners=" + numKeyOwners +
-               ", hashFunctionVersion=" + hashFunctionVersion + ", hashSpaceSize=" + hashSpace + ", clusterSize=" + clusterSize);
-      }
-
-      LinkedHashMap<InetSocketAddress, Integer> servers2HashCode = new LinkedHashMap<InetSocketAddress, Integer>();
-
-      for (int i = 0; i < clusterSize; i++) {
-         String host = transport.readString();
-         int port = transport.readUnsignedShort();
-         if (log.isTraceEnabled()) {
-            log.trace("Server read:" + host + ":" + port);
-         }
-         int hashCode = transport.read4ByteInt();
-         servers2HashCode.put(new InetSocketAddress(host, port), hashCode);
-         if (log.isTraceEnabled()) {
-            log.trace("Hash code is: " + hashCode);
-         }
-      }
-      if (log.isInfoEnabled()) {
-         log.info("New topology: " + servers2HashCode);
-      }
-      transport.getTransportFactory().updateServers(servers2HashCode.keySet());
-      if (hashFunctionVersion == 0) {
-         if (log.isTraceEnabled())
-            log.trace("Not using a consistent hash function (hash function version == 0).");
-      } else {
-         transport.getTransportFactory().updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
-      }
-   }
-
-   static void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
-      if (log.isTraceEnabled()) {
-         log.trace("Received operation status: " + status);
-      }
-      switch ((int) status) {
-         case HotRodConstants.INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
-         case HotRodConstants.REQUEST_PARSING_ERROR_STATUS:
-         case HotRodConstants.UNKNOWN_COMMAND_STATUS:
-         case HotRodConstants.SERVER_ERROR_STATUS:
-         case HotRodConstants.UNKNOWN_VERSION_STATUS: {
-            String msgFromServer = transport.readString();
-            if (log.isWarnEnabled()) {
-               log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
-            }
-            throw new HotRodClientException(msgFromServer, messageId, status);
-         }
-         case HotRodConstants.COMMAND_TIMEOUT_STATUS: {
-            if (log.isTraceEnabled()) {
-               log.trace("timeout message received from the server");
-            }
-            throw new HotRodTimeoutException();
-         }
-         case HotRodConstants.NO_ERROR_STATUS:
-         case HotRodConstants.KEY_DOES_NOT_EXIST_STATUS:
-         case HotRodConstants.NOT_PUT_REMOVED_REPLACED_STATUS: {
-            //don't do anything, these are correct responses
-            break;
-         }
-         default: {
-            throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
-         }
-      }
-   }
-
-   public static boolean ping(Transport transport, AtomicInteger topologyId) {
-      try {
-         long messageId = HotRodOperationsHelper.writeHeader(transport, HotRodConstants.PING_REQUEST, DEFAULT_CACHE_NAME_BYTES, topologyId);
-         short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, HotRodConstants.PING_RESPONSE, topologyId);
-         if (respStatus == HotRodConstants.NO_ERROR_STATUS) {
-            if (log.isTraceEnabled())
-               log.trace("Successfully validated transport: " + transport);
-            return true;
-         } else {
-            if (log.isTraceEnabled())
-               log.trace("Unknown response status: " + respStatus);
-            return false;
-         }
-      } catch (Exception e) {
-         if (log.isTraceEnabled())
-            log.trace("Failed to validate transport: " + transport, e);
-         return false;
-      }
-   }
-}

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -1,421 +0,0 @@
-package org.infinispan.client.hotrod.impl.protocol;
-
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
-import org.infinispan.client.hotrod.exceptions.TransportException;
-import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
-import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
-import org.infinispan.client.hotrod.impl.transport.Transport;
-import org.infinispan.client.hotrod.impl.transport.TransportFactory;
-import org.infinispan.util.Util;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class HotRodOperationsImpl implements HotRodOperations, HotRodConstants {
-
-   private static Log log = LogFactory.getLog(HotRodOperationsImpl.class);
-
-   private final byte[] cacheName;
-   private TransportFactory transportFactory;
-   private final AtomicInteger topologyId;
-
-   public HotRodOperationsImpl(String cacheName, TransportFactory transportFactory, AtomicInteger topologyId) {
-      this.cacheName = cacheName.getBytes(STRING_CHARSET);
-      this.transportFactory = transportFactory;
-      this.topologyId = topologyId;
-   }
-
-   public byte[] get(byte[] key, Flag[] flags) {
-      Transport transport = getTransport(key, true);
-      int retryCount = 0;
-      do {
-         try {
-            try {
-               short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
-               if (status == KEY_DOES_NOT_EXIST_STATUS) {
-                  return null;
-               }
-               if (status == NO_ERROR_STATUS) {
-                  return transport.readArray();
-               }
-            } finally {
-               releaseTransport(transport);
-            }
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = getTransport(key, false);
-         }
-         retryCount++;
-      } while (shouldRetry(retryCount));
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   public byte[] remove(byte[] key, Flag[] flags) {
-      Transport transport = getTransport(key, true);
-      int retryCount = 0;
-      do {
-         try {
-            short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
-            if (status == KEY_DOES_NOT_EXIST_STATUS) {
-               return null;
-            } else if (status == NO_ERROR_STATUS) {
-               return returnPossiblePrevValue(transport, flags);
-            }
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         } finally {
-            releaseTransport(transport);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = getTransport(key, false);
-         }
-         retryCount++;
-      } while (shouldRetry(retryCount));
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   public boolean containsKey(byte[] key, Flag... flags) {
-      Transport transport = getTransport(key, true);
-      int retryCount = 0;
-      do {
-         try {
-            short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
-            if (status == KEY_DOES_NOT_EXIST_STATUS) {
-               return false;
-            } else if (status == NO_ERROR_STATUS) {
-               return true;
-            }
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         }
-         finally {
-            releaseTransport(transport);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = getTransport(key, false);
-         }
-         retryCount++;
-
-      } while (shouldRetry(retryCount));
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
-      Transport transport = getTransport(key, true);
-      int retryCount = 0;
-      do {
-         try {
-            short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
-            if (status == KEY_DOES_NOT_EXIST_STATUS) {
-               return null;
-            }
-            if (status == NO_ERROR_STATUS) {
-               long version = transport.readLong();
-               if (log.isTraceEnabled()) {
-                  log.trace("Received version: " + version);
-               }
-               byte[] value = transport.readArray();
-               return new BinaryVersionedValue(version, value);
-            }
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         } finally {
-            releaseTransport(transport);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = getTransport(key, false);
-         }
-         retryCount++;
-      } while (shouldRetry(retryCount));
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-
-   public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = getTransport(key, true);
-      int retryCount = 0;
-      do {
-         try {
-            short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
-            if (status != NO_ERROR_STATUS) {
-               throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
-            }
-            return returnPossiblePrevValue(transport, flags);
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         } finally {
-            releaseTransport(transport);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = getTransport(key, false);
-         }
-         retryCount++;
-      } while (shouldRetry(retryCount));
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = getTransport(key, true);
-      int retryCount = 0;
-      do {
-         try {
-            short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
-            if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
-               byte[] bytes = returnPossiblePrevValue(transport, flags);
-               if (log.isTraceEnabled()) {
-                  log.trace("Returning from putIfAbsent: " + Util.printArray(bytes, false));
-               }
-               return bytes;
-            }
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         }
-         finally {
-            releaseTransport(transport);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = getTransport(key, false);
-         }
-         retryCount++;
-      } while (shouldRetry(retryCount));
-      throw new IllegalStateException("We should not reach here!");
-   }
-
-   public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = getTransport(key, true);
-      int retryCount = 0;
-      do {
-         try {
-            short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
-            if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
-               return returnPossiblePrevValue(transport, flags);
-            }
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         } finally {
-            releaseTransport(transport);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = getTransport(key, false);
-         }
-         retryCount++;
-      } while (shouldRetry(retryCount));
-      throw new IllegalStateException(" should not reach here!");
-   }
-
-   /**
-    * request : [header][key length][key][lifespan][max idle][entry_version][value length][value] response: If
-    * ForceReturnPreviousValue has been passed, this responses will contain previous [value length][value] for that key.
-    * If the key does not exist or previous was null, value length would be 0. Otherwise, if no ForceReturnPreviousValue
-    * was sent, the response would be empty.
-    */
-   public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
-      Transport transport = getTransport(key, true);
-      int retryCount = 0;
-      do {
-         try {
-            // 1) write header
-            long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
-
-            //2) write message body
-            transport.writeArray(key);
-            transport.writeVInt(lifespan);
-            transport.writeVInt(maxIdle);
-            transport.writeLong(version);
-            transport.writeArray(value);
-            return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         }
-         finally {
-            releaseTransport(transport);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = getTransport(key, false);
-         }
-         retryCount++;
-      } while (shouldRetry(retryCount));
-      throw new IllegalStateException(" should not reach here!");
-   }
-
-   /**
-    * Request: [header][key length][key][entry_version]
-    */
-   public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
-      Transport transport = getTransport(key, true);
-      int retryCount = 0;
-      do {
-         try {
-            // 1) write header
-            long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
-
-            //2) write message body
-            transport.writeArray(key);
-            transport.writeLong(version);
-
-            //process response and return
-            return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
-
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         }
-         finally {
-            releaseTransport(transport);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = getTransport(key, false);
-         }
-         retryCount++;
-      } while (shouldRetry(retryCount));
-      throw new IllegalStateException("Should not reach this point!");
-   }
-
-   public void clear(Flag... flags) {
-      Transport transport = transportFactory.getTransport();
-      int retryCount = 0;
-      do {
-         try {
-            // 1) write header
-            long messageId = HotRodOperationsHelper.writeHeader(transport, CLEAR_REQUEST, cacheName, topologyId, flags);
-            HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE, topologyId);
-         } catch (TransportException te) {
-            logErrorAndThrowExceptionIfNeeded(retryCount, te);
-         } finally {
-            releaseTransport(transport);
-         }
-         if (shouldRetry(retryCount)) {
-            transport = transportFactory.getTransport();
-         }
-         retryCount++;
-
-      } while (shouldRetry(retryCount));
-   }
-
-   public Map<String, String> stats() {
-      Transport transport = transportFactory.getTransport();
-      try {
-         // 1) write header
-         long messageId = HotRodOperationsHelper.writeHeader(transport, STATS_REQUEST, cacheName, topologyId);
-         HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, STATS_RESPONSE, topologyId);
-         int nrOfStats = transport.readVInt();
-
-         Map<String, String> result = new HashMap<String, String>();
-         for (int i = 0; i < nrOfStats; i++) {
-            String statName = transport.readString();
-            String statValue = transport.readString();
-            result.put(statName, statValue);
-         }
-         return result;
-      } finally {
-         releaseTransport(transport);
-      }
-   }
-
-   //[header][key length][key][lifespan][max idle][value length][value]
-
-   private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
-      // 1) write header
-      long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
-
-      // 2) write key and value
-      transport.writeArray(key);
-      transport.writeVInt(lifespan);
-      transport.writeVInt(maxIdle);
-      transport.writeArray(value);
-      transport.flush();
-
-      // 3) now read header
-
-      //return status (not error status for sure)
-      return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
-   }
-
-   /*
-    * Magic	| MessageId	| Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
-    */
-
-   private boolean hasForceReturn(Flag[] flags) {
-      if (flags == null) return false;
-      for (Flag flag : flags) {
-         if (flag == Flag.FORCE_RETURN_VALUE) return true;
-      }
-      return false;
-   }
-
-   private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
-      // 1) write [header][key length][key]
-      long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
-      transport.writeArray(key);
-      transport.flush();
-
-      // 2) now read the header
-      return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
-   }
-
-   private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
-      if (hasForceReturn(flags)) {
-         byte[] bytes = transport.readArray();
-         if (log.isTraceEnabled()) log.trace("Previous value bytes is: " + Util.printArray(bytes, false));
-         //0-length response means null
-         return bytes.length == 0 ? null : bytes;
-      } else {
-         return null;
-      }
-   }
-
-   private void releaseTransport(Transport transport) {
-      if (transport != null)
-         transportFactory.releaseTransport(transport);
-   }
-
-   private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
-      //3) ...
-      short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, response, topologyId);
-
-      //4 ...
-      VersionedOperationResponse.RspCode code;
-      if (respStatus == NO_ERROR_STATUS) {
-         code = VersionedOperationResponse.RspCode.SUCCESS;
-      } else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS) {
-         code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
-      } else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
-         code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
-      } else {
-         throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
-      }
-      byte[] prevValue = returnPossiblePrevValue(transport, flags);
-      return new VersionedOperationResponse(prevValue, code);
-   }
-
-   private void logErrorAndThrowExceptionIfNeeded(int i, TransportException te) {
-      String message = "Transport exception. Retry " + i + " out of " + transportFactory.getTransportCount();
-      if (i == transportFactory.getTransportCount() - 1 || transportFactory.getTransportCount() < 0) {
-         log.warn(message, te);
-         throw te;
-      } else {
-         log.trace(message + ":" + te);
-      }
-   }
-
-   private Transport getTransport(byte[] key, boolean hashAware) {
-      if (hashAware) {
-         return transportFactory.getTransport(key);
-      } else {
-         return transportFactory.getTransport();
-      }
-   }
-   
-   private boolean shouldRetry(int retryCount) {
-      return retryCount < transportFactory.getTransportCount();
-   }
-}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -28,7 +28,7 @@
    @Override
    public String readString() {
       byte[] strContent = readArray();
-      String readString = new String(strContent, HotRodConstants.STRING_CHARSET);
+      String readString = new String(strContent, HotRodConstants.HOTROD_STRING_CHARSET);
       if (log.isTraceEnabled()) {
          log.trace("Read string is: " + readString);
       }
@@ -80,7 +80,7 @@
    @Override
    public void writeString(String string) {
       if (!string.isEmpty()) {
-         writeArray(string.getBytes(HotRodConstants.STRING_CHARSET));
+         writeArray(string.getBytes(HotRodConstants.HOTROD_STRING_CHARSET));
       } else {
          writeVInt(0);
       }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -1,7 +1,7 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
 import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsHelper;
+import org.infinispan.client.hotrod.impl.operations.PingOperation;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
@@ -37,7 +37,7 @@
          log.trace("Executing first ping!");
          firstPingExecuted = true;
          try {
-            HotRodOperationsHelper.ping(tcpTransport, topologyId);
+            ping(tcpTransport, topologyId);
          } catch (Exception e) {
             log.trace("Ignoring ping request failure during ping on startup: " + e.getMessage());
          }
@@ -45,6 +45,11 @@
       return tcpTransport;
    }
 
+   private boolean ping(TcpTransport tcpTransport, AtomicInteger topologyId) {
+      PingOperation po = new PingOperation(null, topologyId, tcpTransport);
+      return (Boolean)po.execute();
+   }
+
    /**
     * This will be called by the test thread when testWhileIdle==true.
     */
@@ -54,7 +59,7 @@
       if (log.isTraceEnabled()) {
          log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
       }
-      return HotRodOperationsHelper.ping(transport, topologyId);
+      return ping(transport, topologyId);
    }
 
    @Override

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -76,7 +76,7 @@
       }
 
       for (WorkerThread wt: workers) {
-         wt.interrupt();
+         wt.stopWorker();
          wt.waitToFinish();
       }
       //now wait for the idle thread to wake up and clean them

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -9,7 +9,7 @@
 import org.infinispan.server.hotrod.HotRodServer;
 import org.infinispan.test.SingleCacheManagerTest;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.infinispan.util.ByteArrayKey;
+import org.infinispan.util.ByteArrayKey;            
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 import org.testng.annotations.AfterClass;

Deleted: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -1,24 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.client.hotrod.impl.transport.netty.NettyTransportFactory;
-import org.infinispan.config.Configuration;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.Test;
-
-import java.util.Properties;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
- at Test(testName = "hotrod.NettyHotRodIntegrationTest", groups = "functional")
-public class NettyHotRodIntegrationTest extends HotRodIntegrationTest {
-   
-   @Override
-   protected RemoteCacheManager getRemoteCacheManager() {
-      Properties props = new Properties();
-      props.put("transport-factory", NettyTransportFactory.class.getName());
-      props.put("hotrod-servers", "127.0.0.1:" + hotrodServer.getPort());
-      return new RemoteCacheManager(props, true);
-   }
-}

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -31,8 +31,12 @@
 
    @AfterTest(alwaysRun = true)
    public void release() {
-      if (cacheManager != null) cacheManager.stop();
-      if (hotrodServer != null) hotrodServer.stop();
+      try {
+         if (cacheManager != null) cacheManager.stop();
+         if (hotrodServer != null) hotrodServer.stop();
+      } catch (Exception e) {
+         e.printStackTrace();
+      }
       if (prevValue != null) {
          System.setProperty(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS, prevValue);
       } else {

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -41,11 +41,19 @@
    protected void clearContent() throws Throwable {
    }
 
-   @AfterClass
+   @AfterClass (alwaysRun = true)
    @Override
    protected void destroy() {
+      try {
       hotRodServer1.stop();
+      } catch (Exception e) {
+         e.printStackTrace();
+      }
+      try {
       hotRodServer2.stop();
+      } catch (Exception e) {
+         e.printStackTrace();
+      }
       super.destroy();
    }
 
@@ -112,6 +120,7 @@
    public void testDropServer() {
       hotRodServer3.stop();
       manager(2).stop();
+      log.trace("Just stopped server 2");
 
       waitForClusterToForm(2);
 
@@ -138,7 +147,7 @@
    protected void waitForClusterToForm(int memberCount) {
       TestingUtil.blockUntilViewReceived(manager(0).getCache(), memberCount, 30000);
       for (int i = 0; i < memberCount; i++) {
-         TestingUtil.blockUntilCacheStatusAchieved(manager(i).getCache(), ComponentStatus.RUNNING, 10000);
+         TestingUtil.blockUntilCacheStatusAchieved(manager(i).getCache(), ComponentStatus.RUNNING, 30000);
       }
    }
 }

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java	2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java	2010-07-14 16:50:13 UTC (rev 2034)
@@ -30,6 +30,7 @@
    volatile String key;
    volatile String value;
    volatile boolean finished = false;
+   volatile boolean stopWorker = false;
 
    public WorkerThread(RemoteCache remoteCache) {
       super("WorkerThread-" + WORKER_INDEX.getAndIncrement());
@@ -70,7 +71,7 @@
 
    private void stress_() {
       Random rnd = new Random();
-      while (!isInterrupted()) {
+      while (!stopWorker) {
          remoteCache.put(rnd.nextLong(), rnd.nextLong());
          try {
             Thread.sleep(50);
@@ -152,4 +153,8 @@
          }
       }
    }
+
+   public void stopWorker() {
+      this.stopWorker = true;
+   }
 }



More information about the infinispan-commits mailing list