[infinispan-commits] Infinispan SVN: r2034 - in trunk/client/hotrod-client: src/main/java/org/infinispan/client/hotrod and 5 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Jul 14 12:50:14 EDT 2010
Author: mircea.markus
Date: 2010-07-14 12:50:13 -0400 (Wed, 14 Jul 2010)
New Revision: 2034
Added:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/
Removed:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java
Modified:
trunk/client/hotrod-client/pom.xml
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
Log:
Migrated r2033 (from branches/4.1.x) to trunk.
Modified: trunk/client/hotrod-client/pom.xml
===================================================================
--- trunk/client/hotrod-client/pom.xml 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/pom.xml 2010-07-14 16:50:13 UTC (rev 2034)
@@ -10,10 +10,6 @@
<relativePath>../../parent/pom.xml</relativePath>
</parent>
- <properties>
- <version.netty>3.2.1.Final</version.netty>
- </properties>
-
<artifactId>infinispan-client-hotrod</artifactId>
<name>Infinispan Client Hotrod Module</name>
<description>Infinispan client hotrod module</description>
@@ -50,13 +46,6 @@
</dependency>
<dependency>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${version.netty}</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.5.4</version>
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -56,7 +56,6 @@
*/
public interface RemoteCache<K, V> extends Cache<K, V> {
-
/**
* Removes the given entry only if its version matches the supplied version. A typical use case looks like this:
* <pre>
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -2,19 +2,15 @@
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.impl.async.DefaultAsyncExecutorFactory;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperations;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsImpl;
+import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
import org.infinispan.client.hotrod.impl.transport.TransportFactory;
-
import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
-import org.infinispan.config.ConfigurationException;
import org.infinispan.executors.ExecutorFactory;
import org.infinispan.manager.CacheContainer;
import org.infinispan.marshall.Marshaller;
import org.infinispan.marshall.jboss.GenericJBossMarshaller;
import org.infinispan.util.TypedProperties;
-import org.infinispan.util.Util;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -30,7 +26,6 @@
import java.util.StringTokenizer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-
import static org.infinispan.util.Util.getInstance;
/**
@@ -144,8 +139,13 @@
public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod.client.servers";
private static final String KEY_SIZE = "marshaller.default-array-size.key";
+
private static final String VALUE_SIZE = "marshaller.default-array-size.value";
+ private static final int DEFAULT_KEY_SIZE = 64;
+ private static final int DEFAULT_VALUE_SIZE = 512;
+
+
private TypedProperties props;
private TransportFactory transportFactory;
private Marshaller marshaller;
@@ -154,8 +154,6 @@
private ExecutorService asyncExecutorService;
private final Map<String, RemoteCacheImpl> cacheName2RemoteCache = new HashMap<String, RemoteCacheImpl>();
private AtomicInteger topologyId = new AtomicInteger();
- private static final int DEFAULT_KEY_SIZE = 64;
- private static final int DEFAULT_VALUE_SIZE = 512;
/**
@@ -175,10 +173,10 @@
}
/**
- * Same as {@link org.infinispan.client.hotrod.RemoteCacheManager#RemoteCacheManager(HotRodMarshaller, java.util.Properties, boolean)} with start = true.
+ * Same as {@link org.infinispan.client.hotrod.RemoteCacheManager#RemoteCacheManager(Marshaller, java.util.Properties, boolean)} with start = true.
*/
- public RemoteCacheManager(Marshaller marshaller, Properties props) {
- this(marshaller, props, false);
+ public RemoteCacheManager(Marshaller hotRodMarshaller, Properties props) {
+ this(hotRodMarshaller, props, false);
}
/**
@@ -325,8 +323,7 @@
if (props.contains("asyn-executor-factory")) {
asyncExecutorClass = props.getProperty("asyn-executor-factory");
}
- ExecutorFactory executorFactory = null;
- executorFactory = (ExecutorFactory) getInstance(asyncExecutorClass);
+ ExecutorFactory executorFactory = (ExecutorFactory) getInstance(asyncExecutorClass);
asyncExecutorService = executorFactory.getExecutor(props);
@@ -343,7 +340,9 @@
@Override
public void stop() {
- transportFactory.destroy();
+ if (isStarted()) {
+ transportFactory.destroy();
+ }
started = false;
}
@@ -363,10 +362,8 @@
private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName, boolean forceReturnValue) {
synchronized (cacheName2RemoteCache) {
if (!cacheName2RemoteCache.containsKey(cacheName)) {
- RemoteCacheImpl<K, V> result = new RemoteCacheImpl<K, V>(this, cacheName, forceReturnValue);
- if (isStarted()) {
- startRemoteCache(result);
- }
+ RemoteCacheImpl<K, V> result = new RemoteCacheImpl<K, V>(this, cacheName);
+ startRemoteCache(result);
cacheName2RemoteCache.put(cacheName, result);
return result;
} else {
@@ -376,9 +373,8 @@
}
private <K, V> void startRemoteCache(RemoteCacheImpl<K, V> result) {
- HotRodOperations hotRodOperations = new HotRodOperationsImpl(result.getName(), transportFactory, topologyId);
- result.init(hotRodOperations, marshaller, asyncExecutorService,
- props.getIntProperty(KEY_SIZE, DEFAULT_KEY_SIZE),
+ OperationsFactory operationsFactory = new OperationsFactory(transportFactory, result.getName(), topologyId, forceReturnValueDefault);
+ result.init(marshaller, asyncExecutorService, operationsFactory, props.getIntProperty(KEY_SIZE, DEFAULT_KEY_SIZE),
props.getIntProperty(VALUE_SIZE, DEFAULT_VALUE_SIZE));
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -9,7 +9,7 @@
import org.infinispan.client.hotrod.exceptions.RemoteCacheManagerNotStartedException;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.async.NotifyingFutureImpl;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperations;
+import org.infinispan.client.hotrod.impl.operations.*;
import org.infinispan.marshall.Marshaller;
import org.infinispan.util.concurrent.NotifyingFuture;
import org.infinispan.util.logging.Log;
@@ -30,31 +30,27 @@
private static Log log = LogFactory.getLog(RemoteCacheImpl.class);
- private static final Flag[] FORCE_RETURN_VALUE = {Flag.FORCE_RETURN_VALUE};
-
- private ThreadLocal<Flag[]> flagsMap = new ThreadLocal<Flag[]>();
- private HotRodOperations operations;
private Marshaller marshaller;
private final String name;
private final RemoteCacheManager remoteCacheManager;
private volatile ExecutorService executorService;
- private volatile boolean forceReturnValue;
+ private OperationsFactory operationsFactory;
private int estimateKeySize;
private int estimateValueSize;
- public RemoteCacheImpl(RemoteCacheManager rcm, String name, boolean forceReturnValue) {
+
+ public RemoteCacheImpl(RemoteCacheManager rcm, String name) {
if (log.isTraceEnabled()) {
log.trace("Creating remote cache: " + name);
}
this.name = name;
- this.forceReturnValue = forceReturnValue;
this.remoteCacheManager = rcm;
}
- public void init(HotRodOperations operations, Marshaller marshaller, ExecutorService executorService, int estimateKeySize, int estimateValueSize) {
- this.operations = operations;
+ public void init(Marshaller marshaller, ExecutorService executorService, OperationsFactory operationsFactory, int estimateKeySize, int estimateValueSize) {
this.marshaller = marshaller;
this.executorService = executorService;
+ this.operationsFactory = operationsFactory;
this.estimateKeySize = estimateKeySize;
this.estimateValueSize = estimateValueSize;
}
@@ -63,18 +59,11 @@
return remoteCacheManager;
}
- private byte[] obj2bytes(Object o, boolean isKey) {
- try {
- return marshaller.objectToByteBuffer(o, isKey ? estimateKeySize : estimateValueSize);
- } catch (IOException ioe) {
- throw new TransportException("Unable to marshall object of type [" + o.getClass().getName() + "]", ioe);
- }
- }
-
@Override
public boolean removeWithVersion(K key, long version) {
assertRemoteCacheManagerIsStarted();
- VersionedOperationResponse response = operations.removeIfUnmodified(obj2bytes(key, true), version, flags());
+ RemoveIfUnmodifiedOperation op = operationsFactory.newRemoveIfUnmodifiedOperation(obj2bytes(key, true), version);
+ VersionedOperationResponse response = (VersionedOperationResponse) op.execute();
return response.getCode().isUpdated();
}
@@ -97,7 +86,8 @@
@Override
public boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds) {
assertRemoteCacheManagerIsStarted();
- VersionedOperationResponse response = operations.replaceIfUnmodified(obj2bytes(key, true), obj2bytes(newValue, false), lifespanSeconds, maxIdleTimeSeconds, version, flags());
+ ReplaceIfUnmodifiedOperation op = operationsFactory.newReplaceIfUnmodifiedOperation(obj2bytes(key, true), obj2bytes(newValue, false), lifespanSeconds, maxIdleTimeSeconds, version);
+ VersionedOperationResponse response = (VersionedOperationResponse) op.execute();
return response.getCode().isUpdated();
}
@@ -120,7 +110,8 @@
@Override
public VersionedValue<V> getVersioned(K key) {
assertRemoteCacheManagerIsStarted();
- BinaryVersionedValue value = operations.getWithVersion(obj2bytes(key, true), flags());
+ GetWithVersionOperation op = operationsFactory.newGetWithVersionOperation(obj2bytes(key, true));
+ BinaryVersionedValue value = (BinaryVersionedValue) op.execute();
return binary2VersionedValue(value);
}
@@ -152,7 +143,8 @@
@Override
public ServerStatistics stats() {
assertRemoteCacheManagerIsStarted();
- Map<String, String> statsMap = operations.stats();
+ StatsOperation op = operationsFactory.newStatsOperation();
+ Map<String, String> statsMap = (Map<String, String>) op.execute();
ServerStatisticsImpl stats = new ServerStatisticsImpl();
for (Map.Entry<String, String> entry : statsMap.entrySet()) {
stats.addStats(entry.getKey(), entry.getValue());
@@ -161,16 +153,6 @@
}
@Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getVersion() {
- return Version.getProtocolVersion();
- }
-
- @Override
public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
@@ -178,7 +160,8 @@
if (log.isTraceEnabled()) {
log.trace("About to add (K,V): (" + key + ", " + value + ") lifespanSecs:" + lifespanSecs + ", maxIdleSecs:" + maxIdleSecs);
}
- byte[] result = operations.put(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs, flags());
+ PutOperation op = operationsFactory.newPutKeyValueOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
+ byte[] result = (byte[]) op.execute();
return (V) bytes2obj(result);
}
@@ -188,7 +171,8 @@
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
- byte[] bytes = operations.putIfAbsent(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs, flags());
+ PutIfAbsentOperation op = operationsFactory.newPutIfAbsentOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
+ byte[] bytes = (byte[]) op.execute();
return (V) bytes2obj(bytes);
}
@@ -197,7 +181,8 @@
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
- byte[] bytes = operations.replace(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs, flags());
+ ReplaceOperation op = operationsFactory.newReplaceOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
+ byte[] bytes = (byte[]) op.execute();
return (V) bytes2obj(bytes);
}
@@ -284,27 +269,32 @@
@Override
public boolean containsKey(Object key) {
assertRemoteCacheManagerIsStarted();
- return operations.containsKey(obj2bytes(key, true), flags());
+ ContainsKeyOperation op = operationsFactory.newContainsKeyOperation(obj2bytes(key, true));
+ return (Boolean)op.execute();
}
@Override
public V get(Object key) {
assertRemoteCacheManagerIsStarted();
- byte[] bytes = operations.get(obj2bytes(key, true), flags());
+ byte[] keyBytes = obj2bytes(key, true);
+ GetOperation gco = operationsFactory.newGetKeyOperation(keyBytes);
+ byte[] bytes = (byte[]) gco.execute();
return (V) bytes2obj(bytes);
}
@Override
public V remove(Object key) {
assertRemoteCacheManagerIsStarted();
- byte[] existingValue = operations.remove(obj2bytes(key, true), flags());
+ RemoveOperation removeOperation = operationsFactory.newRemoveOperation(obj2bytes(key, true));
+ byte[] existingValue = (byte[]) removeOperation.execute();
return (V) bytes2obj(existingValue);
}
@Override
public void clear() {
assertRemoteCacheManagerIsStarted();
- operations.clear(flags());
+ ClearOperation op = operationsFactory.newClearOperation() ;
+ op.execute();
}
@Override
@@ -321,20 +311,29 @@
}
}
+ @Override
+ public String getName() {
+ return name;
+ }
@Override
- public RemoteCache withFlags(Flag... flags) {
- this.flagsMap.set(flags);
+ public String getVersion() {
+ return Version.getProtocolVersion();
+ }
+
+ @Override
+ public RemoteCache<K, V> withFlags(Flag... flags) {
+ operationsFactory.setFlags(flags);
return this;
}
- private Flag[] flags() {
- Flag[] flags = this.flagsMap.get();
- this.flagsMap.remove();
- if (flags == null && forceReturnValue) {
- return FORCE_RETURN_VALUE;
+
+ private byte[] obj2bytes(Object o, boolean isKey) {
+ try {
+ return marshaller.objectToByteBuffer(o, isKey ? estimateKeySize : estimateValueSize);
+ } catch (IOException ioe) {
+ throw new TransportException("Unable to marshall object of type [" + o.getClass().getName() + "]", ioe);
}
- return flags;
}
private Object bytes2obj(byte[] bytes) {
Copied: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations (from rev 2033, branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations)
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -10,55 +10,57 @@
*/
public interface HotRodConstants {
- public static final short REQUEST_MAGIC = 0xA0;
- public static final short RESPONSE_MAGIC = 0xA1;
+ static final short REQUEST_MAGIC = 0xA0;
+ static final short RESPONSE_MAGIC = 0xA1;
- public static final byte HOTROD_VERSION = 10;
+ static final byte HOTROD_VERSION = 10;
//requests
- public static final byte PUT_REQUEST = 0x01;
- public static final byte GET_REQUEST = 0x03;
- public static final byte PUT_IF_ABSENT_REQUEST = 0x05;
- public static final byte REPLACE_REQUEST = 0x07;
- public static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
- public static final byte REMOVE_REQUEST = 0x0B;
- public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
- public static final byte CONTAINS_KEY_REQUEST = 0x0F;
- public static final byte GET_WITH_VERSION = 0x11;
- public static final byte CLEAR_REQUEST = 0x13;
- public static final byte STATS_REQUEST = 0x15;
- public static final byte PING_REQUEST = 0x17;
+ static final byte PUT_REQUEST = 0x01;
+ static final byte GET_REQUEST = 0x03;
+ static final byte PUT_IF_ABSENT_REQUEST = 0x05;
+ static final byte REPLACE_REQUEST = 0x07;
+ static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
+ static final byte REMOVE_REQUEST = 0x0B;
+ static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
+ static final byte CONTAINS_KEY_REQUEST = 0x0F;
+ static final byte GET_WITH_VERSION = 0x11;
+ static final byte CLEAR_REQUEST = 0x13;
+ static final byte STATS_REQUEST = 0x15;
+ static final byte PING_REQUEST = 0x17;
//responses
- public static final byte PUT_RESPONSE = 0x02;
- public static final byte GET_RESPONSE = 0x04;
- public static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
- public static final byte REPLACE_RESPONSE = 0x08;
- public static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
- public static final byte REMOVE_RESPONSE = 0x0C;
- public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
- public static final byte CONTAINS_KEY_RESPONSE = 0x10;
- public static final byte GET_WITH_VERSION_RESPONSE = 0x12;
- public static final byte CLEAR_RESPONSE = 0x14;
- public static final byte STATS_RESPONSE = 0x16;
- public static final byte PING_RESPONSE = 0x18;
- public static final byte ERROR_RESPONSE = 0x50;
+ static final byte PUT_RESPONSE = 0x02;
+ static final byte GET_RESPONSE = 0x04;
+ static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
+ static final byte REPLACE_RESPONSE = 0x08;
+ static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
+ static final byte REMOVE_RESPONSE = 0x0C;
+ static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
+ static final byte CONTAINS_KEY_RESPONSE = 0x10;
+ static final byte GET_WITH_VERSION_RESPONSE = 0x12;
+ static final byte CLEAR_RESPONSE = 0x14;
+ static final byte STATS_RESPONSE = 0x16;
+ static final byte PING_RESPONSE = 0x18;
+ static final byte ERROR_RESPONSE = 0x50;
//response status
- public static final byte NO_ERROR_STATUS = 0x00;
- public static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
- public static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
- public static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
- public static final int UNKNOWN_COMMAND_STATUS = 0x82;
- public static final int SERVER_ERROR_STATUS = 0x85;
- public static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
- public static final int UNKNOWN_VERSION_STATUS = 0x83;
- public static final int COMMAND_TIMEOUT_STATUS = 0x86;
+ static final byte NO_ERROR_STATUS = 0x00;
+ static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
+ static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
+ static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
+ static final int UNKNOWN_COMMAND_STATUS = 0x82;
+ static final int SERVER_ERROR_STATUS = 0x85;
+ static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
+ static final int UNKNOWN_VERSION_STATUS = 0x83;
+ static final int COMMAND_TIMEOUT_STATUS = 0x86;
- public static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
- public static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
- public static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
- Charset STRING_CHARSET = Charset.forName("UTF-8");
+ static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
+ static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
+ static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
+ Charset HOTROD_STRING_CHARSET = Charset.forName("UTF-8");
+
+ static final byte[] DEFAULT_CACHE_NAME_BYTES = new byte[]{};
}
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -1,183 +0,0 @@
-package org.infinispan.client.hotrod.impl.protocol;
-
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.exceptions.HotRodClientException;
-import org.infinispan.client.hotrod.exceptions.HotRodTimeoutException;
-import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
-import org.infinispan.client.hotrod.impl.transport.Transport;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Helper class for factorizing common parts of read/write operations.
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class HotRodOperationsHelper {
- static Log log = LogFactory.getLog(HotRodOperationsHelper.class);
- static final AtomicLong MSG_ID = new AtomicLong();
- final static byte CLIENT_INTELLIGENCE = HotRodConstants.CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
-
- public static final byte[] DEFAULT_CACHE_NAME_BYTES = new byte[]{};
-
- public static long writeHeader(Transport transport, short operationCode, byte[] cacheName, AtomicInteger topologyId, Flag... flags) {
- transport.writeByte(HotRodConstants.REQUEST_MAGIC);
- long messageId = MSG_ID.incrementAndGet();
- transport.writeVLong(messageId);
- transport.writeByte(HotRodConstants.HOTROD_VERSION);
- transport.writeByte(operationCode);
- transport.writeArray(cacheName);
-
- int flagInt = 0;
- if (flags != null) {
- for (Flag flag : flags) {
- flagInt = flag.getFlagInt() | flagInt;
- }
- }
- transport.writeVInt(flagInt);
- transport.writeByte(CLIENT_INTELLIGENCE);
- transport.writeVInt(topologyId.get());
- if (log.isTraceEnabled()) {
- log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
- }
- return messageId;
- }
-
- /**
- * Magic | Message Id | Op code | Status | Topology Change Marker
- */
- public static short readHeaderAndValidate(Transport transport, long messageId, short opRespCode, AtomicInteger topologyId) {
- short magic = transport.readByte();
- if (magic != HotRodConstants.RESPONSE_MAGIC) {
- String message = "Invalid magic number. Expected " + Integer.toHexString(HotRodConstants.RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
- log.error(message);
- throw new InvalidResponseException(message);
- }
- long receivedMessageId = transport.readVLong();
- if (receivedMessageId != messageId) {
- String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
- log.error(message);
- throw new InvalidResponseException(message);
- }
- if (log.isTraceEnabled()) {
- log.trace("Received response for message id: " + receivedMessageId);
- }
- short receivedOpCode = transport.readByte();
- if (receivedOpCode != opRespCode) {
- if (receivedOpCode == HotRodConstants.ERROR_RESPONSE) {
- checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
- throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
- }
- throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
- }
- if (log.isTraceEnabled()) {
- log.trace("Received operation code is: " + receivedOpCode);
- }
- short status = transport.readByte();
- checkForErrorsInResponseStatus(status, messageId, transport);
- short topologyChangeByte = transport.readByte();
- if (topologyChangeByte == 1) {
- readNewTopologyAndHash(transport, topologyId);
- }
- return status;
- }
-
- static void readNewTopologyAndHash(Transport transport, AtomicInteger topologyId) {
- int newTopologyId = transport.readVInt();
- topologyId.set(newTopologyId);
- int numKeyOwners = transport.readUnsignedShort();
- short hashFunctionVersion = transport.readByte();
- int hashSpace = transport.readVInt();
- int clusterSize = transport.readVInt();
-
- if (log.isTraceEnabled()) {
- log.trace("Topology change request: newTopologyId=" + newTopologyId + ", numKeyOwners=" + numKeyOwners +
- ", hashFunctionVersion=" + hashFunctionVersion + ", hashSpaceSize=" + hashSpace + ", clusterSize=" + clusterSize);
- }
-
- LinkedHashMap<InetSocketAddress, Integer> servers2HashCode = new LinkedHashMap<InetSocketAddress, Integer>();
-
- for (int i = 0; i < clusterSize; i++) {
- String host = transport.readString();
- int port = transport.readUnsignedShort();
- if (log.isTraceEnabled()) {
- log.trace("Server read:" + host + ":" + port);
- }
- int hashCode = transport.read4ByteInt();
- servers2HashCode.put(new InetSocketAddress(host, port), hashCode);
- if (log.isTraceEnabled()) {
- log.trace("Hash code is: " + hashCode);
- }
- }
- if (log.isInfoEnabled()) {
- log.info("New topology: " + servers2HashCode);
- }
- transport.getTransportFactory().updateServers(servers2HashCode.keySet());
- if (hashFunctionVersion == 0) {
- if (log.isTraceEnabled())
- log.trace("Not using a consistent hash function (hash function version == 0).");
- } else {
- transport.getTransportFactory().updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
- }
- }
-
- static void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
- if (log.isTraceEnabled()) {
- log.trace("Received operation status: " + status);
- }
- switch ((int) status) {
- case HotRodConstants.INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
- case HotRodConstants.REQUEST_PARSING_ERROR_STATUS:
- case HotRodConstants.UNKNOWN_COMMAND_STATUS:
- case HotRodConstants.SERVER_ERROR_STATUS:
- case HotRodConstants.UNKNOWN_VERSION_STATUS: {
- String msgFromServer = transport.readString();
- if (log.isWarnEnabled()) {
- log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
- }
- throw new HotRodClientException(msgFromServer, messageId, status);
- }
- case HotRodConstants.COMMAND_TIMEOUT_STATUS: {
- if (log.isTraceEnabled()) {
- log.trace("timeout message received from the server");
- }
- throw new HotRodTimeoutException();
- }
- case HotRodConstants.NO_ERROR_STATUS:
- case HotRodConstants.KEY_DOES_NOT_EXIST_STATUS:
- case HotRodConstants.NOT_PUT_REMOVED_REPLACED_STATUS: {
- //don't do anything, these are correct responses
- break;
- }
- default: {
- throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
- }
- }
- }
-
- public static boolean ping(Transport transport, AtomicInteger topologyId) {
- try {
- long messageId = HotRodOperationsHelper.writeHeader(transport, HotRodConstants.PING_REQUEST, DEFAULT_CACHE_NAME_BYTES, topologyId);
- short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, HotRodConstants.PING_RESPONSE, topologyId);
- if (respStatus == HotRodConstants.NO_ERROR_STATUS) {
- if (log.isTraceEnabled())
- log.trace("Successfully validated transport: " + transport);
- return true;
- } else {
- if (log.isTraceEnabled())
- log.trace("Unknown response status: " + respStatus);
- return false;
- }
- } catch (Exception e) {
- if (log.isTraceEnabled())
- log.trace("Failed to validate transport: " + transport, e);
- return false;
- }
- }
-}
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -1,421 +0,0 @@
-package org.infinispan.client.hotrod.impl.protocol;
-
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
-import org.infinispan.client.hotrod.exceptions.TransportException;
-import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
-import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
-import org.infinispan.client.hotrod.impl.transport.Transport;
-import org.infinispan.client.hotrod.impl.transport.TransportFactory;
-import org.infinispan.util.Util;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class HotRodOperationsImpl implements HotRodOperations, HotRodConstants {
-
- private static Log log = LogFactory.getLog(HotRodOperationsImpl.class);
-
- private final byte[] cacheName;
- private TransportFactory transportFactory;
- private final AtomicInteger topologyId;
-
- public HotRodOperationsImpl(String cacheName, TransportFactory transportFactory, AtomicInteger topologyId) {
- this.cacheName = cacheName.getBytes(STRING_CHARSET);
- this.transportFactory = transportFactory;
- this.topologyId = topologyId;
- }
-
- public byte[] get(byte[] key, Flag[] flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- try {
- short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- }
- if (status == NO_ERROR_STATUS) {
- return transport.readArray();
- }
- } finally {
- releaseTransport(transport);
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public byte[] remove(byte[] key, Flag[] flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- } else if (status == NO_ERROR_STATUS) {
- return returnPossiblePrevValue(transport, flags);
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public boolean containsKey(byte[] key, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return false;
- } else if (status == NO_ERROR_STATUS) {
- return true;
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
-
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- }
- if (status == NO_ERROR_STATUS) {
- long version = transport.readLong();
- if (log.isTraceEnabled()) {
- log.trace("Received version: " + version);
- }
- byte[] value = transport.readArray();
- return new BinaryVersionedValue(version, value);
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
-
- public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
- if (status != NO_ERROR_STATUS) {
- throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
- }
- return returnPossiblePrevValue(transport, flags);
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
- if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
- byte[] bytes = returnPossiblePrevValue(transport, flags);
- if (log.isTraceEnabled()) {
- log.trace("Returning from putIfAbsent: " + Util.printArray(bytes, false));
- }
- return bytes;
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
- if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
- return returnPossiblePrevValue(transport, flags);
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException(" should not reach here!");
- }
-
- /**
- * request : [header][key length][key][lifespan][max idle][entry_version][value length][value] response: If
- * ForceReturnPreviousValue has been passed, this responses will contain previous [value length][value] for that key.
- * If the key does not exist or previous was null, value length would be 0. Otherwise, if no ForceReturnPreviousValue
- * was sent, the response would be empty.
- */
- public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
-
- //2) write message body
- transport.writeArray(key);
- transport.writeVInt(lifespan);
- transport.writeVInt(maxIdle);
- transport.writeLong(version);
- transport.writeArray(value);
- return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException(" should not reach here!");
- }
-
- /**
- * Request: [header][key length][key][entry_version]
- */
- public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
-
- //2) write message body
- transport.writeArray(key);
- transport.writeLong(version);
-
- //process response and return
- return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
-
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("Should not reach this point!");
- }
-
- public void clear(Flag... flags) {
- Transport transport = transportFactory.getTransport();
- int retryCount = 0;
- do {
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, CLEAR_REQUEST, cacheName, topologyId, flags);
- HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE, topologyId);
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = transportFactory.getTransport();
- }
- retryCount++;
-
- } while (shouldRetry(retryCount));
- }
-
- public Map<String, String> stats() {
- Transport transport = transportFactory.getTransport();
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, STATS_REQUEST, cacheName, topologyId);
- HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, STATS_RESPONSE, topologyId);
- int nrOfStats = transport.readVInt();
-
- Map<String, String> result = new HashMap<String, String>();
- for (int i = 0; i < nrOfStats; i++) {
- String statName = transport.readString();
- String statValue = transport.readString();
- result.put(statName, statValue);
- }
- return result;
- } finally {
- releaseTransport(transport);
- }
- }
-
- //[header][key length][key][lifespan][max idle][value length][value]
-
- private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
-
- // 2) write key and value
- transport.writeArray(key);
- transport.writeVInt(lifespan);
- transport.writeVInt(maxIdle);
- transport.writeArray(value);
- transport.flush();
-
- // 3) now read header
-
- //return status (not error status for sure)
- return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
- }
-
- /*
- * Magic | MessageId | Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
- */
-
- private boolean hasForceReturn(Flag[] flags) {
- if (flags == null) return false;
- for (Flag flag : flags) {
- if (flag == Flag.FORCE_RETURN_VALUE) return true;
- }
- return false;
- }
-
- private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
- // 1) write [header][key length][key]
- long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
- transport.writeArray(key);
- transport.flush();
-
- // 2) now read the header
- return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
- }
-
- private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
- if (hasForceReturn(flags)) {
- byte[] bytes = transport.readArray();
- if (log.isTraceEnabled()) log.trace("Previous value bytes is: " + Util.printArray(bytes, false));
- //0-length response means null
- return bytes.length == 0 ? null : bytes;
- } else {
- return null;
- }
- }
-
- private void releaseTransport(Transport transport) {
- if (transport != null)
- transportFactory.releaseTransport(transport);
- }
-
- private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
- //3) ...
- short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, response, topologyId);
-
- //4 ...
- VersionedOperationResponse.RspCode code;
- if (respStatus == NO_ERROR_STATUS) {
- code = VersionedOperationResponse.RspCode.SUCCESS;
- } else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS) {
- code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
- } else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
- code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
- } else {
- throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
- }
- byte[] prevValue = returnPossiblePrevValue(transport, flags);
- return new VersionedOperationResponse(prevValue, code);
- }
-
- private void logErrorAndThrowExceptionIfNeeded(int i, TransportException te) {
- String message = "Transport exception. Retry " + i + " out of " + transportFactory.getTransportCount();
- if (i == transportFactory.getTransportCount() - 1 || transportFactory.getTransportCount() < 0) {
- log.warn(message, te);
- throw te;
- } else {
- log.trace(message + ":" + te);
- }
- }
-
- private Transport getTransport(byte[] key, boolean hashAware) {
- if (hashAware) {
- return transportFactory.getTransport(key);
- } else {
- return transportFactory.getTransport();
- }
- }
-
- private boolean shouldRetry(int retryCount) {
- return retryCount < transportFactory.getTransportCount();
- }
-}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -28,7 +28,7 @@
@Override
public String readString() {
byte[] strContent = readArray();
- String readString = new String(strContent, HotRodConstants.STRING_CHARSET);
+ String readString = new String(strContent, HotRodConstants.HOTROD_STRING_CHARSET);
if (log.isTraceEnabled()) {
log.trace("Read string is: " + readString);
}
@@ -80,7 +80,7 @@
@Override
public void writeString(String string) {
if (!string.isEmpty()) {
- writeArray(string.getBytes(HotRodConstants.STRING_CHARSET));
+ writeArray(string.getBytes(HotRodConstants.HOTROD_STRING_CHARSET));
} else {
writeVInt(0);
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -1,7 +1,7 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsHelper;
+import org.infinispan.client.hotrod.impl.operations.PingOperation;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -37,7 +37,7 @@
log.trace("Executing first ping!");
firstPingExecuted = true;
try {
- HotRodOperationsHelper.ping(tcpTransport, topologyId);
+ ping(tcpTransport, topologyId);
} catch (Exception e) {
log.trace("Ignoring ping request failure during ping on startup: " + e.getMessage());
}
@@ -45,6 +45,11 @@
return tcpTransport;
}
+ private boolean ping(TcpTransport tcpTransport, AtomicInteger topologyId) {
+ PingOperation po = new PingOperation(null, topologyId, tcpTransport);
+ return (Boolean)po.execute();
+ }
+
/**
* This will be called by the test thread when testWhileIdle==true.
*/
@@ -54,7 +59,7 @@
if (log.isTraceEnabled()) {
log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
}
- return HotRodOperationsHelper.ping(transport, topologyId);
+ return ping(transport, topologyId);
}
@Override
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -76,7 +76,7 @@
}
for (WorkerThread wt: workers) {
- wt.interrupt();
+ wt.stopWorker();
wt.waitToFinish();
}
//now wait for the idle thread to wake up and clean them
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -9,7 +9,7 @@
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.infinispan.util.ByteArrayKey;
+import org.infinispan.util.ByteArrayKey;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.AfterClass;
Deleted: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -1,24 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.client.hotrod.impl.transport.netty.NettyTransportFactory;
-import org.infinispan.config.Configuration;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.Test;
-
-import java.util.Properties;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-@Test(testName = "hotrod.NettyHotRodIntegrationTest", groups = "functional")
-public class NettyHotRodIntegrationTest extends HotRodIntegrationTest {
-
- @Override
- protected RemoteCacheManager getRemoteCacheManager() {
- Properties props = new Properties();
- props.put("transport-factory", NettyTransportFactory.class.getName());
- props.put("hotrod-servers", "127.0.0.1:" + hotrodServer.getPort());
- return new RemoteCacheManager(props, true);
- }
-}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -31,8 +31,12 @@
@AfterTest(alwaysRun = true)
public void release() {
- if (cacheManager != null) cacheManager.stop();
- if (hotrodServer != null) hotrodServer.stop();
+ try {
+ if (cacheManager != null) cacheManager.stop();
+ if (hotrodServer != null) hotrodServer.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
if (prevValue != null) {
System.setProperty(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS, prevValue);
} else {
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -41,11 +41,19 @@
protected void clearContent() throws Throwable {
}
- @AfterClass
+ @AfterClass (alwaysRun = true)
@Override
protected void destroy() {
+ try {
hotRodServer1.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ try {
hotRodServer2.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
super.destroy();
}
@@ -112,6 +120,7 @@
public void testDropServer() {
hotRodServer3.stop();
manager(2).stop();
+ log.trace("Just stopped server 2");
waitForClusterToForm(2);
@@ -138,7 +147,7 @@
protected void waitForClusterToForm(int memberCount) {
TestingUtil.blockUntilViewReceived(manager(0).getCache(), memberCount, 30000);
for (int i = 0; i < memberCount; i++) {
- TestingUtil.blockUntilCacheStatusAchieved(manager(i).getCache(), ComponentStatus.RUNNING, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(i).getCache(), ComponentStatus.RUNNING, 30000);
}
}
}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java 2010-07-14 16:10:00 UTC (rev 2033)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java 2010-07-14 16:50:13 UTC (rev 2034)
@@ -30,6 +30,7 @@
volatile String key;
volatile String value;
volatile boolean finished = false;
+ volatile boolean stopWorker = false;
public WorkerThread(RemoteCache remoteCache) {
super("WorkerThread-" + WORKER_INDEX.getAndIncrement());
@@ -70,7 +71,7 @@
private void stress_() {
Random rnd = new Random();
- while (!isInterrupted()) {
+ while (!stopWorker) {
remoteCache.put(rnd.nextLong(), rnd.nextLong());
try {
Thread.sleep(50);
@@ -152,4 +153,8 @@
}
}
}
+
+ public void stopWorker() {
+ this.stopWorker = true;
+ }
}
More information about the infinispan-commits
mailing list