[infinispan-commits] Infinispan SVN: r2033 - in branches/4.1.x/client/hotrod-client: src/main/java/org/infinispan/client/hotrod and 6 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Jul 14 12:10:02 EDT 2010
Author: mircea.markus
Date: 2010-07-14 12:10:00 -0400 (Wed, 14 Jul 2010)
New Revision: 2033
Added:
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/AbstractKeyOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/AbstractKeyValueOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ClearOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ContainsKeyOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/GetOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/GetWithVersionOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/HotRodOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PingOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PutIfAbsentOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PutOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RemoveIfUnmodifiedOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RemoveOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ReplaceOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RetryOnFailureOperation.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/StatsOperation.java
Removed:
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java
Modified:
branches/4.1.x/client/hotrod-client/pom.xml
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
Log:
ISPN-538 - remove code duplication from HotRodOperationsImpl
- removed dependency on netty
Modified: branches/4.1.x/client/hotrod-client/pom.xml
===================================================================
--- branches/4.1.x/client/hotrod-client/pom.xml 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/pom.xml 2010-07-14 16:10:00 UTC (rev 2033)
@@ -10,10 +10,6 @@
<relativePath>../../parent/pom.xml</relativePath>
</parent>
- <properties>
- <version.netty>3.2.1.Final</version.netty>
- </properties>
-
<artifactId>infinispan-client-hotrod</artifactId>
<name>Infinispan Client Hotrod Module</name>
<description>Infinispan client hotrod module</description>
@@ -50,13 +46,6 @@
</dependency>
<dependency>
- <groupId>org.jboss.netty</groupId>
- <artifactId>netty</artifactId>
- <version>${version.netty}</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.5.4</version>
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -56,7 +56,6 @@
*/
public interface RemoteCache<K, V> extends Cache<K, V> {
-
/**
* Removes the given entry only if its version matches the supplied version. A typical use case looks like this:
* <pre>
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -2,19 +2,15 @@
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.impl.async.DefaultAsyncExecutorFactory;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperations;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsImpl;
+import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
import org.infinispan.client.hotrod.impl.transport.TransportFactory;
-
import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
-import org.infinispan.config.ConfigurationException;
import org.infinispan.executors.ExecutorFactory;
import org.infinispan.manager.CacheContainer;
import org.infinispan.marshall.Marshaller;
import org.infinispan.marshall.jboss.GenericJBossMarshaller;
import org.infinispan.util.TypedProperties;
-import org.infinispan.util.Util;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -30,7 +26,6 @@
import java.util.StringTokenizer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
-
import static org.infinispan.util.Util.getInstance;
/**
@@ -144,8 +139,13 @@
public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod.client.servers";
private static final String KEY_SIZE = "marshaller.default-array-size.key";
+
private static final String VALUE_SIZE = "marshaller.default-array-size.value";
+ private static final int DEFAULT_KEY_SIZE = 64;
+ private static final int DEFAULT_VALUE_SIZE = 512;
+
+
private TypedProperties props;
private TransportFactory transportFactory;
private Marshaller marshaller;
@@ -154,8 +154,6 @@
private ExecutorService asyncExecutorService;
private final Map<String, RemoteCacheImpl> cacheName2RemoteCache = new HashMap<String, RemoteCacheImpl>();
private AtomicInteger topologyId = new AtomicInteger();
- private static final int DEFAULT_KEY_SIZE = 64;
- private static final int DEFAULT_VALUE_SIZE = 512;
/**
@@ -175,10 +173,10 @@
}
/**
- * Same as {@link org.infinispan.client.hotrod.RemoteCacheManager#RemoteCacheManager(HotRodMarshaller, java.util.Properties, boolean)} with start = true.
+ * Same as {@link org.infinispan.client.hotrod.RemoteCacheManager#RemoteCacheManager(Marshaller, java.util.Properties, boolean)} with start = true.
*/
- public RemoteCacheManager(Marshaller marshaller, Properties props) {
- this(marshaller, props, false);
+ public RemoteCacheManager(Marshaller hotRodMarshaller, Properties props) {
+ this(hotRodMarshaller, props, false);
}
/**
@@ -325,8 +323,7 @@
if (props.contains("asyn-executor-factory")) {
asyncExecutorClass = props.getProperty("asyn-executor-factory");
}
- ExecutorFactory executorFactory = null;
- executorFactory = (ExecutorFactory) getInstance(asyncExecutorClass);
+ ExecutorFactory executorFactory = (ExecutorFactory) getInstance(asyncExecutorClass);
asyncExecutorService = executorFactory.getExecutor(props);
@@ -343,7 +340,9 @@
@Override
public void stop() {
- transportFactory.destroy();
+ if (isStarted()) {
+ transportFactory.destroy();
+ }
started = false;
}
@@ -363,10 +362,8 @@
private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName, boolean forceReturnValue) {
synchronized (cacheName2RemoteCache) {
if (!cacheName2RemoteCache.containsKey(cacheName)) {
- RemoteCacheImpl<K, V> result = new RemoteCacheImpl<K, V>(this, cacheName, forceReturnValue);
- if (isStarted()) {
- startRemoteCache(result);
- }
+ RemoteCacheImpl<K, V> result = new RemoteCacheImpl<K, V>(this, cacheName);
+ startRemoteCache(result);
cacheName2RemoteCache.put(cacheName, result);
return result;
} else {
@@ -376,9 +373,8 @@
}
private <K, V> void startRemoteCache(RemoteCacheImpl<K, V> result) {
- HotRodOperations hotRodOperations = new HotRodOperationsImpl(result.getName(), transportFactory, topologyId);
- result.init(hotRodOperations, marshaller, asyncExecutorService,
- props.getIntProperty(KEY_SIZE, DEFAULT_KEY_SIZE),
+ OperationsFactory operationsFactory = new OperationsFactory(transportFactory, result.getName(), topologyId, forceReturnValueDefault);
+ result.init(marshaller, asyncExecutorService, operationsFactory, props.getIntProperty(KEY_SIZE, DEFAULT_KEY_SIZE),
props.getIntProperty(VALUE_SIZE, DEFAULT_VALUE_SIZE));
}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -9,7 +9,7 @@
import org.infinispan.client.hotrod.exceptions.RemoteCacheManagerNotStartedException;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.async.NotifyingFutureImpl;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperations;
+import org.infinispan.client.hotrod.impl.operations.*;
import org.infinispan.marshall.Marshaller;
import org.infinispan.util.concurrent.NotifyingFuture;
import org.infinispan.util.logging.Log;
@@ -30,31 +30,27 @@
private static Log log = LogFactory.getLog(RemoteCacheImpl.class);
- private static final Flag[] FORCE_RETURN_VALUE = {Flag.FORCE_RETURN_VALUE};
-
- private ThreadLocal<Flag[]> flagsMap = new ThreadLocal<Flag[]>();
- private HotRodOperations operations;
private Marshaller marshaller;
private final String name;
private final RemoteCacheManager remoteCacheManager;
private volatile ExecutorService executorService;
- private volatile boolean forceReturnValue;
+ private OperationsFactory operationsFactory;
private int estimateKeySize;
private int estimateValueSize;
- public RemoteCacheImpl(RemoteCacheManager rcm, String name, boolean forceReturnValue) {
+
+ public RemoteCacheImpl(RemoteCacheManager rcm, String name) {
if (log.isTraceEnabled()) {
log.trace("Creating remote cache: " + name);
}
this.name = name;
- this.forceReturnValue = forceReturnValue;
this.remoteCacheManager = rcm;
}
- public void init(HotRodOperations operations, Marshaller marshaller, ExecutorService executorService, int estimateKeySize, int estimateValueSize) {
- this.operations = operations;
+ public void init(Marshaller marshaller, ExecutorService executorService, OperationsFactory operationsFactory, int estimateKeySize, int estimateValueSize) {
this.marshaller = marshaller;
this.executorService = executorService;
+ this.operationsFactory = operationsFactory;
this.estimateKeySize = estimateKeySize;
this.estimateValueSize = estimateValueSize;
}
@@ -63,18 +59,11 @@
return remoteCacheManager;
}
- private byte[] obj2bytes(Object o, boolean isKey) {
- try {
- return marshaller.objectToByteBuffer(o, isKey ? estimateKeySize : estimateValueSize);
- } catch (IOException ioe) {
- throw new TransportException("Unable to marshall object of type [" + o.getClass().getName() + "]", ioe);
- }
- }
-
@Override
public boolean removeWithVersion(K key, long version) {
assertRemoteCacheManagerIsStarted();
- VersionedOperationResponse response = operations.removeIfUnmodified(obj2bytes(key, true), version, flags());
+ RemoveIfUnmodifiedOperation op = operationsFactory.newRemoveIfUnmodifiedOperation(obj2bytes(key, true), version);
+ VersionedOperationResponse response = (VersionedOperationResponse) op.execute();
return response.getCode().isUpdated();
}
@@ -97,7 +86,8 @@
@Override
public boolean replaceWithVersion(K key, V newValue, long version, int lifespanSeconds, int maxIdleTimeSeconds) {
assertRemoteCacheManagerIsStarted();
- VersionedOperationResponse response = operations.replaceIfUnmodified(obj2bytes(key, true), obj2bytes(newValue, false), lifespanSeconds, maxIdleTimeSeconds, version, flags());
+ ReplaceIfUnmodifiedOperation op = operationsFactory.newReplaceIfUnmodifiedOperation(obj2bytes(key, true), obj2bytes(newValue, false), lifespanSeconds, maxIdleTimeSeconds, version);
+ VersionedOperationResponse response = (VersionedOperationResponse) op.execute();
return response.getCode().isUpdated();
}
@@ -120,7 +110,8 @@
@Override
public VersionedValue<V> getVersioned(K key) {
assertRemoteCacheManagerIsStarted();
- BinaryVersionedValue value = operations.getWithVersion(obj2bytes(key, true), flags());
+ GetWithVersionOperation op = operationsFactory.newGetWithVersionOperation(obj2bytes(key, true));
+ BinaryVersionedValue value = (BinaryVersionedValue) op.execute();
return binary2VersionedValue(value);
}
@@ -152,7 +143,8 @@
@Override
public ServerStatistics stats() {
assertRemoteCacheManagerIsStarted();
- Map<String, String> statsMap = operations.stats();
+ StatsOperation op = operationsFactory.newStatsOperation();
+ Map<String, String> statsMap = (Map<String, String>) op.execute();
ServerStatisticsImpl stats = new ServerStatisticsImpl();
for (Map.Entry<String, String> entry : statsMap.entrySet()) {
stats.addStats(entry.getKey(), entry.getValue());
@@ -161,16 +153,6 @@
}
@Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getVersion() {
- return Version.getProtocolVersion();
- }
-
- @Override
public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
@@ -178,7 +160,8 @@
if (log.isTraceEnabled()) {
log.trace("About to add (K,V): (" + key + ", " + value + ") lifespanSecs:" + lifespanSecs + ", maxIdleSecs:" + maxIdleSecs);
}
- byte[] result = operations.put(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs, flags());
+ PutOperation op = operationsFactory.newPutKeyValueOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
+ byte[] result = (byte[]) op.execute();
return (V) bytes2obj(result);
}
@@ -188,7 +171,8 @@
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
- byte[] bytes = operations.putIfAbsent(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs, flags());
+ PutIfAbsentOperation op = operationsFactory.newPutIfAbsentOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
+ byte[] bytes = (byte[]) op.execute();
return (V) bytes2obj(bytes);
}
@@ -197,7 +181,8 @@
assertRemoteCacheManagerIsStarted();
int lifespanSecs = toSeconds(lifespan, lifespanUnit);
int maxIdleSecs = toSeconds(maxIdleTime, maxIdleTimeUnit);
- byte[] bytes = operations.replace(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs, flags());
+ ReplaceOperation op = operationsFactory.newReplaceOperation(obj2bytes(key, true), obj2bytes(value, false), lifespanSecs, maxIdleSecs);
+ byte[] bytes = (byte[]) op.execute();
return (V) bytes2obj(bytes);
}
@@ -284,27 +269,32 @@
@Override
public boolean containsKey(Object key) {
assertRemoteCacheManagerIsStarted();
- return operations.containsKey(obj2bytes(key, true), flags());
+ ContainsKeyOperation op = operationsFactory.newContainsKeyOperation(obj2bytes(key, true));
+ return (Boolean)op.execute();
}
@Override
public V get(Object key) {
assertRemoteCacheManagerIsStarted();
- byte[] bytes = operations.get(obj2bytes(key, true), flags());
+ byte[] keyBytes = obj2bytes(key, true);
+ GetOperation gco = operationsFactory.newGetKeyOperation(keyBytes);
+ byte[] bytes = (byte[]) gco.execute();
return (V) bytes2obj(bytes);
}
@Override
public V remove(Object key) {
assertRemoteCacheManagerIsStarted();
- byte[] existingValue = operations.remove(obj2bytes(key, true), flags());
+ RemoveOperation removeOperation = operationsFactory.newRemoveOperation(obj2bytes(key, true));
+ byte[] existingValue = (byte[]) removeOperation.execute();
return (V) bytes2obj(existingValue);
}
@Override
public void clear() {
assertRemoteCacheManagerIsStarted();
- operations.clear(flags());
+ ClearOperation op = operationsFactory.newClearOperation() ;
+ op.execute();
}
@Override
@@ -321,20 +311,29 @@
}
}
+ @Override
+ public String getName() {
+ return name;
+ }
@Override
- public RemoteCache withFlags(Flag... flags) {
- this.flagsMap.set(flags);
+ public String getVersion() {
+ return Version.getProtocolVersion();
+ }
+
+ @Override
+ public RemoteCache<K, V> withFlags(Flag... flags) {
+ operationsFactory.setFlags(flags);
return this;
}
- private Flag[] flags() {
- Flag[] flags = this.flagsMap.get();
- this.flagsMap.remove();
- if (flags == null && forceReturnValue) {
- return FORCE_RETURN_VALUE;
+
+ private byte[] obj2bytes(Object o, boolean isKey) {
+ try {
+ return marshaller.objectToByteBuffer(o, isKey ? estimateKeySize : estimateValueSize);
+ } catch (IOException ioe) {
+ throw new TransportException("Unable to marshall object of type [" + o.getClass().getName() + "]", ioe);
}
- return flags;
}
private Object bytes2obj(byte[] bytes) {
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/AbstractKeyOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/AbstractKeyOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/AbstractKeyOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,88 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+import org.infinispan.util.Util;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Basic class for all hot rod operations that manipulate a key.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public abstract class AbstractKeyOperation extends RetryOnFailureOperation {
+
+ private static Log log = LogFactory.getLog(AbstractKeyOperation.class);
+
+ protected final byte[] key;
+
+ protected AbstractKeyOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName, AtomicInteger topologyId, Flag[] flags) {
+ super(transportFactory, cacheName, topologyId, flags);
+ this.key = key;
+ }
+
+ @Override
+ protected Transport getTransport(int retryCount) {
+ if (retryCount == 0) {
+ return transportFactory.getTransport(key);
+ } else {
+ return transportFactory.getTransport();
+ }
+ }
+
+ protected short sendKeyOperation(byte[] key, Transport transport, byte opCode, byte opRespCode) {
+ // 1) write [header][key length][key]
+ long messageId = writeHeader(transport, opCode);
+ transport.writeArray(key);
+ transport.flush();
+
+ // 2) now read the header
+ return readHeaderAndValidate(transport, messageId, opRespCode);
+ }
+
+ protected byte[] returnPossiblePrevValue(Transport transport) {
+ if (hasForceReturn(flags)) {
+ byte[] bytes = transport.readArray();
+ if (log.isTraceEnabled()) log.trace("Previous value bytes is: " + Util.printArray(bytes, false));
+ //0-length response means null
+ return bytes.length == 0 ? null : bytes;
+ } else {
+ return null;
+ }
+ }
+
+ private boolean hasForceReturn(Flag[] flags) {
+ if (flags == null) return false;
+ for (Flag flag : flags) {
+ if (flag == Flag.FORCE_RETURN_VALUE) return true;
+ }
+ return false;
+ }
+
+ protected VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response) {
+ //3) ...
+ short respStatus = readHeaderAndValidate(transport, messageId, response);
+
+ //4 ...
+ VersionedOperationResponse.RspCode code;
+ if (respStatus == NO_ERROR_STATUS) {
+ code = VersionedOperationResponse.RspCode.SUCCESS;
+ } else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS) {
+ code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
+ } else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
+ code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
+ } else {
+ throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
+ }
+ byte[] prevValue = returnPossiblePrevValue(transport);
+ return new VersionedOperationResponse(prevValue, code);
+ }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/AbstractKeyValueOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/AbstractKeyValueOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/AbstractKeyValueOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,50 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Base class for all operations that manipulate a key and a value.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public abstract class AbstractKeyValueOperation extends AbstractKeyOperation {
+
+ protected final byte[] value;
+
+ protected final int lifespan;
+
+ protected final int maxIdle;
+
+ protected AbstractKeyValueOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName,
+ AtomicInteger topologyId, Flag[] flags, byte[] value, int lifespan, int maxIdle) {
+ super(transportFactory, key, cacheName, topologyId, flags);
+ this.value = value;
+ this.lifespan = lifespan;
+ this.maxIdle = maxIdle;
+ }
+
+ //[header][key length][key][lifespan][max idle][value length][value]
+ protected short sendPutOperation(Transport transport, short opCode, byte opRespCode) {
+ // 1) write header
+ long messageId = writeHeader(transport, opCode);
+
+ // 2) write key and value
+ transport.writeArray(key);
+ transport.writeVInt(lifespan);
+ transport.writeVInt(maxIdle);
+ transport.writeArray(value);
+ transport.flush();
+
+ // 3) now read header
+
+ //return status (not error status for sure)
+ return readHeaderAndValidate(transport, messageId, opRespCode);
+ }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ClearOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ClearOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ClearOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,34 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Corresponds to clear operation as defined by <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class ClearOperation extends RetryOnFailureOperation {
+
+ public ClearOperation(TransportFactory transportFactory, byte[] cacheName, AtomicInteger topologyId, Flag[] flags) {
+ super(transportFactory, cacheName, topologyId, flags);
+ }
+
+ @Override
+ protected Transport getTransport(int retryCount) {
+ return transportFactory.getTransport();
+ }
+
+ @Override
+ protected Object executeOperation(Transport transport) {
+ long messageId = writeHeader(transport, CLEAR_REQUEST);
+ readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE);
+ return null;
+ }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ContainsKeyOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ContainsKeyOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ContainsKeyOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,34 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements "containsKey" operation as described in <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class ContainsKeyOperation extends AbstractKeyOperation {
+
+ public ContainsKeyOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName, AtomicInteger topologyId, Flag[] flags) {
+ super(transportFactory, key, cacheName, topologyId, flags);
+ }
+
+ @Override
+ protected Object executeOperation(Transport transport) {
+ boolean containsKey = false;
+ short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, CONTAINS_KEY_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ containsKey = false;
+ } else if (status == NO_ERROR_STATUS) {
+ containsKey = true;
+ }
+ return containsKey;
+ }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/GetOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/GetOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/GetOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,36 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements "get" operation as described by <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class GetOperation extends AbstractKeyOperation {
+
+ public GetOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName, AtomicInteger topologyId, Flag[] flags) {
+ super(transportFactory, key, cacheName, topologyId, flags);
+ }
+
+ @Override
+ public Object executeOperation(Transport transport) {
+ byte[] result = null;
+ short status = sendKeyOperation(key, transport, GET_REQUEST, GET_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ result = null;
+ } else {
+ if (status == NO_ERROR_STATUS) {
+ result = transport.readArray();
+ }
+ }
+ return result;
+ }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/GetWithVersionOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/GetWithVersionOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/GetWithVersionOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,46 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Corresponds to getWithVersion operation as described by
+ * <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class GetWithVersionOperation extends AbstractKeyOperation {
+
+ private static Log log = LogFactory.getLog(GetWithVersionOperation.class);
+
+ public GetWithVersionOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName,
+ AtomicInteger topologyId, Flag[] flags) {
+ super(transportFactory, key, cacheName, topologyId, flags);
+ }
+
+ @Override
+ protected Object executeOperation(Transport transport) {
+ short status = sendKeyOperation(key, transport, GET_WITH_VERSION, GET_WITH_VERSION_RESPONSE);
+ Object result = null;
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ result = null;
+ } else if (status == NO_ERROR_STATUS) {
+ long version = transport.readLong();
+ if (log.isTraceEnabled()) {
+ log.trace("Received version: " + version);
+ }
+ byte[] value = transport.readArray();
+ result = new BinaryVersionedValue(version, value);
+ }
+ return result;
+ }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/HotRodOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/HotRodOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/HotRodOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,181 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+import org.infinispan.client.hotrod.exceptions.HotRodTimeoutException;
+import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Generic hotrod operation. It is aware of {@link org.infinispan.client.hotrod.Flag}s and it is targeted against a
+ * cache name. This base class encapsulates the knowledge of writing and reading a header, as described in the
+ * <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod protocol specification</a>
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public abstract class HotRodOperation implements HotRodConstants {
+
+ static final AtomicLong MSG_ID = new AtomicLong();
+
+ private static Log log = LogFactory.getLog(HotRodOperation.class);
+
+ protected final Flag[] flags;
+
+ protected final byte[] cacheName;
+
+ protected final AtomicInteger topologyId;
+
+ protected HotRodOperation(Flag[] flags, byte[] cacheName, AtomicInteger topologyId) {
+ this.flags = flags;
+ this.cacheName = cacheName;
+ this.topologyId = topologyId;
+ }
+
+ public abstract Object execute();
+
+ protected final long writeHeader(Transport transport, short operationCode) {
+ transport.writeByte(HotRodConstants.REQUEST_MAGIC);
+ long messageId = MSG_ID.incrementAndGet();
+ transport.writeVLong(messageId);
+ transport.writeByte(HotRodConstants.HOTROD_VERSION);
+ transport.writeByte(operationCode);
+ transport.writeArray(cacheName);
+
+ int flagInt = 0;
+ if (flags != null) {
+ for (Flag flag : flags) {
+ flagInt = flag.getFlagInt() | flagInt;
+ }
+ }
+ transport.writeVInt(flagInt);
+ transport.writeByte(CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE);
+ transport.writeVInt(topologyId.get());
+ if (log.isTraceEnabled()) {
+ log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
+ }
+ return messageId;
+ }
+
+ /**
+ * Magic | Message Id | Op code | Status | Topology Change Marker
+ */
+ protected short readHeaderAndValidate(Transport transport, long messageId, short opRespCode) {
+ short magic = transport.readByte();
+ if (magic != HotRodConstants.RESPONSE_MAGIC) {
+ String message = "Invalid magic number. Expected " + Integer.toHexString(HotRodConstants.RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
+ log.error(message);
+ throw new InvalidResponseException(message);
+ }
+ long receivedMessageId = transport.readVLong();
+ if (receivedMessageId != messageId) {
+ String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
+ log.error(message);
+ throw new InvalidResponseException(message);
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Received response for message id: " + receivedMessageId);
+ }
+ short receivedOpCode = transport.readByte();
+ if (receivedOpCode != opRespCode) {
+ if (receivedOpCode == HotRodConstants.ERROR_RESPONSE) {
+ checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
+ throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
+ }
+ throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Received operation code is: " + receivedOpCode);
+ }
+ short status = transport.readByte();
+ checkForErrorsInResponseStatus(status, messageId, transport);
+ short topologyChangeByte = transport.readByte();
+ if (topologyChangeByte == 1) {
+ readNewTopologyAndHash(transport, topologyId);
+ }
+ return status;
+ }
+
+ protected void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
+ if (log.isTraceEnabled()) {
+ log.trace("Received operation status: " + status);
+ }
+ switch ((int) status) {
+ case HotRodConstants.INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
+ case HotRodConstants.REQUEST_PARSING_ERROR_STATUS:
+ case HotRodConstants.UNKNOWN_COMMAND_STATUS:
+ case HotRodConstants.SERVER_ERROR_STATUS:
+ case HotRodConstants.UNKNOWN_VERSION_STATUS: {
+ String msgFromServer = transport.readString();
+ if (log.isWarnEnabled()) {
+ log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
+ }
+ throw new HotRodClientException(msgFromServer, messageId, status);
+ }
+ case HotRodConstants.COMMAND_TIMEOUT_STATUS: {
+ if (log.isTraceEnabled()) {
+ log.trace("timeout message received from the server");
+ }
+ throw new HotRodTimeoutException();
+ }
+ case HotRodConstants.NO_ERROR_STATUS:
+ case HotRodConstants.KEY_DOES_NOT_EXIST_STATUS:
+ case HotRodConstants.NOT_PUT_REMOVED_REPLACED_STATUS: {
+ //don't do anything, these are correct responses
+ break;
+ }
+ default: {
+ throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
+ }
+ }
+ }
+
+ private void readNewTopologyAndHash(Transport transport, AtomicInteger topologyId) {
+ int newTopologyId = transport.readVInt();
+ topologyId.set(newTopologyId);
+ int numKeyOwners = transport.readUnsignedShort();
+ short hashFunctionVersion = transport.readByte();
+ int hashSpace = transport.readVInt();
+ int clusterSize = transport.readVInt();
+
+ if (log.isTraceEnabled()) {
+ log.trace("Topology change request: newTopologyId=" + newTopologyId + ", numKeyOwners=" + numKeyOwners +
+ ", hashFunctionVersion=" + hashFunctionVersion + ", hashSpaceSize=" + hashSpace + ", clusterSize=" + clusterSize);
+ }
+
+ LinkedHashMap<InetSocketAddress, Integer> servers2HashCode = new LinkedHashMap<InetSocketAddress, Integer>();
+
+ for (int i = 0; i < clusterSize; i++) {
+ String host = transport.readString();
+ int port = transport.readUnsignedShort();
+ if (log.isTraceEnabled()) {
+ log.trace("Server read:" + host + ":" + port);
+ }
+ int hashCode = transport.read4ByteInt();
+ servers2HashCode.put(new InetSocketAddress(host, port), hashCode);
+ if (log.isTraceEnabled()) {
+ log.trace("Hash code is: " + hashCode);
+ }
+ }
+ if (log.isInfoEnabled()) {
+ log.info("New topology: " + servers2HashCode);
+ }
+ transport.getTransportFactory().updateServers(servers2HashCode.keySet());
+ if (hashFunctionVersion == 0) {
+ if (log.isTraceEnabled())
+ log.trace("Not using a consistent hash function (hash function version == 0).");
+ } else {
+ transport.getTransportFactory().updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
+ }
+ }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,95 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+import org.infinispan.manager.CacheContainer;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Factory for {@link org.infinispan.client.hotrod.impl.operations.HotRodOperation} objects.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class OperationsFactory implements HotRodConstants {
+
+   private static final Flag[] FORCE_RETURN_VALUE = {Flag.FORCE_RETURN_VALUE};
+
+   /** Flags registered through {@link #setFlags(Flag[])} by the current thread, pending for its next operation. */
+   private final ThreadLocal<Flag[]> flagsMap = new ThreadLocal<Flag[]>();
+
+   private final TransportFactory transportFactory;
+
+   private final byte[] cacheNameBytes;
+
+   private final AtomicInteger topologyId;
+
+   private final boolean forceReturnValue;
+
+   /**
+    * @param transportFactory passed to every operation created by this factory
+    * @param cacheName        name of the target cache; the default cache name maps to DEFAULT_CACHE_NAME_BYTES,
+    *                         any other name is encoded with HOTROD_STRING_CHARSET
+    * @param topologyId       topology id holder passed to every operation created by this factory
+    * @param forceReturnValue whether operations created without explicit flags get {@link Flag#FORCE_RETURN_VALUE}
+    */
+   public OperationsFactory(TransportFactory transportFactory, String cacheName, AtomicInteger topologyId, boolean forceReturnValue) {
+      this.transportFactory = transportFactory;
+      if (cacheName.equals(CacheContainer.DEFAULT_CACHE_NAME)) {
+         this.cacheNameBytes = DEFAULT_CACHE_NAME_BYTES;
+      } else {
+         this.cacheNameBytes = cacheName.getBytes(HOTROD_STRING_CHARSET);
+      }
+      this.topologyId = topologyId;
+      this.forceReturnValue = forceReturnValue;
+   }
+
+   // Each factory method below consumes the calling thread's pending flags via flags().
+
+   public GetOperation newGetKeyOperation(byte[] key) {
+      return new GetOperation(transportFactory, key, cacheNameBytes, topologyId, flags());
+   }
+
+   public RemoveOperation newRemoveOperation(byte[] key) {
+      return new RemoveOperation(transportFactory, key, cacheNameBytes, topologyId, flags());
+   }
+
+   public RemoveIfUnmodifiedOperation newRemoveIfUnmodifiedOperation(byte[] key, long version) {
+      return new RemoveIfUnmodifiedOperation(transportFactory, key, cacheNameBytes, topologyId, flags(), version);
+   }
+
+   public ReplaceIfUnmodifiedOperation newReplaceIfUnmodifiedOperation(byte[] key, byte[] value, int lifespanSeconds, int maxIdleTimeSeconds, long version) {
+      return new ReplaceIfUnmodifiedOperation(transportFactory, key, cacheNameBytes, topologyId, flags(), value, lifespanSeconds, maxIdleTimeSeconds, version);
+   }
+
+   public GetWithVersionOperation newGetWithVersionOperation(byte[] key) {
+      return new GetWithVersionOperation(transportFactory, key, cacheNameBytes, topologyId, flags());
+   }
+
+   public StatsOperation newStatsOperation() {
+      return new StatsOperation(transportFactory, cacheNameBytes, topologyId, flags());
+   }
+
+   public PutOperation newPutKeyValueOperation(byte[] key, byte[] value, int lifespanSecs, int maxIdleSecs) {
+      return new PutOperation(transportFactory, key, cacheNameBytes, topologyId, flags(), value, lifespanSecs, maxIdleSecs);
+   }
+
+   public PutIfAbsentOperation newPutIfAbsentOperation(byte[] key, byte[] value, int lifespanSecs, int maxIdleSecs) {
+      return new PutIfAbsentOperation(transportFactory, key, cacheNameBytes, topologyId, flags(), value, lifespanSecs, maxIdleSecs);
+   }
+
+   public ReplaceOperation newReplaceOperation(byte[] key, byte[] values, int lifespanSecs, int maxIdleSecs) {
+      return new ReplaceOperation(transportFactory, key, cacheNameBytes, topologyId, flags(), values, lifespanSecs, maxIdleSecs);
+   }
+
+   public ContainsKeyOperation newContainsKeyOperation(byte[] key) {
+      return new ContainsKeyOperation(transportFactory, key, cacheNameBytes, topologyId, flags());
+   }
+
+   public ClearOperation newClearOperation() {
+      return new ClearOperation(transportFactory, cacheNameBytes, topologyId, flags());
+   }
+
+   /**
+    * Takes (and clears) the flags pending for the current thread. When none are pending and this factory was
+    * created with {@code forceReturnValue}, the shared FORCE_RETURN_VALUE array is returned instead.
+    */
+   private Flag[] flags() {
+      Flag[] pending = this.flagsMap.get();
+      this.flagsMap.remove();
+      boolean useDefault = pending == null && forceReturnValue;
+      return useDefault ? FORCE_RETURN_VALUE : pending;
+   }
+
+   /**
+    * Registers the flags to be used by the next operation the current thread creates through this factory.
+    */
+   public void setFlags(Flag[] flags) {
+      this.flagsMap.set(flags);
+   }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PingOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PingOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PingOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,52 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Corresponds to the "ping" operation as defined in <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class PingOperation extends HotRodOperation {
+
+   private static Log log = LogFactory.getLog(PingOperation.class);
+
+   /** The connection being validated; unlike the retrying operations, a ping is bound to a single transport. */
+   private final Transport transport;
+
+   public PingOperation(Flag[] flags, AtomicInteger topologyId, Transport transport) {
+      super(flags, DEFAULT_CACHE_NAME_BYTES, topologyId);
+      this.transport = transport;
+   }
+
+   /**
+    * Sends a ping over the transport and reports whether it was answered successfully.
+    *
+    * @return {@code Boolean.TRUE} when the response status is NO_ERROR_STATUS; {@code Boolean.FALSE} for any
+    *         other status or when any exception is raised while pinging (the exception is only traced)
+    */
+   @Override
+   public Object execute() {
+      try {
+         long messageId = writeHeader(transport, HotRodConstants.PING_REQUEST);
+         short respStatus = readHeaderAndValidate(transport, messageId, HotRodConstants.PING_RESPONSE);
+         boolean pingAnswered = respStatus == HotRodConstants.NO_ERROR_STATUS;
+         if (log.isTraceEnabled()) {
+            if (pingAnswered) {
+               log.trace("Successfully validated transport: " + transport);
+            } else {
+               log.trace("Unknown response status: " + respStatus);
+            }
+         }
+         return pingAnswered;
+      } catch (Exception e) {
+         // best effort: a failed ping only marks the transport as invalid, it never propagates
+         if (log.isTraceEnabled()) {
+            log.trace("Failed to validate transport: " + transport, e);
+         }
+         return false;
+      }
+   }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PutIfAbsentOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PutIfAbsentOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PutIfAbsentOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,42 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+import org.infinispan.util.Util;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements "putIfAbsent" operation as described in <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod
+ * protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class PutIfAbsentOperation extends AbstractKeyValueOperation {
+
+   private static Log log = LogFactory.getLog(PutIfAbsentOperation.class);
+
+   /**
+    * Creates the operation; all arguments are handed unchanged to the superclass constructor.
+    */
+   public PutIfAbsentOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName, AtomicInteger topologyId,
+                               Flag[] flags, byte[] value, int lifespan, int maxIdle) {
+      super(transportFactory, key, cacheName, topologyId, flags, value, lifespan, maxIdle);
+   }
+
+   /**
+    * Sends the putIfAbsent request and returns whatever {@code returnPossiblePrevValue} yields.
+    *
+    * @return the result of {@code returnPossiblePrevValue} for the statuses NO_ERROR_STATUS and
+    *         NOT_PUT_REMOVED_REPLACED_STATUS; {@code null} for any other status
+    */
+   @Override
+   protected Object executeOperation(Transport transport) {
+      short status = sendPutOperation(transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE);
+      boolean handledStatus = status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS;
+      if (!handledStatus) {
+         return null;
+      }
+      byte[] previousValue = returnPossiblePrevValue(transport);
+      if (log.isTraceEnabled()) {
+         log.trace("Returning from putIfAbsent: " + Util.printArray(previousValue, false));
+      }
+      return previousValue;
+   }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PutOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PutOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/PutOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,33 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+import org.jgroups.annotations.Immutable;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements "put" as defined by <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class PutOperation extends AbstractKeyValueOperation {
+
+   /**
+    * Creates the operation; all arguments are handed unchanged to the superclass constructor.
+    */
+   public PutOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName, AtomicInteger topologyId,
+                       Flag[] flags, byte[] value, int lifespan, int maxIdle) {
+      super(transportFactory, key, cacheName, topologyId, flags, value, lifespan, maxIdle);
+   }
+
+   /**
+    * Sends the put request and returns whatever {@code returnPossiblePrevValue} yields.
+    *
+    * @throws InvalidResponseException if the response status is anything but NO_ERROR_STATUS
+    */
+   @Override
+   protected Object executeOperation(Transport transport) {
+      short status = sendPutOperation(transport, PUT_REQUEST, PUT_RESPONSE);
+      if (status == NO_ERROR_STATUS) {
+         return returnPossiblePrevValue(transport);
+      }
+      // a put is unconditional, so no other status is accepted here
+      throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
+   }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RemoveIfUnmodifiedOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RemoveIfUnmodifiedOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RemoveIfUnmodifiedOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,40 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements "removeIfUnmodified" operation as defined by
+ * <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class RemoveIfUnmodifiedOperation extends AbstractKeyOperation {
+
+   /** Entry version sent with the request. */
+   private final long version;
+
+   public RemoveIfUnmodifiedOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName, AtomicInteger topologyId, Flag[] flags, long version) {
+      super(transportFactory, key, cacheName, topologyId, flags);
+      this.version = version;
+   }
+
+   /**
+    * Writes the request (header, then key, then version) and returns the outcome produced by
+    * {@code returnVersionedOperationResponse}.
+    */
+   @Override
+   protected Object executeOperation(Transport transport) {
+      // request header
+      long requestId = writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST);
+
+      // request body: key followed by the version
+      transport.writeArray(key);
+      transport.writeLong(version);
+
+      // response handling is shared with the other versioned operations
+      return returnVersionedOperationResponse(transport, requestId, REMOVE_IF_UNMODIFIED_RESPONSE);
+   }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RemoveOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RemoveOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RemoveOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,35 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implement "remove" operation as described in <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod
+ * protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class RemoveOperation extends AbstractKeyOperation {
+
+   /**
+    * Creates the operation; all arguments are handed unchanged to the superclass constructor.
+    */
+   public RemoveOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName, AtomicInteger topologyId, Flag[] flags) {
+      super(transportFactory, key, cacheName, topologyId, flags);
+   }
+
+   /**
+    * Sends the remove request for {@code key}.
+    *
+    * @return the result of {@code returnPossiblePrevValue} when the status is NO_ERROR_STATUS; {@code null} for
+    *         KEY_DOES_NOT_EXIST_STATUS and any other status
+    */
+   @Override
+   public Object executeOperation(Transport transport) {
+      short status = sendKeyOperation(key, transport, REMOVE_REQUEST, REMOVE_RESPONSE);
+      if (status != NO_ERROR_STATUS) {
+         return null;
+      }
+      return returnPossiblePrevValue(transport);
+   }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ReplaceIfUnmodifiedOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,41 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implement "replaceIfUnmodified" as defined by <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod
+ * protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class ReplaceIfUnmodifiedOperation extends AbstractKeyValueOperation {
+
+   /** Entry version sent with the request. */
+   private final long version;
+
+   public ReplaceIfUnmodifiedOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName,
+                                       AtomicInteger topologyId, Flag[] flags, byte[] value, int lifespan,
+                                       int maxIdle, long version) {
+      super(transportFactory, key, cacheName, topologyId, flags, value, lifespan, maxIdle);
+      this.version = version;
+   }
+
+   /**
+    * Writes the request (header, then key, lifespan, max idle, version and value, in that order) and returns the
+    * outcome produced by {@code returnVersionedOperationResponse}.
+    */
+   @Override
+   protected Object executeOperation(Transport transport) {
+      // request header
+      long requestId = writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST);
+
+      // request body; the field order is part of the wire format
+      transport.writeArray(key);
+      transport.writeVInt(lifespan);
+      transport.writeVInt(maxIdle);
+      transport.writeLong(version);
+      transport.writeArray(value);
+
+      // response handling is shared with the other versioned operations
+      return returnVersionedOperationResponse(transport, requestId, REPLACE_IF_UNMODIFIED_RESPONSE);
+   }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ReplaceOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ReplaceOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/ReplaceOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,34 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements "Replace" operation as defined by <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod
+ * protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class ReplaceOperation extends AbstractKeyValueOperation {
+
+   /**
+    * Creates the operation; all arguments are handed unchanged to the superclass constructor.
+    */
+   public ReplaceOperation(TransportFactory transportFactory, byte[] key, byte[] cacheName, AtomicInteger topologyId,
+                           Flag[] flags, byte[] value, int lifespan, int maxIdle) {
+      super(transportFactory, key, cacheName, topologyId, flags, value, lifespan, maxIdle);
+   }
+
+   /**
+    * Sends the replace request and returns whatever {@code returnPossiblePrevValue} yields.
+    *
+    * @return the result of {@code returnPossiblePrevValue} for the statuses NO_ERROR_STATUS and
+    *         NOT_PUT_REMOVED_REPLACED_STATUS; {@code null} for any other status
+    */
+   @Override
+   protected Object executeOperation(Transport transport) {
+      short status = sendPutOperation(transport, REPLACE_REQUEST, REPLACE_RESPONSE);
+      boolean handledStatus = status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS;
+      return handledStatus ? returnPossiblePrevValue(transport) : null;
+   }
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RetryOnFailureOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RetryOnFailureOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/RetryOnFailureOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,74 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.exceptions.TransportException;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Base class for all the operations that need retry logic: if the operation fails due to connection problems, try with
+ * another available connection.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public abstract class RetryOnFailureOperation extends HotRodOperation {
+
+   private static final Log log = LogFactory.getLog(RetryOnFailureOperation.class);
+
+   protected final TransportFactory transportFactory;
+
+   protected RetryOnFailureOperation(TransportFactory transportFactory, byte[] cacheName, AtomicInteger topologyId, Flag[] flags) {
+      super(flags, cacheName, topologyId);
+      this.transportFactory = transportFactory;
+   }
+
+   /**
+    * Runs {@link #executeOperation(Transport)}, retrying on a fresh transport whenever a
+    * {@link TransportException} is raised, until {@link #shouldRetry(int)} says to stop. At least one attempt is
+    * always made.
+    * <p>
+    * {@link #getTransport(int)} receives the number of the attempt being started (0 for the first one) and is
+    * itself covered by the retry logic. Every transport that was obtained is released, whether the attempt
+    * succeeded or not.
+    *
+    * @return the result of the first successful attempt
+    * @throws TransportException the failure of the last attempt, once retries are exhausted
+    */
+   @Override
+   public Object execute() {
+      int retryCount = 0;
+      TransportException lastFailure;
+      do {
+         Transport transport = null;
+         try {
+            // fetched inside the try block so that a failure to obtain a transport is retried as well
+            transport = getTransport(retryCount);
+            return executeOperation(transport);
+         } catch (TransportException te) {
+            lastFailure = te;
+            logErrorAndThrowExceptionIfNeeded(retryCount, te);
+         } finally {
+            // releaseTransport tolerates null, which is the case when getTransport itself failed
+            releaseTransport(transport);
+         }
+         retryCount++;
+      } while (shouldRetry(retryCount));
+      // Only reachable through the catch block above: report the real failure rather than masking it.
+      throw lastFailure;
+   }
+
+   /**
+    * @param retryCount number of attempts made so far
+    * @return {@code true} while fewer attempts than the factory's transport count have been made
+    */
+   protected boolean shouldRetry(int retryCount) {
+      return retryCount < transportFactory.getTransportCount();
+   }
+
+   /**
+    * Logs the failure of attempt {@code i} and rethrows it when no further attempt will be made, i.e. when
+    * {@code i} is the last attempt index or beyond it. This also covers a transport count of zero or less, and a
+    * count that shrank while retrying.
+    *
+    * @param i  zero-based index of the attempt that failed
+    * @param te the failure of that attempt
+    * @throws TransportException {@code te} itself, when retries are exhausted
+    */
+   protected void logErrorAndThrowExceptionIfNeeded(int i, TransportException te) {
+      // read once so that the message and the decision are based on the same value
+      int transportCount = transportFactory.getTransportCount();
+      String message = "Transport exception. Retry " + i + " out of " + transportCount;
+      if (i >= transportCount - 1) {
+         log.warn(message, te);
+         throw te;
+      }
+      if (log.isTraceEnabled()) {
+         log.trace(message + ":" + te);
+      }
+   }
+
+   /**
+    * Hands the transport back to the factory; a {@code null} transport is ignored.
+    */
+   protected void releaseTransport(Transport transport) {
+      if (transport != null)
+         transportFactory.releaseTransport(transport);
+   }
+
+   /**
+    * Supplies the transport for an attempt.
+    *
+    * @param retryCount zero-based number of the attempt the transport is needed for
+    */
+   protected abstract Transport getTransport(int retryCount);
+
+   /**
+    * Performs the actual request/response exchange over the given transport.
+    */
+   protected abstract Object executeOperation(Transport transport);
+}
Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/StatsOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/StatsOperation.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/StatsOperation.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -0,0 +1,46 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import net.jcip.annotations.Immutable;
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements the stats operation as defined by <a href="http://community.jboss.org/wiki/HotRodProtocol">Hot Rod protocol specification</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Immutable
+public class StatsOperation extends RetryOnFailureOperation {
+
+   public StatsOperation(TransportFactory transportFactory, byte[] cacheName, AtomicInteger topologyId, Flag[] flags) {
+      super(transportFactory, cacheName, topologyId, flags);
+   }
+
+   /**
+    * Asks the transport factory for a transport; the retry count is not taken into account.
+    */
+   @Override
+   protected Transport getTransport(int retryCount) {
+      return transportFactory.getTransport();
+   }
+
+   /**
+    * Sends the stats request (header only) and reads the statistics from the response.
+    *
+    * @return a {@code Map<String, String>} from statistic name to statistic value
+    */
+   @Override
+   protected Object executeOperation(Transport transport) {
+      // the request consists of the header alone
+      long requestId = writeHeader(transport, STATS_REQUEST);
+      readHeaderAndValidate(transport, requestId, STATS_RESPONSE);
+
+      // the response body is a count followed by that many name/value string pairs
+      int remaining = transport.readVInt();
+      Map<String, String> stats = new HashMap<String, String>();
+      while (remaining > 0) {
+         String statName = transport.readString();
+         String statValue = transport.readString();
+         stats.put(statName, statValue);
+         remaining--;
+      }
+      return stats;
+   }
+}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -10,55 +10,57 @@
*/
public interface HotRodConstants {
- public static final short REQUEST_MAGIC = 0xA0;
- public static final short RESPONSE_MAGIC = 0xA1;
+ static final short REQUEST_MAGIC = 0xA0;
+ static final short RESPONSE_MAGIC = 0xA1;
- public static final byte HOTROD_VERSION = 10;
+ static final byte HOTROD_VERSION = 10;
//requests
- public static final byte PUT_REQUEST = 0x01;
- public static final byte GET_REQUEST = 0x03;
- public static final byte PUT_IF_ABSENT_REQUEST = 0x05;
- public static final byte REPLACE_REQUEST = 0x07;
- public static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
- public static final byte REMOVE_REQUEST = 0x0B;
- public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
- public static final byte CONTAINS_KEY_REQUEST = 0x0F;
- public static final byte GET_WITH_VERSION = 0x11;
- public static final byte CLEAR_REQUEST = 0x13;
- public static final byte STATS_REQUEST = 0x15;
- public static final byte PING_REQUEST = 0x17;
+ static final byte PUT_REQUEST = 0x01;
+ static final byte GET_REQUEST = 0x03;
+ static final byte PUT_IF_ABSENT_REQUEST = 0x05;
+ static final byte REPLACE_REQUEST = 0x07;
+ static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
+ static final byte REMOVE_REQUEST = 0x0B;
+ static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
+ static final byte CONTAINS_KEY_REQUEST = 0x0F;
+ static final byte GET_WITH_VERSION = 0x11;
+ static final byte CLEAR_REQUEST = 0x13;
+ static final byte STATS_REQUEST = 0x15;
+ static final byte PING_REQUEST = 0x17;
//responses
- public static final byte PUT_RESPONSE = 0x02;
- public static final byte GET_RESPONSE = 0x04;
- public static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
- public static final byte REPLACE_RESPONSE = 0x08;
- public static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
- public static final byte REMOVE_RESPONSE = 0x0C;
- public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
- public static final byte CONTAINS_KEY_RESPONSE = 0x10;
- public static final byte GET_WITH_VERSION_RESPONSE = 0x12;
- public static final byte CLEAR_RESPONSE = 0x14;
- public static final byte STATS_RESPONSE = 0x16;
- public static final byte PING_RESPONSE = 0x18;
- public static final byte ERROR_RESPONSE = 0x50;
+ static final byte PUT_RESPONSE = 0x02;
+ static final byte GET_RESPONSE = 0x04;
+ static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
+ static final byte REPLACE_RESPONSE = 0x08;
+ static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
+ static final byte REMOVE_RESPONSE = 0x0C;
+ static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
+ static final byte CONTAINS_KEY_RESPONSE = 0x10;
+ static final byte GET_WITH_VERSION_RESPONSE = 0x12;
+ static final byte CLEAR_RESPONSE = 0x14;
+ static final byte STATS_RESPONSE = 0x16;
+ static final byte PING_RESPONSE = 0x18;
+ static final byte ERROR_RESPONSE = 0x50;
//response status
- public static final byte NO_ERROR_STATUS = 0x00;
- public static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
- public static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
- public static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
- public static final int UNKNOWN_COMMAND_STATUS = 0x82;
- public static final int SERVER_ERROR_STATUS = 0x85;
- public static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
- public static final int UNKNOWN_VERSION_STATUS = 0x83;
- public static final int COMMAND_TIMEOUT_STATUS = 0x86;
+ static final byte NO_ERROR_STATUS = 0x00;
+ static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
+ static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
+ static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
+ static final int UNKNOWN_COMMAND_STATUS = 0x82;
+ static final int SERVER_ERROR_STATUS = 0x85;
+ static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
+ static final int UNKNOWN_VERSION_STATUS = 0x83;
+ static final int COMMAND_TIMEOUT_STATUS = 0x86;
- public static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
- public static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
- public static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
- Charset STRING_CHARSET = Charset.forName("UTF-8");
+ static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
+ static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
+ static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
+ Charset HOTROD_STRING_CHARSET = Charset.forName("UTF-8");
+
+ static final byte[] DEFAULT_CACHE_NAME_BYTES = new byte[]{};
}
Deleted: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -1,183 +0,0 @@
-package org.infinispan.client.hotrod.impl.protocol;
-
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.exceptions.HotRodClientException;
-import org.infinispan.client.hotrod.exceptions.HotRodTimeoutException;
-import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
-import org.infinispan.client.hotrod.impl.transport.Transport;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Helper class for factorizing common parts of read/write operations.
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class HotRodOperationsHelper {
- static Log log = LogFactory.getLog(HotRodOperationsHelper.class);
- static final AtomicLong MSG_ID = new AtomicLong();
- final static byte CLIENT_INTELLIGENCE = HotRodConstants.CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
-
- public static final byte[] DEFAULT_CACHE_NAME_BYTES = new byte[]{};
-
- public static long writeHeader(Transport transport, short operationCode, byte[] cacheName, AtomicInteger topologyId, Flag... flags) {
- transport.writeByte(HotRodConstants.REQUEST_MAGIC);
- long messageId = MSG_ID.incrementAndGet();
- transport.writeVLong(messageId);
- transport.writeByte(HotRodConstants.HOTROD_VERSION);
- transport.writeByte(operationCode);
- transport.writeArray(cacheName);
-
- int flagInt = 0;
- if (flags != null) {
- for (Flag flag : flags) {
- flagInt = flag.getFlagInt() | flagInt;
- }
- }
- transport.writeVInt(flagInt);
- transport.writeByte(CLIENT_INTELLIGENCE);
- transport.writeVInt(topologyId.get());
- if (log.isTraceEnabled()) {
- log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
- }
- return messageId;
- }
-
- /**
- * Magic | Message Id | Op code | Status | Topology Change Marker
- */
- public static short readHeaderAndValidate(Transport transport, long messageId, short opRespCode, AtomicInteger topologyId) {
- short magic = transport.readByte();
- if (magic != HotRodConstants.RESPONSE_MAGIC) {
- String message = "Invalid magic number. Expected " + Integer.toHexString(HotRodConstants.RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
- log.error(message);
- throw new InvalidResponseException(message);
- }
- long receivedMessageId = transport.readVLong();
- if (receivedMessageId != messageId) {
- String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
- log.error(message);
- throw new InvalidResponseException(message);
- }
- if (log.isTraceEnabled()) {
- log.trace("Received response for message id: " + receivedMessageId);
- }
- short receivedOpCode = transport.readByte();
- if (receivedOpCode != opRespCode) {
- if (receivedOpCode == HotRodConstants.ERROR_RESPONSE) {
- checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
- throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
- }
- throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
- }
- if (log.isTraceEnabled()) {
- log.trace("Received operation code is: " + receivedOpCode);
- }
- short status = transport.readByte();
- checkForErrorsInResponseStatus(status, messageId, transport);
- short topologyChangeByte = transport.readByte();
- if (topologyChangeByte == 1) {
- readNewTopologyAndHash(transport, topologyId);
- }
- return status;
- }
-
- static void readNewTopologyAndHash(Transport transport, AtomicInteger topologyId) {
- int newTopologyId = transport.readVInt();
- topologyId.set(newTopologyId);
- int numKeyOwners = transport.readUnsignedShort();
- short hashFunctionVersion = transport.readByte();
- int hashSpace = transport.readVInt();
- int clusterSize = transport.readVInt();
-
- if (log.isTraceEnabled()) {
- log.trace("Topology change request: newTopologyId=" + newTopologyId + ", numKeyOwners=" + numKeyOwners +
- ", hashFunctionVersion=" + hashFunctionVersion + ", hashSpaceSize=" + hashSpace + ", clusterSize=" + clusterSize);
- }
-
- LinkedHashMap<InetSocketAddress, Integer> servers2HashCode = new LinkedHashMap<InetSocketAddress, Integer>();
-
- for (int i = 0; i < clusterSize; i++) {
- String host = transport.readString();
- int port = transport.readUnsignedShort();
- if (log.isTraceEnabled()) {
- log.trace("Server read:" + host + ":" + port);
- }
- int hashCode = transport.read4ByteInt();
- servers2HashCode.put(new InetSocketAddress(host, port), hashCode);
- if (log.isTraceEnabled()) {
- log.trace("Hash code is: " + hashCode);
- }
- }
- if (log.isInfoEnabled()) {
- log.info("New topology: " + servers2HashCode);
- }
- transport.getTransportFactory().updateServers(servers2HashCode.keySet());
- if (hashFunctionVersion == 0) {
- if (log.isTraceEnabled())
- log.trace("Not using a consistent hash function (hash function version == 0).");
- } else {
- transport.getTransportFactory().updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
- }
- }
-
- static void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
- if (log.isTraceEnabled()) {
- log.trace("Received operation status: " + status);
- }
- switch ((int) status) {
- case HotRodConstants.INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
- case HotRodConstants.REQUEST_PARSING_ERROR_STATUS:
- case HotRodConstants.UNKNOWN_COMMAND_STATUS:
- case HotRodConstants.SERVER_ERROR_STATUS:
- case HotRodConstants.UNKNOWN_VERSION_STATUS: {
- String msgFromServer = transport.readString();
- if (log.isWarnEnabled()) {
- log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
- }
- throw new HotRodClientException(msgFromServer, messageId, status);
- }
- case HotRodConstants.COMMAND_TIMEOUT_STATUS: {
- if (log.isTraceEnabled()) {
- log.trace("timeout message received from the server");
- }
- throw new HotRodTimeoutException();
- }
- case HotRodConstants.NO_ERROR_STATUS:
- case HotRodConstants.KEY_DOES_NOT_EXIST_STATUS:
- case HotRodConstants.NOT_PUT_REMOVED_REPLACED_STATUS: {
- //don't do anything, these are correct responses
- break;
- }
- default: {
- throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
- }
- }
- }
-
- public static boolean ping(Transport transport, AtomicInteger topologyId) {
- try {
- long messageId = HotRodOperationsHelper.writeHeader(transport, HotRodConstants.PING_REQUEST, DEFAULT_CACHE_NAME_BYTES, topologyId);
- short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, HotRodConstants.PING_RESPONSE, topologyId);
- if (respStatus == HotRodConstants.NO_ERROR_STATUS) {
- if (log.isTraceEnabled())
- log.trace("Successfully validated transport: " + transport);
- return true;
- } else {
- if (log.isTraceEnabled())
- log.trace("Unknown response status: " + respStatus);
- return false;
- }
- } catch (Exception e) {
- if (log.isTraceEnabled())
- log.trace("Failed to validate transport: " + transport, e);
- return false;
- }
- }
-}
Deleted: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -1,421 +0,0 @@
-package org.infinispan.client.hotrod.impl.protocol;
-
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
-import org.infinispan.client.hotrod.exceptions.TransportException;
-import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
-import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
-import org.infinispan.client.hotrod.impl.transport.Transport;
-import org.infinispan.client.hotrod.impl.transport.TransportFactory;
-import org.infinispan.util.Util;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class HotRodOperationsImpl implements HotRodOperations, HotRodConstants {
-
- private static Log log = LogFactory.getLog(HotRodOperationsImpl.class);
-
- private final byte[] cacheName;
- private TransportFactory transportFactory;
- private final AtomicInteger topologyId;
-
- public HotRodOperationsImpl(String cacheName, TransportFactory transportFactory, AtomicInteger topologyId) {
- this.cacheName = cacheName.getBytes(STRING_CHARSET);
- this.transportFactory = transportFactory;
- this.topologyId = topologyId;
- }
-
- public byte[] get(byte[] key, Flag[] flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- try {
- short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- }
- if (status == NO_ERROR_STATUS) {
- return transport.readArray();
- }
- } finally {
- releaseTransport(transport);
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public byte[] remove(byte[] key, Flag[] flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- } else if (status == NO_ERROR_STATUS) {
- return returnPossiblePrevValue(transport, flags);
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public boolean containsKey(byte[] key, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return false;
- } else if (status == NO_ERROR_STATUS) {
- return true;
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
-
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- }
- if (status == NO_ERROR_STATUS) {
- long version = transport.readLong();
- if (log.isTraceEnabled()) {
- log.trace("Received version: " + version);
- }
- byte[] value = transport.readArray();
- return new BinaryVersionedValue(version, value);
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
-
- public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
- if (status != NO_ERROR_STATUS) {
- throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
- }
- return returnPossiblePrevValue(transport, flags);
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
- if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
- byte[] bytes = returnPossiblePrevValue(transport, flags);
- if (log.isTraceEnabled()) {
- log.trace("Returning from putIfAbsent: " + Util.printArray(bytes, false));
- }
- return bytes;
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("We should not reach here!");
- }
-
- public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
- if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
- return returnPossiblePrevValue(transport, flags);
- }
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException(" should not reach here!");
- }
-
- /**
- * request : [header][key length][key][lifespan][max idle][entry_version][value length][value] response: If
- * ForceReturnPreviousValue has been passed, this responses will contain previous [value length][value] for that key.
- * If the key does not exist or previous was null, value length would be 0. Otherwise, if no ForceReturnPreviousValue
- * was sent, the response would be empty.
- */
- public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
-
- //2) write message body
- transport.writeArray(key);
- transport.writeVInt(lifespan);
- transport.writeVInt(maxIdle);
- transport.writeLong(version);
- transport.writeArray(value);
- return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException(" should not reach here!");
- }
-
- /**
- * Request: [header][key length][key][entry_version]
- */
- public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
- Transport transport = getTransport(key, true);
- int retryCount = 0;
- do {
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
-
- //2) write message body
- transport.writeArray(key);
- transport.writeLong(version);
-
- //process response and return
- return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
-
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- }
- finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = getTransport(key, false);
- }
- retryCount++;
- } while (shouldRetry(retryCount));
- throw new IllegalStateException("Should not reach this point!");
- }
-
- public void clear(Flag... flags) {
- Transport transport = transportFactory.getTransport();
- int retryCount = 0;
- do {
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, CLEAR_REQUEST, cacheName, topologyId, flags);
- HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE, topologyId);
- } catch (TransportException te) {
- logErrorAndThrowExceptionIfNeeded(retryCount, te);
- } finally {
- releaseTransport(transport);
- }
- if (shouldRetry(retryCount)) {
- transport = transportFactory.getTransport();
- }
- retryCount++;
-
- } while (shouldRetry(retryCount));
- }
-
- public Map<String, String> stats() {
- Transport transport = transportFactory.getTransport();
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, STATS_REQUEST, cacheName, topologyId);
- HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, STATS_RESPONSE, topologyId);
- int nrOfStats = transport.readVInt();
-
- Map<String, String> result = new HashMap<String, String>();
- for (int i = 0; i < nrOfStats; i++) {
- String statName = transport.readString();
- String statValue = transport.readString();
- result.put(statName, statValue);
- }
- return result;
- } finally {
- releaseTransport(transport);
- }
- }
-
- //[header][key length][key][lifespan][max idle][value length][value]
-
- private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
-
- // 2) write key and value
- transport.writeArray(key);
- transport.writeVInt(lifespan);
- transport.writeVInt(maxIdle);
- transport.writeArray(value);
- transport.flush();
-
- // 3) now read header
-
- //return status (not error status for sure)
- return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
- }
-
- /*
- * Magic | MessageId | Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
- */
-
- private boolean hasForceReturn(Flag[] flags) {
- if (flags == null) return false;
- for (Flag flag : flags) {
- if (flag == Flag.FORCE_RETURN_VALUE) return true;
- }
- return false;
- }
-
- private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
- // 1) write [header][key length][key]
- long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
- transport.writeArray(key);
- transport.flush();
-
- // 2) now read the header
- return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
- }
-
- private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
- if (hasForceReturn(flags)) {
- byte[] bytes = transport.readArray();
- if (log.isTraceEnabled()) log.trace("Previous value bytes is: " + Util.printArray(bytes, false));
- //0-length response means null
- return bytes.length == 0 ? null : bytes;
- } else {
- return null;
- }
- }
-
- private void releaseTransport(Transport transport) {
- if (transport != null)
- transportFactory.releaseTransport(transport);
- }
-
- private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
- //3) ...
- short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, response, topologyId);
-
- //4 ...
- VersionedOperationResponse.RspCode code;
- if (respStatus == NO_ERROR_STATUS) {
- code = VersionedOperationResponse.RspCode.SUCCESS;
- } else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS) {
- code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
- } else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
- code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
- } else {
- throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
- }
- byte[] prevValue = returnPossiblePrevValue(transport, flags);
- return new VersionedOperationResponse(prevValue, code);
- }
-
- private void logErrorAndThrowExceptionIfNeeded(int i, TransportException te) {
- String message = "Transport exception. Retry " + i + " out of " + transportFactory.getTransportCount();
- if (i == transportFactory.getTransportCount() - 1 || transportFactory.getTransportCount() < 0) {
- log.warn(message, te);
- throw te;
- } else {
- log.trace(message + ":" + te);
- }
- }
-
- private Transport getTransport(byte[] key, boolean hashAware) {
- if (hashAware) {
- return transportFactory.getTransport(key);
- } else {
- return transportFactory.getTransport();
- }
- }
-
- private boolean shouldRetry(int retryCount) {
- return retryCount < transportFactory.getTransportCount();
- }
-}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -28,7 +28,7 @@
@Override
public String readString() {
byte[] strContent = readArray();
- String readString = new String(strContent, HotRodConstants.STRING_CHARSET);
+ String readString = new String(strContent, HotRodConstants.HOTROD_STRING_CHARSET);
if (log.isTraceEnabled()) {
log.trace("Read string is: " + readString);
}
@@ -80,7 +80,7 @@
@Override
public void writeString(String string) {
if (!string.isEmpty()) {
- writeArray(string.getBytes(HotRodConstants.STRING_CHARSET));
+ writeArray(string.getBytes(HotRodConstants.HOTROD_STRING_CHARSET));
} else {
writeVInt(0);
}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -1,7 +1,7 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
-import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsHelper;
+import org.infinispan.client.hotrod.impl.operations.PingOperation;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -37,7 +37,7 @@
log.trace("Executing first ping!");
firstPingExecuted = true;
try {
- HotRodOperationsHelper.ping(tcpTransport, topologyId);
+ ping(tcpTransport, topologyId);
} catch (Exception e) {
log.trace("Ignoring ping request failure during ping on startup: " + e.getMessage());
}
@@ -45,6 +45,11 @@
return tcpTransport;
}
+ private boolean ping(TcpTransport tcpTransport, AtomicInteger topologyId) {
+ PingOperation po = new PingOperation(null, topologyId, tcpTransport);
+ return (Boolean)po.execute();
+ }
+
/**
* This will be called by the test thread when testWhileIdle==true.
*/
@@ -54,7 +59,7 @@
if (log.isTraceEnabled()) {
log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
}
- return HotRodOperationsHelper.ping(transport, topologyId);
+ return ping(transport, topologyId);
}
@Override
Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HeavyLoadConnectionPoolingTest.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -76,7 +76,7 @@
}
for (WorkerThread wt: workers) {
- wt.interrupt();
+ wt.stopWorker();
wt.waitToFinish();
}
//now wait for the idle thread to wake up and clean them
Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -9,7 +9,7 @@
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.infinispan.util.ByteArrayKey;
+import org.infinispan.util.ByteArrayKey;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.AfterClass;
Deleted: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/NettyHotRodIntegrationTest.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -1,24 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.client.hotrod.impl.transport.netty.NettyTransportFactory;
-import org.infinispan.config.Configuration;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.Test;
-
-import java.util.Properties;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-@Test(testName = "hotrod.NettyHotRodIntegrationTest", groups = "functional")
-public class NettyHotRodIntegrationTest extends HotRodIntegrationTest {
-
- @Override
- protected RemoteCacheManager getRemoteCacheManager() {
- Properties props = new Properties();
- props.put("transport-factory", NettyTransportFactory.class.getName());
- props.put("hotrod-servers", "127.0.0.1:" + hotrodServer.getPort());
- return new RemoteCacheManager(props, true);
- }
-}
Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -31,8 +31,12 @@
@AfterTest(alwaysRun = true)
public void release() {
- if (cacheManager != null) cacheManager.stop();
- if (hotrodServer != null) hotrodServer.stop();
+ try {
+ if (cacheManager != null) cacheManager.stop();
+ if (hotrodServer != null) hotrodServer.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
if (prevValue != null) {
System.setProperty(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS, prevValue);
} else {
Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -41,11 +41,19 @@
protected void clearContent() throws Throwable {
}
- @AfterClass
+ @AfterClass (alwaysRun = true)
@Override
protected void destroy() {
+ try {
hotRodServer1.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ try {
hotRodServer2.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
super.destroy();
}
@@ -112,6 +120,7 @@
public void testDropServer() {
hotRodServer3.stop();
manager(2).stop();
+ log.trace("Just stopped server 2");
waitForClusterToForm(2);
@@ -138,7 +147,7 @@
protected void waitForClusterToForm(int memberCount) {
TestingUtil.blockUntilViewReceived(manager(0).getCache(), memberCount, 30000);
for (int i = 0; i < memberCount; i++) {
- TestingUtil.blockUntilCacheStatusAchieved(manager(i).getCache(), ComponentStatus.RUNNING, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(i).getCache(), ComponentStatus.RUNNING, 30000);
}
}
}
Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java 2010-07-14 10:28:12 UTC (rev 2032)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java 2010-07-14 16:10:00 UTC (rev 2033)
@@ -30,6 +30,7 @@
volatile String key;
volatile String value;
volatile boolean finished = false;
+ volatile boolean stopWorker = false;
public WorkerThread(RemoteCache remoteCache) {
super("WorkerThread-" + WORKER_INDEX.getAndIncrement());
@@ -70,7 +71,7 @@
private void stress_() {
Random rnd = new Random();
- while (!isInterrupted()) {
+ while (!stopWorker) {
remoteCache.put(rnd.nextLong(), rnd.nextLong());
try {
Thread.sleep(50);
@@ -152,4 +153,8 @@
}
}
}
+
+ public void stopWorker() {
+ this.stopWorker = true;
+ }
}
More information about the infinispan-commits
mailing list