[infinispan-commits] Infinispan SVN: r1805 - in trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl: async and 5 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue May 18 11:40:50 EDT 2010
Author: mircea.markus
Date: 2010-05-18 11:40:48 -0400 (Tue, 18 May 2010)
New Revision: 1805
Added:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashV1.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodConstants.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperations.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperationsImpl.java
Removed:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
Modified:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
Log:
updated test and javadocs, fixed various issues
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,61 +0,0 @@
-package org.infinispan.client.hotrod.impl;
-
-/**
- * // TODO: Document this
- *
- * @author mmarkus
- * @since 4.1
- */
-public interface HotrodConstants {
-
- public static final short REQUEST_MAGIC = 0xA0;
- public static final short RESPONSE_MAGIC = 0xA1;
-
- public static final byte HOTROD_VERSION = 10;
-
- //requests
- public static final byte PUT_REQUEST = 0x01;
- public static final byte GET_REQUEST = 0x03;
- public static final byte PUT_IF_ABSENT_REQUEST = 0x05;
- public static final byte REPLACE_REQUEST = 0x07;
- public static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
- public static final byte REMOVE_REQUEST = 0x0B;
- public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
- public static final byte CONTAINS_KEY_REQUEST = 0x0F;
- public static final byte GET_WITH_VERSION = 0x11;
- public static final byte CLEAR_REQUEST = 0x13;
- public static final byte STATS_REQUEST = 0x15;
- public static final byte PING_REQUEST = 0x17;
-
-
- //responses
- public static final byte PUT_RESPONSE = 0x02;
- public static final byte GET_RESPONSE = 0x04;
- public static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
- public static final byte REPLACE_RESPONSE = 0x08;
- public static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
- public static final byte REMOVE_RESPONSE = 0x0C;
- public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
- public static final byte CONTAINS_KEY_RESPONSE = 0x10;
- public static final byte GET_WITH_VERSION_RESPONSE = 0x12;
- public static final byte CLEAR_RESPONSE = 0x14;
- public static final byte STATS_RESPONSE = 0x16;
- public static final byte PING_RESPONSE = 0x18;
- public static final byte ERROR_RESPONSE = 0x50;
-
- //response status
- public static final byte NO_ERROR_STATUS = 0x00;
- public static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
- public static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
- public static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
- public static final int UNKNOWN_COMMAND_STATUS = 0x82;
- public static final int SERVER_ERROR_STATUS = 0x85;
- public static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
- public static final int UNKNOWN_VERSION_STATUS = 0x83;
- public static final int COMMAND_TIMEOUT_STATUS = 0x86;
-
-
- public static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
- public static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
- public static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
-}
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperations.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,67 +0,0 @@
-package org.infinispan.client.hotrod.impl;
-
-
-import org.infinispan.client.hotrod.Flag;
-
-import java.util.Map;
-
-/**
- * // TODO: Document this
- *
- * - TODO - add timeout support
- * - TODO - enforce encoding and add such tests
- *
- * @author mmarkus
- * @since 4.1
- */
-public interface HotrodOperations {
-
- public byte[] get(byte[] key, Flag... flags);
-
- public byte[] remove(byte[] key, Flag... flags);
-
- public boolean containsKey(byte[] key, Flag... flags);
-
- /**
- * Returns null if the given key does not exist.
- */
- public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags);
-
- /**
- * @param lifespan number of seconds that a entry during which the entry is allowed to life.
- * If number of seconds is bigger than 30 days, this number of seconds is treated as UNIX time and so, represents
- * the number of seconds since 1/1/1970. If set to 0, lifespan is unlimited.
- * @param maxIdle Number of seconds that a entry can be idle before it's evicted from the cache. If 0, no max
- * @param flags
- */
- public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
-
- /**
- * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
- * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
- * @param flags
- */
- public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
-
- /**
- * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
- * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
- * @param flags
- */
- public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
-
- /**
- * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
- * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
- * @param flags
- */
- public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags);
-
- public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags);
-
- public void clear(Flag... flags);
-
- public Map<String, String> stats();
-
- public boolean ping();
-}
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,460 +0,0 @@
-package org.infinispan.client.hotrod.impl;
-
-import org.infinispan.client.hotrod.Flag;
-import org.infinispan.client.hotrod.exceptions.HotRodClientException;
-import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
-import org.infinispan.client.hotrod.exceptions.TimeoutException;
-import org.infinispan.client.hotrod.exceptions.TransportException;
-import org.infinispan.client.hotrod.impl.transport.Transport;
-import org.infinispan.client.hotrod.impl.transport.TransportFactory;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * // TODO: Document this
- *
- * @author mmarkus
- * @since 4.1
- */
-public class HotrodOperationsImpl implements HotrodOperations, HotrodConstants {
-
- private static Log log = LogFactory.getLog(HotrodOperationsImpl.class);
-
- private final byte[] cacheNameBytes;
- private static final AtomicLong MSG_ID = new AtomicLong();
- private static final AtomicInteger TOPOLOGY_ID = new AtomicInteger();
- private TransportFactory transportFactory;
- private byte clientIntelligence = CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
-
- public HotrodOperationsImpl(String cacheName, TransportFactory transportFactory) {
- cacheNameBytes = cacheName.getBytes(); //todo add charset here
- this.transportFactory = transportFactory;
- }
-
- public byte[] get(byte[] key, Flag[] flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- }
- if (status == NO_ERROR_STATUS) {
- return transport.readArray();
- }
- } finally {
- releaseTransport(transport);
- }
- throw new IllegalStateException("We should not reach here!");
- }
-
- public byte[] remove(byte[] key, Flag[] flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- } else if (status == NO_ERROR_STATUS) {
- return returnPossiblePrevValue(transport, flags);
- }
- } finally {
- releaseTransport(transport);
- }
- throw new IllegalStateException("We should not reach here!");
- }
-
- public boolean containsKey(byte[] key, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return false;
- } else if (status == NO_ERROR_STATUS) {
- return true;
- }
- } finally {
- releaseTransport(transport);
- }
- throw new IllegalStateException("We should not reach here!");
- }
-
- public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- }
- if (status == NO_ERROR_STATUS) {
- long version = transport.readLong();
- if (log.isTraceEnabled()) {
- log.trace("Received version: " + version);
- }
- byte[] value = transport.readArray();
- return new BinaryVersionedValue(version, value);
- }
- } finally {
- releaseTransport(transport);
- }
- throw new IllegalStateException("We should not reach here!");
- }
-
-
- public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
- if (status != NO_ERROR_STATUS) {
- throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
- }
- return returnPossiblePrevValue(transport, flags);
- } finally {
- releaseTransport(transport);
- }
- }
-
- public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
- if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
- byte[] bytes = returnPossiblePrevValue(transport, flags);
- if (log.isTraceEnabled()) {
- log.trace("Returning from putIfAbsent: " + Arrays.toString(bytes));
- }
- return bytes;
- }
- } finally {
- releaseTransport(transport);
- }
- throw new IllegalStateException("We should not reach here!");
- }
-
- public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
- if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
- return returnPossiblePrevValue(transport, flags);
- }
- } finally {
- releaseTransport(transport);
- }
- throw new IllegalStateException("We should not reach here!");
- }
-
- /**
- * request : [header][key length][key][lifespan][max idle][entry_version][value length][value] response: If
- * ForceReturnPreviousValue has been passed, this responses will contain previous [value length][value] for that key.
- * If the key does not exist or previous was null, value length would be 0. Otherwise, if no ForceReturnPreviousValue
- * was sent, the response would be empty.
- */
- public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- // 1) write header
- long messageId = writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, flags);
-
- //2) write message body
- transport.writeArray(key);
- transport.writeVInt(lifespan);
- transport.writeVInt(maxIdle);
- transport.writeLong(version);
- transport.writeArray(value);
- return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
- } finally {
- releaseTransport(transport);
- }
- }
-
- /**
- * Request: [header][key length][key][entry_version]
- */
- public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- // 1) write header
- long messageId = writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, flags);
-
- //2) write message body
- transport.writeArray(key);
- transport.writeLong(version);
-
- //process response and return
- return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
-
- } finally {
- releaseTransport(transport);
- }
- }
-
- public void clear(Flag... flags) {
- Transport transport = transportFactory.getTransport();
- try {
- // 1) write header
- long messageId = writeHeader(transport, CLEAR_REQUEST, flags);
- readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE);
- } finally {
- releaseTransport(transport);
- }
- }
-
- public Map<String, String> stats() {
- Transport transport = transportFactory.getTransport();
- try {
- // 1) write header
- long messageId = writeHeader(transport, STATS_REQUEST);
- readHeaderAndValidate(transport, messageId, STATS_RESPONSE);
- int nrOfStats = transport.readVInt();
-
- Map<String, String> result = new HashMap<String, String>();
- for (int i = 0; i < nrOfStats; i++) {
- String statName = transport.readString();
- String statValue = transport.readString();
- result.put(statName, statValue);
- }
- return result;
- } finally {
- releaseTransport(transport);
- }
- }
-
- @Override
- public boolean ping() {
- Transport transport = null;
- try {
- transport = transportFactory.getTransport();
- // 1) write header
- long messageId = writeHeader(transport, PING_REQUEST);
- short respStatus = readHeaderAndValidate(transport, messageId, HotrodConstants.PING_RESPONSE);
- if (respStatus == NO_ERROR_STATUS) {
- return true;
- }
- throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
- } catch (TransportException te) {
- log.trace("Exception while ping", te);
- return false;
- }
- finally {
- releaseTransport(transport);
- }
- }
-
- //[header][key length][key][lifespan][max idle][value length][value]
-
- private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
- // 1) write header
- long messageId = writeHeader(transport, opCode, flags);
-
- // 2) write key and value
- transport.writeArray(key);
- transport.writeVInt(lifespan);
- transport.writeVInt(maxIdle);
- transport.writeArray(value);
- transport.flush();
-
- // 3) now read header
-
- //return status (not error status for sure)
- return readHeaderAndValidate(transport, messageId, opRespCode);
- }
-
- /*
- * Magic | MessageId | Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
- */
-
- private long writeHeader(Transport transport, short operationCode, Flag... flags) {
- transport.writeByte(REQUEST_MAGIC);
- long messageId = MSG_ID.incrementAndGet();
- transport.writeVLong(messageId);
- transport.writeByte(HOTROD_VERSION);
- transport.writeByte(operationCode);
- transport.writeArray(cacheNameBytes);
-
- int flagInt = 0;
- if (flags != null) {
- for (Flag flag : flags) {
- flagInt = flag.getFlagInt() | flagInt;
- }
- }
- transport.writeVInt(flagInt);
- transport.writeByte(clientIntelligence);
- transport.writeVInt(TOPOLOGY_ID.get());
- if (log.isTraceEnabled()) {
- log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
- }
- return messageId;
- }
-
- /**
- * Magic | Message Id | Op code | Status | Topology Change Marker
- */
- private short readHeaderAndValidate(Transport transport, long messageId, short opRespCode) {
- short magic = transport.readByte();
- if (magic != RESPONSE_MAGIC) {
- String message = "Invalid magic number. Expected " + Integer.toHexString(RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
- log.error(message);
- throw new InvalidResponseException(message);
- }
- long receivedMessageId = transport.readVLong();
- if (receivedMessageId != messageId) {
- String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
- log.error(message);
- throw new InvalidResponseException(message);
- }
- if (log.isTraceEnabled()) {
- log.trace("Received response for message id: " + receivedMessageId);
- }
- short receivedOpCode = transport.readByte();
- if (receivedOpCode != opRespCode) {
- if (receivedOpCode == ERROR_RESPONSE) {
- checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
- throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
- }
- throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
- }
- if (log.isTraceEnabled()) {
- log.trace("Received operation code is: " + receivedOpCode);
- }
- short status = transport.readByte();
- checkForErrorsInResponseStatus(status, messageId, transport);
- short topologyChangeByte = transport.readByte();
- if (topologyChangeByte == 1) {
- readNewTopologyAndHash(transport);
- }
- return status;
- }
-
- private void readNewTopologyAndHash(Transport transport) {
- int newTopologyId = transport.readVInt();
- TOPOLOGY_ID.set(newTopologyId);
- int numKeyOwners = transport.readUnsignedShort();
- short hashFunctionVersion = transport.readByte();
- int hashSpace = transport.readVInt();
- int clusterSize = transport.readVInt();
-
- if (log.isTraceEnabled()) {
- log.trace("Topology change request: newTopologyId=" + newTopologyId + ", numKeyOwners=" + numKeyOwners +
- ", hashFunctionVersion=" + hashFunctionVersion + ", hashSpaceSize=" + hashSpace + ", clusterSize=" + clusterSize);
- }
-
- LinkedHashMap<InetSocketAddress, Integer> servers2HashCode = new LinkedHashMap<InetSocketAddress, Integer>();
-
- for (int i = 0; i < clusterSize; i++) {
- String host = transport.readString();
- int port = transport.readUnsignedShort();
- if (log.isTraceEnabled()) {
- log.trace("Server read:" + host + ":" + port);
- }
- int hashCode = transport.read4ByteInt();
- servers2HashCode.put(new InetSocketAddress(host, port), hashCode);
- if (log.isTraceEnabled()) {
- log.trace("Hash code is: " + hashCode);
- }
- }
- if (log.isInfoEnabled()) {
- log.info("New topology: " + servers2HashCode);
- }
- transportFactory.updateServers(servers2HashCode.keySet());
- if (hashFunctionVersion == 0) {
- if (log.isTraceEnabled())
- log.trace("Not using a consistent hash function (hash function version == 0).");
- } else {
- transportFactory.updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
- }
- }
-
- private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
- if (log.isTraceEnabled()) {
- log.trace("Received operation status: " + status);
- }
- switch ((int) status) {
- case INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
- case REQUEST_PARSING_ERROR_STATUS:
- case UNKNOWN_COMMAND_STATUS:
- case SERVER_ERROR_STATUS:
- case UNKNOWN_VERSION_STATUS: {
- String msgFromServer = transport.readString();
- if (log.isWarnEnabled()) {
- log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
- }
- throw new HotRodClientException(msgFromServer, messageId, status);
- }
- case COMMAND_TIMEOUT_STATUS: {
- if (log.isTraceEnabled()) {
- log.trace("timeout message received from the server");
- }
- throw new TimeoutException();
- }
- case NO_ERROR_STATUS:
- case KEY_DOES_NOT_EXIST_STATUS:
- case NOT_PUT_REMOVED_REPLACED_STATUS: {
- //don't do anything, these are correct responses
- break;
- }
- default: {
- throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
- }
- }
- }
-
- private boolean hasForceReturn(Flag[] flags) {
- if (flags == null) return false;
- for (Flag flag : flags) {
- if (flag == Flag.FORCE_RETURN_VALUE) return true;
- }
- return false;
- }
-
- private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
- // 1) write [header][key length][key]
- long messageId = writeHeader(transport, opCode, flags);
- transport.writeArray(key);
- transport.flush();
-
- // 2) now read the header
- return readHeaderAndValidate(transport, messageId, opRespCode);
- }
-
- private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
- if (hasForceReturn(flags)) {
- byte[] bytes = transport.readArray();
- if (log.isTraceEnabled()) log.trace("Previous value bytes is: " + Arrays.toString(bytes));
- //0-length response means null
- return bytes.length == 0 ? null : bytes;
- } else {
- return null;
- }
- }
-
- private void releaseTransport(Transport transport) {
- if (transport != null)
- transportFactory.releaseTransport(transport);
- }
-
- private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
- //3) ...
- short respStatus = readHeaderAndValidate(transport, messageId, response);
-
- //4 ...
- VersionedOperationResponse.RspCode code;
- if (respStatus == NO_ERROR_STATUS) {
- code = VersionedOperationResponse.RspCode.SUCCESS;
- } else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS) {
- code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
- } else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
- code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
- } else {
- throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
- }
- byte[] prevValue = returnPossiblePrevValue(transport, flags);
- return new VersionedOperationResponse(prevValue, code);
- }
-}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -4,16 +4,19 @@
import java.util.Properties;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * // TODO: Document this
+ * Default implementation for {@link org.infinispan.executors.ExecutorFactory} based on an {@link ThreadPoolExecutor}.
+ * Accepts following configuration parameters:
+ * <ul>
+ * <li> - default-executor-factory.poolSize = the fixed size fo the pool</li>
+ * <li> - default-executor-factory.queueSize = The size of the {@link LinkedBlockingQueue} backing up the executor</li>
+ * </ul>
*
* @author Mircea.Markus at jboss.com
* @since 4.1
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/NotifyingFutureImpl.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -10,7 +10,7 @@
import java.util.concurrent.TimeoutException;
/**
- * // TODO: Document this
+ * Notifying future implementation for async calls.
*
* @author Mircea.Markus at jboss.com
* @since 4.1
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHash.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -5,7 +5,7 @@
import java.util.Map;
/**
- * // TODO: Document this
+ * Abstraction for the used consistent hash.
*
* @author Mircea.Markus at jboss.com
* @since 4.1
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashFactory.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -4,29 +4,38 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
- * // TODO: Document this
+ * Factory for {@link org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash} function. It will try to look
+ * into the configuration for consistent hash definitions as follows:
+ * consistent-hash.[version]=[fully qualified class implementing ConsistentHash]
+ * e.g.
+ * consistent-hash.1=org.infinispan.client.hotrod.impl.consistenthash.ConsitentHashV1
+ * <p/>
+ * If no CH function is defined for a certain version, then it will be defaulted to "org.infinispan.client.hotrod.impl.ConsistentHashV[version]".
+ * E.g. if the server indicates that in use CH is version 1, and it is not defined within the configuration, it will be defaulted to
+ * org.infinispan.client.hotrod.impl.ConsistentHashV1.
*
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
public class ConsistentHashFactory {
-
+
private static Log log = LogFactory.getLog(ConsistentHashFactory.class);
- public final Map<Integer, String> version2ConsistentHash = new HashMap<Integer, String>();
+ private final Map<Integer, String> version2ConsistentHash = new HashMap<Integer, String>();
public void init(Properties props) {
for (String propName : props.stringPropertyNames()) {
- if (propName.indexOf("consistent-hash") >=0) {
+ if (propName.indexOf("consistent-hash") >= 0) {
if (log.isTraceEnabled()) log.trace("Processing consistent hash: " + propName);
String versionString = propName.substring("consistent-hash.".length());
int version = Integer.parseInt(versionString);
- String hashFunction = props.getProperty(versionString);
+ String hashFunction = props.getProperty(propName);
version2ConsistentHash.put(version, hashFunction);
if (log.isTraceEnabled()) {
log.trace("Added consistent hash version " + version + ": " + hashFunction);
@@ -38,8 +47,8 @@
public ConsistentHash newConsistentHash(int version) {
String hashFunctionClass = version2ConsistentHash.get(version);
if (hashFunctionClass == null) {
- log.trace("No hash function configured for version " + version);
- hashFunctionClass = ConsistentHashFactory.class.getPackage().getName() + ".ConsitentHashV" + version;
+ if (log.isTraceEnabled()) log.trace("No hash function configured for version " + version);
+ hashFunctionClass = ConsistentHashFactory.class.getPackage().getName() + ".ConsistentHashV" + version;
if (log.isTraceEnabled()) log.trace("Trying to use default value: " + hashFunctionClass);
}
ConsistentHash consistentHash = null;
@@ -50,4 +59,8 @@
}
return consistentHash;
}
+
+ public Map<Integer, String> getVersion2ConsistentHash() {
+ return Collections.unmodifiableMap(version2ConsistentHash);
+ }
}
Copied: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashV1.java (from rev 1800, trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java)
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashV1.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsistentHashV1.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,59 @@
+package org.infinispan.client.hotrod.impl.consistenthash;
+
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.infinispan.util.hash.MurmurHash2.hash;
+
+/**
+ * Version one consistent hash function based on {@link org.infinispan.util.hash.MurmurHash2};
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class ConsistentHashV1 implements ConsistentHash {
+
+ private static Log log = LogFactory.getLog(ConsistentHashV1.class);
+
+ private final SortedMap<Integer, InetSocketAddress> positions = new TreeMap<Integer, InetSocketAddress>();
+
+ private volatile int hashSpace;
+
+ @Override
+ public void init(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, int hashSpace) {
+ for (InetSocketAddress addr :servers2HashCode.keySet()) {
+ positions.put(servers2HashCode.get(addr), addr);
+ }
+ if (log.isTraceEnabled())
+ log.trace("Positions are: " + positions);
+ this.hashSpace = hashSpace;
+ }
+
+ @Override
+ public InetSocketAddress getServer(byte[] key) {
+ int keyHashCode = hash(key);
+ if (keyHashCode == Integer.MIN_VALUE) keyHashCode += 1;
+ int hash = Math.abs(keyHashCode);
+
+ SortedMap<Integer, InetSocketAddress> candidates = positions.tailMap(hash % hashSpace);
+ if (log.isTraceEnabled()) {
+ log.trace("Found possible candidates: " + candidates);
+ }
+ if (candidates.isEmpty()) {
+ InetSocketAddress socketAddress = positions.get(positions.firstKey());
+ if (log.isTraceEnabled()) {
+ log.trace("Over the wheel, returning first member: " + socketAddress);
+ }
+ return socketAddress;
+ } else {
+ InetSocketAddress socketAddress = candidates.get(candidates.firstKey());
+ log.trace("Found candidate: " + socketAddress);
+ return socketAddress;
+ }
+ }
+}
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/consistenthash/ConsitentHashV1.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,59 +0,0 @@
-package org.infinispan.client.hotrod.impl.consistenthash;
-
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-import java.util.LinkedHashMap;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import static org.infinispan.util.hash.MurmurHash2.hash;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class ConsitentHashV1 implements ConsistentHash {
-
- private static Log log = LogFactory.getLog(ConsitentHashV1.class);
-
- private final SortedMap<Integer, InetSocketAddress> positions = new TreeMap<Integer, InetSocketAddress>();
-
- private volatile int hashSpace;
-
- @Override
- public void init(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, int hashSpace) {
- for (InetSocketAddress addr :servers2HashCode.keySet()) {
- positions.put(servers2HashCode.get(addr), addr);
- }
- if (log.isTraceEnabled())
- log.trace("Positions are: " + positions);
- this.hashSpace = hashSpace;
- }
-
- @Override
- public InetSocketAddress getServer(byte[] key) {
- int keyHashCode = hash(key);
- if (keyHashCode == Integer.MIN_VALUE) keyHashCode += 1;
- int hash = Math.abs(keyHashCode);
-
- SortedMap<Integer, InetSocketAddress> candidates = positions.tailMap(hash % hashSpace);
- if (log.isTraceEnabled()) {
- log.trace("Found possible candidates: " + candidates);
- }
- if (candidates.isEmpty()) {
- InetSocketAddress socketAddress = positions.get(positions.firstKey());
- if (log.isTraceEnabled()) {
- log.trace("Over the wheel, returning first member: " + socketAddress);
- }
- return socketAddress;
- } else {
- InetSocketAddress socketAddress = candidates.get(candidates.firstKey());
- log.trace("Found candidate: " + socketAddress);
- return socketAddress;
- }
- }
-}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,162 @@
+package org.infinispan.client.hotrod.impl.protocol;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
+import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.exceptions.HotRodTimeoutException;
+import org.infinispan.client.hotrod.impl.protocol.HotrodConstants;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Helper class for factorizing common parts of read/write operations.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotRodOperationsHelper {
+ static Log log = LogFactory.getLog(HotrodOperationsImpl.class);
+ static final AtomicLong MSG_ID = new AtomicLong();
+ final static byte CLIENT_INTELLIGENCE = HotrodConstants.CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
+
+ public static long writeHeader(Transport transport, short operationCode, String cacheName, AtomicInteger topologyId, Flag... flags) {
+ transport.writeByte(HotrodConstants.REQUEST_MAGIC);
+ long messageId = MSG_ID.incrementAndGet();
+ transport.writeVLong(messageId);
+ transport.writeByte(HotrodConstants.HOTROD_VERSION);
+ transport.writeByte(operationCode);
+ transport.writeArray(cacheName.getBytes());
+
+ int flagInt = 0;
+ if (flags != null) {
+ for (Flag flag : flags) {
+ flagInt = flag.getFlagInt() | flagInt;
+ }
+ }
+ transport.writeVInt(flagInt);
+ transport.writeByte(CLIENT_INTELLIGENCE);
+ transport.writeVInt(topologyId.get());
+ if (log.isTraceEnabled()) {
+ log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
+ }
+ return messageId;
+ }
+
+ /**
+ * Magic | Message Id | Op code | Status | Topology Change Marker
+ */
+ public static short readHeaderAndValidate(Transport transport, long messageId, short opRespCode, AtomicInteger topologyId) {
+ short magic = transport.readByte();
+ if (magic != HotrodConstants.RESPONSE_MAGIC) {
+ String message = "Invalid magic number. Expected " + Integer.toHexString(HotrodConstants.RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
+ log.error(message);
+ throw new InvalidResponseException(message);
+ }
+ long receivedMessageId = transport.readVLong();
+ if (receivedMessageId != messageId) {
+ String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
+ log.error(message);
+ throw new InvalidResponseException(message);
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Received response for message id: " + receivedMessageId);
+ }
+ short receivedOpCode = transport.readByte();
+ if (receivedOpCode != opRespCode) {
+ if (receivedOpCode == HotrodConstants.ERROR_RESPONSE) {
+ checkForErrorsInResponseStatus(transport.readByte(), messageId, transport);
+ throw new IllegalStateException("Error expected! (i.e. exception in the prev statement)");
+ }
+ throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Received operation code is: " + receivedOpCode);
+ }
+ short status = transport.readByte();
+ checkForErrorsInResponseStatus(status, messageId, transport);
+ short topologyChangeByte = transport.readByte();
+ if (topologyChangeByte == 1) {
+ readNewTopologyAndHash(transport, topologyId);
+ }
+ return status;
+ }
+
+ static void readNewTopologyAndHash(Transport transport, AtomicInteger topologyId) {
+ int newTopologyId = transport.readVInt();
+ topologyId.set(newTopologyId);
+ int numKeyOwners = transport.readUnsignedShort();
+ short hashFunctionVersion = transport.readByte();
+ int hashSpace = transport.readVInt();
+ int clusterSize = transport.readVInt();
+
+ if (log.isTraceEnabled()) {
+ log.trace("Topology change request: newTopologyId=" + newTopologyId + ", numKeyOwners=" + numKeyOwners +
+ ", hashFunctionVersion=" + hashFunctionVersion + ", hashSpaceSize=" + hashSpace + ", clusterSize=" + clusterSize);
+ }
+
+ LinkedHashMap<InetSocketAddress, Integer> servers2HashCode = new LinkedHashMap<InetSocketAddress, Integer>();
+
+ for (int i = 0; i < clusterSize; i++) {
+ String host = transport.readString();
+ int port = transport.readUnsignedShort();
+ if (log.isTraceEnabled()) {
+ log.trace("Server read:" + host + ":" + port);
+ }
+ int hashCode = transport.read4ByteInt();
+ servers2HashCode.put(new InetSocketAddress(host, port), hashCode);
+ if (log.isTraceEnabled()) {
+ log.trace("Hash code is: " + hashCode);
+ }
+ }
+ if (log.isInfoEnabled()) {
+ log.info("New topology: " + servers2HashCode);
+ }
+ transport.getTransportFactory().updateServers(servers2HashCode.keySet());
+ if (hashFunctionVersion == 0) {
+ if (log.isTraceEnabled())
+ log.trace("Not using a consistent hash function (hash function version == 0).");
+ } else {
+ transport.getTransportFactory().updateHashFunction(servers2HashCode, numKeyOwners, hashFunctionVersion, hashSpace);
+ }
+ }
+
+ static void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
+ if (log.isTraceEnabled()) {
+ log.trace("Received operation status: " + status);
+ }
+ switch ((int) status) {
+ case HotrodConstants.INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
+ case HotrodConstants.REQUEST_PARSING_ERROR_STATUS:
+ case HotrodConstants.UNKNOWN_COMMAND_STATUS:
+ case HotrodConstants.SERVER_ERROR_STATUS:
+ case HotrodConstants.UNKNOWN_VERSION_STATUS: {
+ String msgFromServer = transport.readString();
+ if (log.isWarnEnabled()) {
+ log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
+ }
+ throw new HotRodClientException(msgFromServer, messageId, status);
+ }
+ case HotrodConstants.COMMAND_TIMEOUT_STATUS: {
+ if (log.isTraceEnabled()) {
+ log.trace("timeout message received from the server");
+ }
+ throw new HotRodTimeoutException();
+ }
+ case HotrodConstants.NO_ERROR_STATUS:
+ case HotrodConstants.KEY_DOES_NOT_EXIST_STATUS:
+ case HotrodConstants.NOT_PUT_REMOVED_REPLACED_STATUS: {
+ //don't do anything, these are correct responses
+ break;
+ }
+ default: {
+ throw new IllegalStateException("Unknown status: " + Integer.toHexString(status));
+ }
+ }
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodConstants.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodConstants.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,61 @@
+package org.infinispan.client.hotrod.impl.protocol;
+
+/**
+ * Defines constants defined by Hotrod specifications.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface HotrodConstants {
+
+ public static final short REQUEST_MAGIC = 0xA0;
+ public static final short RESPONSE_MAGIC = 0xA1;
+
+ public static final byte HOTROD_VERSION = 10;
+
+ //requests
+ public static final byte PUT_REQUEST = 0x01;
+ public static final byte GET_REQUEST = 0x03;
+ public static final byte PUT_IF_ABSENT_REQUEST = 0x05;
+ public static final byte REPLACE_REQUEST = 0x07;
+ public static final byte REPLACE_IF_UNMODIFIED_REQUEST = 0x09;
+ public static final byte REMOVE_REQUEST = 0x0B;
+ public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
+ public static final byte CONTAINS_KEY_REQUEST = 0x0F;
+ public static final byte GET_WITH_VERSION = 0x11;
+ public static final byte CLEAR_REQUEST = 0x13;
+ public static final byte STATS_REQUEST = 0x15;
+ public static final byte PING_REQUEST = 0x17;
+
+
+ //responses
+ public static final byte PUT_RESPONSE = 0x02;
+ public static final byte GET_RESPONSE = 0x04;
+ public static final byte PUT_IF_ABSENT_RESPONSE = 0x06;
+ public static final byte REPLACE_RESPONSE = 0x08;
+ public static final byte REPLACE_IF_UNMODIFIED_RESPONSE = 0x0A;
+ public static final byte REMOVE_RESPONSE = 0x0C;
+ public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
+ public static final byte CONTAINS_KEY_RESPONSE = 0x10;
+ public static final byte GET_WITH_VERSION_RESPONSE = 0x12;
+ public static final byte CLEAR_RESPONSE = 0x14;
+ public static final byte STATS_RESPONSE = 0x16;
+ public static final byte PING_RESPONSE = 0x18;
+ public static final byte ERROR_RESPONSE = 0x50;
+
+ //response status
+ public static final byte NO_ERROR_STATUS = 0x00;
+ public static final int INVALID_MAGIC_OR_MESSAGE_ID_STATUS = 0x81;
+ public static final int REQUEST_PARSING_ERROR_STATUS = 0x84;
+ public static final byte NOT_PUT_REMOVED_REPLACED_STATUS = 0x01;
+ public static final int UNKNOWN_COMMAND_STATUS = 0x82;
+ public static final int SERVER_ERROR_STATUS = 0x85;
+ public static final int KEY_DOES_NOT_EXIST_STATUS = 0x02;
+ public static final int UNKNOWN_VERSION_STATUS = 0x83;
+ public static final int COMMAND_TIMEOUT_STATUS = 0x86;
+
+
+ public static final byte CLIENT_INTELLIGENCE_BASIC = 0x01;
+ public static final byte CLIENT_INTELLIGENCE_TOPOLOGY_AWARE = 0x02;
+ public static final byte CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE = 0x03;
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperations.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperations.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperations.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,68 @@
+package org.infinispan.client.hotrod.impl.protocol;
+
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
+import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
+
+import java.util.Map;
+
+/**
+ * Defines the hotrod operations as described in the protocol spec: http://community.jboss.org/wiki/HotRodProtocol
+ *
+ * - TODO - enforce encoding and add such tests
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface HotrodOperations {
+
+ public byte[] get(byte[] key, Flag... flags);
+
+ public byte[] remove(byte[] key, Flag... flags);
+
+ public boolean containsKey(byte[] key, Flag... flags);
+
+ /**
+ * Returns null if the given key does not exist.
+ */
+ public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags);
+
+ /**
+ * @param lifespan number of seconds that a entry during which the entry is allowed to life.
+ * If number of seconds is bigger than 30 days, this number of seconds is treated as UNIX time and so, represents
+ * the number of seconds since 1/1/1970. If set to 0, lifespan is unlimited.
+ * @param maxIdle Number of seconds that a entry can be idle before it's evicted from the cache. If 0, no max
+ * @param flags
+ */
+ public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+ /**
+ * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param flags
+ */
+ public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+ /**
+ * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param flags
+ */
+ public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags);
+
+ /**
+ * @param lifespan same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param maxIdle same as in {@link #put(byte[],byte[],int,int,org.infinispan.client.hotrod.Flag...)}
+ * @param flags
+ */
+ public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags);
+
+ public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags);
+
+ public void clear(Flag... flags);
+
+ public Map<String, String> stats();
+
+ public boolean ping();
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperationsImpl.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotrodOperationsImpl.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -0,0 +1,320 @@
+package org.infinispan.client.hotrod.impl.protocol;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.exceptions.TransportException;
+import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
+import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
+import org.infinispan.client.hotrod.impl.protocol.HotrodConstants;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class HotrodOperationsImpl implements HotrodOperations, HotrodConstants {
+
+ private static Log log = LogFactory.getLog(HotrodOperationsImpl.class);
+
+ private final String cacheName;
+ private TransportFactory transportFactory;
+ private final AtomicInteger topologyId;
+
+ public HotrodOperationsImpl(String cacheName, TransportFactory transportFactory, AtomicInteger topologyId) {
+ this.cacheName = cacheName; //todo add charset here
+ this.transportFactory = transportFactory;
+ this.topologyId = topologyId;
+ }
+
+ public byte[] get(byte[] key, Flag[] flags) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return null;
+ }
+ if (status == NO_ERROR_STATUS) {
+ return transport.readArray();
+ }
+ } finally {
+ releaseTransport(transport);
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+ public byte[] remove(byte[] key, Flag[] flags) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return null;
+ } else if (status == NO_ERROR_STATUS) {
+ return returnPossiblePrevValue(transport, flags);
+ }
+ } finally {
+ releaseTransport(transport);
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+ public boolean containsKey(byte[] key, Flag... flags) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return false;
+ } else if (status == NO_ERROR_STATUS) {
+ return true;
+ }
+ } finally {
+ releaseTransport(transport);
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+ public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return null;
+ }
+ if (status == NO_ERROR_STATUS) {
+ long version = transport.readLong();
+ if (log.isTraceEnabled()) {
+ log.trace("Received version: " + version);
+ }
+ byte[] value = transport.readArray();
+ return new BinaryVersionedValue(version, value);
+ }
+ } finally {
+ releaseTransport(transport);
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+
+ public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
+ if (status != NO_ERROR_STATUS) {
+ throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
+ }
+ return returnPossiblePrevValue(transport, flags);
+ } finally {
+ releaseTransport(transport);
+ }
+ }
+
+ public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
+ if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+ byte[] bytes = returnPossiblePrevValue(transport, flags);
+ if (log.isTraceEnabled()) {
+ log.trace("Returning from putIfAbsent: " + Arrays.toString(bytes));
+ }
+ return bytes;
+ }
+ } finally {
+ releaseTransport(transport);
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+ public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
+ if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+ return returnPossiblePrevValue(transport, flags);
+ }
+ } finally {
+ releaseTransport(transport);
+ }
+ throw new IllegalStateException("We should not reach here!");
+ }
+
+ /**
+ * request : [header][key length][key][lifespan][max idle][entry_version][value length][value] response: If
+ * ForceReturnPreviousValue has been passed, this responses will contain previous [value length][value] for that key.
+ * If the key does not exist or previous was null, value length would be 0. Otherwise, if no ForceReturnPreviousValue
+ * was sent, the response would be empty.
+ */
+ public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ // 1) write header
+ long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
+
+ //2) write message body
+ transport.writeArray(key);
+ transport.writeVInt(lifespan);
+ transport.writeVInt(maxIdle);
+ transport.writeLong(version);
+ transport.writeArray(value);
+ return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
+ } finally {
+ releaseTransport(transport);
+ }
+ }
+
+ /**
+ * Request: [header][key length][key][entry_version]
+ */
+ public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ // 1) write header
+ long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
+
+ //2) write message body
+ transport.writeArray(key);
+ transport.writeLong(version);
+
+ //process response and return
+ return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
+
+ } finally {
+ releaseTransport(transport);
+ }
+ }
+
+ public void clear(Flag... flags) {
+ Transport transport = transportFactory.getTransport();
+ try {
+ // 1) write header
+ long messageId = HotRodOperationsHelper.writeHeader(transport, CLEAR_REQUEST, cacheName, topologyId, flags);
+ HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE, topologyId);
+ } finally {
+ releaseTransport(transport);
+ }
+ }
+
+ public Map<String, String> stats() {
+ Transport transport = transportFactory.getTransport();
+ try {
+ // 1) write header
+ long messageId = HotRodOperationsHelper.writeHeader(transport, STATS_REQUEST, cacheName, topologyId);
+ HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, STATS_RESPONSE, topologyId);
+ int nrOfStats = transport.readVInt();
+
+ Map<String, String> result = new HashMap<String, String>();
+ for (int i = 0; i < nrOfStats; i++) {
+ String statName = transport.readString();
+ String statValue = transport.readString();
+ result.put(statName, statValue);
+ }
+ return result;
+ } finally {
+ releaseTransport(transport);
+ }
+ }
+
+ @Override
+ public boolean ping() {
+ Transport transport = null;
+ try {
+ transport = transportFactory.getTransport();
+ // 1) write header
+ long messageId = HotRodOperationsHelper.writeHeader(transport, PING_REQUEST, cacheName, topologyId);
+ short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, HotrodConstants.PING_RESPONSE, topologyId);
+ if (respStatus == NO_ERROR_STATUS) {
+ return true;
+ }
+ throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
+ } catch (TransportException te) {
+ log.trace("Exception while ping", te);
+ return false;
+ }
+ finally {
+ releaseTransport(transport);
+ }
+ }
+
+ //[header][key length][key][lifespan][max idle][value length][value]
+
+ private short sendPutOperation(byte[] key, byte[] value, Transport transport, short opCode, byte opRespCode, int lifespan, int maxIdle, Flag[] flags) {
+ // 1) write header
+ long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
+
+ // 2) write key and value
+ transport.writeArray(key);
+ transport.writeVInt(lifespan);
+ transport.writeVInt(maxIdle);
+ transport.writeArray(value);
+ transport.flush();
+
+ // 3) now read header
+
+ //return status (not error status for sure)
+ return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
+ }
+
+ /*
+ * Magic | MessageId | Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
+ */
+
+ private boolean hasForceReturn(Flag[] flags) {
+ if (flags == null) return false;
+ for (Flag flag : flags) {
+ if (flag == Flag.FORCE_RETURN_VALUE) return true;
+ }
+ return false;
+ }
+
+ private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
+ // 1) write [header][key length][key]
+ long messageId = HotRodOperationsHelper.writeHeader(transport, opCode, cacheName, topologyId, flags);
+ transport.writeArray(key);
+ transport.flush();
+
+ // 2) now read the header
+ return HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, opRespCode, topologyId);
+ }
+
+ private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
+ if (hasForceReturn(flags)) {
+ byte[] bytes = transport.readArray();
+ if (log.isTraceEnabled()) log.trace("Previous value bytes is: " + Arrays.toString(bytes));
+ //0-length response means null
+ return bytes.length == 0 ? null : bytes;
+ } else {
+ return null;
+ }
+ }
+
+ private void releaseTransport(Transport transport) {
+ if (transport != null)
+ transportFactory.releaseTransport(transport);
+ }
+
+ private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
+ //3) ...
+ short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, response, topologyId);
+
+ //4 ...
+ VersionedOperationResponse.RspCode code;
+ if (respStatus == NO_ERROR_STATUS) {
+ code = VersionedOperationResponse.RspCode.SUCCESS;
+ } else if (respStatus == NOT_PUT_REMOVED_REPLACED_STATUS) {
+ code = VersionedOperationResponse.RspCode.MODIFIED_KEY;
+ } else if (respStatus == KEY_DOES_NOT_EXIST_STATUS) {
+ code = VersionedOperationResponse.RspCode.NO_SUCH_KEY;
+ } else {
+ throw new IllegalStateException("Unknown response status: " + Integer.toHexString(respStatus));
+ }
+ byte[] prevValue = returnPossiblePrevValue(transport, flags);
+ return new VersionedOperationResponse(prevValue, code);
+ }
+}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -3,8 +3,10 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import java.nio.charset.Charset;
+
/**
- * // TODO: Document this
+ * Support class for transport implementations.
*
* @author Mircea.Markus at jboss.com
* @since 4.1
@@ -13,6 +15,14 @@
private static Log log = LogFactory.getLog(AbstractTransport.class);
+ private static final Charset CHARSET = Charset.forName("UTF-8");
+
+ private final TransportFactory transportFactory;
+
+ protected AbstractTransport(TransportFactory transportFactory) {
+ this.transportFactory = transportFactory;
+ }
+
public byte[] readArray() {
int responseLength = readVInt();
return readByteArray(responseLength);
@@ -21,16 +31,15 @@
@Override
public String readString() {
byte[] strContent = readArray();
- String readString = new String(strContent);
+ String readString = new String(strContent, CHARSET);
if (log.isTraceEnabled()) {
log.trace("Read string is: " + readString);
}
- return readString;//todo take care of encoding here
+ return readString;
}
@Override
public long readLong() {
- //todo - optimize this not to create the longBytes on every call, but reuse it/cache it as class is NOT thread safe
byte[] longBytes = readByteArray(8);
long result = 0;
for (byte longByte : longBytes) {
@@ -71,6 +80,11 @@
return value;
}
+ @Override
+ public TransportFactory getTransportFactory() {
+ return transportFactory;
+ }
+
public void writeArray(byte[] toAppend) {
writeVInt(toAppend.length);
writeBytes(toAppend);
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/Transport.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,16 +1,15 @@
package org.infinispan.client.hotrod.impl.transport;
-import net.jcip.annotations.NotThreadSafe;
-
/**
- * // TODO: Document this
+ * Transport abstraction.
*
- * @author mmarkus
+ * @author Mircea.Markus at jboss.com
* @since 4.1
*/
- at NotThreadSafe
public interface Transport {
+ public TransportFactory getTransportFactory();
+
public void writeArray(byte[] toAppend);
public void writeByte(short toWrite);
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -5,9 +5,10 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * // TODO: Document this
+ * Transport factory for building and managing {@link org.infinispan.client.hotrod.impl.transport.Transport} objects.
*
* @author Mircea.Markus at jboss.com
* @since 4.1
@@ -20,7 +21,7 @@
public void releaseTransport(Transport transport);
- void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers);
+ void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers, AtomicInteger topologyId);
void updateServers(Collection<InetSocketAddress> newServers);
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -10,9 +10,9 @@
import java.io.OutputStream;
/**
- * // TODO: Document this
+ * Helper for handling v-operations.
*
- * @author mmarkus
+ * @author Mircea.Markus at jboss.com
* @since 4.1
*/
public class VHelper {
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientDecoder.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -9,8 +9,6 @@
import org.jboss.netty.handler.codec.frame.FrameDecoder;
/**
- * // TODO: Document this
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientEncoder.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -11,8 +11,6 @@
import static org.jboss.netty.buffer.ChannelBuffers.*;
/**
- * // TODO: Document this
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/HotrodClientPipelaneFactory.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -11,8 +11,6 @@
import static org.jboss.netty.channel.Channels.*;
/**
- * // TODO: Document this
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/InputStreamAdapter.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -7,8 +7,6 @@
import java.io.InputStream;
/**
- * // TODO: Document this
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -2,6 +2,7 @@
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.transport.AbstractTransport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.jboss.netty.bootstrap.ClientBootstrap;
@@ -13,8 +14,6 @@
import java.util.concurrent.Executors;
/**
- * // TODO: Document this
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
@@ -28,7 +27,8 @@
private HotrodClientDecoder decoder = new HotrodClientDecoder();
- public NettyTransport(InetSocketAddress serverAddress) {
+ public NettyTransport(InetSocketAddress serverAddress, TransportFactory transportFactory) {
+ super(transportFactory);
this.serverAddress = serverAddress;
init();
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -9,10 +9,9 @@
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * // TODO: Document this
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
@@ -24,7 +23,7 @@
private Collection<InetSocketAddress> serverAddresses;
@Override
- public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers) {
+ public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers, AtomicInteger topologyId) {
this.serverAddresses = staticConfiguredServers;
serverAddr = serverAddresses.iterator().next();
}
@@ -37,7 +36,7 @@
@Override
public Transport getTransport() {
log.info("Connecting to server on: " + serverAddr);
- return new NettyTransport(serverAddr);
+ return new NettyTransport(serverAddr, this);
}
@Override
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/OutputStreamAdapter.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -6,8 +6,6 @@
import java.io.OutputStream;
/**
- * // TODO: Document this
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -9,8 +9,6 @@
import java.util.Properties;
/**
- * // TODO: Document this
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
@@ -21,16 +19,15 @@
public PropsKeyedObjectPoolFactory(KeyedPoolableObjectFactory factory, Properties props) {
super(factory);
- _maxActive = intProp(props, "maxActive", 1);
+ _maxActive = intProp(props, "maxActive", 2);
_maxTotal = intProp(props, "maxTotal", -1);
- _maxIdle = intProp(props, "maxIdle", 4);
- int value = GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK;
- _whenExhaustedAction = (byte) intProp(props, "whenExhaustedAction", value);
+ _maxIdle = intProp(props, "maxIdle", 2);
+ _whenExhaustedAction = (byte) intProp(props, "whenExhaustedAction", (int) GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK);
_testOnBorrow = booleanProp(props, "testOnBorrow", false);
_testOnReturn = booleanProp(props, "testOnReturn", false);
- _timeBetweenEvictionRunsMillis = intProp(props, "timeBetweenEvictionRunsMillis", -1);
- _minEvictableIdleTimeMillis = longProp(props, "minEvictableIdleTimeMillis", 1800000L);
- _testWhileIdle = booleanProp(props, "testWhileIdle", false);
+ _timeBetweenEvictionRunsMillis = intProp(props, "timeBetweenEvictionRunsMillis", 5 * 60 * 1000);
+ _minEvictableIdleTimeMillis = longProp(props, "minEvictableIdleTimeMillis", 30 * 60 * 1000);
+ _testWhileIdle = booleanProp(props, "testWhileIdle", true);
_minIdle = intProp(props, "minIdle", 0);
_lifo = booleanProp(props, "lifo", true);
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,14 +1,18 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
+import net.jcip.annotations.ThreadSafe;
+
import java.net.InetSocketAddress;
import java.util.Collection;
/**
- * // TODO: Document this
+ * Defines how request are distributed between the servers for replicated caches. This class must be thread safe: setServer
+ * and nextServer are called from multiple threads.
*
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
+ at ThreadSafe
public interface RequestBalancingStrategy {
void setServers(Collection<InetSocketAddress> servers);
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -13,10 +13,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
- * // TODO: Document this
+ * Round-robin implementation for {@link org.infinispan.client.hotrod.impl.transport.tcp.RequestBalancingStrategy}.
*
- * todo - this can be called from several threads, synchronize!
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
@@ -30,7 +28,7 @@
private final Lock writeLock = readWriteLock.writeLock();
private final AtomicInteger index = new AtomicInteger(0);
- private InetSocketAddress[] servers;
+ private volatile InetSocketAddress[] servers;
@Override
public void setServers(Collection<InetSocketAddress> servers) {
@@ -56,7 +54,7 @@
int pos = index.getAndIncrement() % servers.length;
InetSocketAddress server = servers[pos];
if (log.isTraceEnabled()) {
- log.trace("Retuning server: " + server);
+ log.trace("Returning server: " + server);
}
return server;
} finally {
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,7 +1,9 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
+import net.jcip.annotations.ThreadSafe;
import org.infinispan.client.hotrod.impl.transport.AbstractTransport;
import org.infinispan.client.hotrod.exceptions.TransportException;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
import org.infinispan.client.hotrod.impl.transport.VHelper;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -12,18 +14,32 @@
import java.nio.channels.SocketChannel;
/**
- * // TODO: Document this
+ * Transport implementation based on TCP.
*
- * @author mmarkus
+ * @author Mircea.Markus at jboss.com
* @since 4.1
*/
+ at ThreadSafe
public class TcpTransport extends AbstractTransport {
private static Log log = LogFactory.getLog(TcpTransport.class);
- private Socket socket;
- private InetSocketAddress serverAddress;
+ private final Socket socket;
+ private final InetSocketAddress serverAddress;
+ public TcpTransport(InetSocketAddress serverAddress, TransportFactory transportFactory) {
+ super(transportFactory);
+ this.serverAddress = serverAddress;
+ try {
+ SocketChannel socketChannel = SocketChannel.open(serverAddress);
+ socket = socketChannel.socket();
+ } catch (IOException e) {
+ String message = "Could not connect to server: " + serverAddress;
+ log.error(message, e);
+ throw new TransportException(message, e);
+ }
+ }
+
public void writeVInt(int vInt) {
try {
VHelper.writeVInt(vInt, socket.getOutputStream());
@@ -68,18 +84,6 @@
}
}
- public TcpTransport(InetSocketAddress serverAddress) {
- this.serverAddress = serverAddress;
- try {
- SocketChannel socketChannel = SocketChannel.open(serverAddress);
- socket = socketChannel.socket();
- } catch (IOException e) {
- String message = "Could not connect to server: " + serverAddress;
- log.error(message, e);
- throw new TransportException(message, e);
- }
- }
-
protected void writeBytes(byte[] toAppend) {
try {
socket.getOutputStream().write(toAppend);
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,5 +1,6 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
+import net.jcip.annotations.ThreadSafe;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
@@ -16,22 +17,20 @@
import java.util.LinkedHashMap;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * // TODO: Document this
- * <p/>
- * todo - all methods but start and start can be called from multiple threads, add proper sync
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
+ at ThreadSafe
public class TcpTransportFactory implements TransportFactory {
private static final Log log = LogFactory.getLog(TcpTransportFactory.class);
/**
- * These are declared volatile as the thread that calls {@link #start(java.util.Properties, java.util.Collection)}
- * might(and likely will) be different from the thread that calls {@link #getTransport()} or other methods
+ * These are declared volatile as the thread that calls {@link org.infinispan.client.hotrod.impl.transport.TransportFactory#start(java.util.Properties, java.util.Collection, java.util.concurrent.atomic.AtomicInteger)}
+ * might(and likely will) be different from the thread(s) that calls {@link #getTransport()} or other methods
*/
private volatile GenericKeyedObjectPool connectionPool;
private volatile RequestBalancingStrategy balancer;
@@ -40,12 +39,12 @@
private final ConsistentHashFactory hashFactory = new ConsistentHashFactory();
@Override
- public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers) {
+ public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers, AtomicInteger topologyId) {
hashFactory.init(props);
servers = staticConfiguredServers;
String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
- PropsKeyedObjectPoolFactory poolFactory = new PropsKeyedObjectPoolFactory(new TransportObjectFactory(), props);
+ PropsKeyedObjectPoolFactory poolFactory = new PropsKeyedObjectPoolFactory(new TransportObjectFactory(this, topologyId), props);
connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
balancer.setServers(servers);
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java 2010-05-18 15:39:00 UTC (rev 1804)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java 2010-05-18 15:40:48 UTC (rev 1805)
@@ -1,39 +1,66 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
+import org.infinispan.client.hotrod.impl.protocol.HotRodOperationsHelper;
+import org.infinispan.client.hotrod.impl.protocol.HotrodConstants;
+import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * // TODO: Document this
- *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
public class TransportObjectFactory extends BaseKeyedPoolableObjectFactory {
- private static Log log = LogFactory.getLog(TransportObjectFactory.class);
+ private static final Log log = LogFactory.getLog(TransportObjectFactory.class);
+ private final TcpTransportFactory tcpTransportFactory;
+ private final AtomicInteger topologyId;
+ public TransportObjectFactory(TcpTransportFactory tcpTransportFactory, AtomicInteger topologyId) {
+ this.tcpTransportFactory = tcpTransportFactory;
+ this.topologyId = topologyId;
+ }
+
@Override
public Object makeObject(Object key) throws Exception {
InetSocketAddress serverAddress = (InetSocketAddress) key;
- TcpTransport tcpTransport = new TcpTransport(serverAddress);
+ TcpTransport tcpTransport = new TcpTransport(serverAddress, tcpTransportFactory);
if (log.isTraceEnabled()) {
log.trace("Created tcp transport: " + tcpTransport);
}
return tcpTransport;
}
+ /**
+ * This will be called by the test thread when testWhileIdle==true.
+ */
@Override
public boolean validateObject(Object key, Object obj) {
TcpTransport transport = (TcpTransport) obj;
- if (log.isTraceEnabled()) {
- log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
+ }
+ long messageId = HotRodOperationsHelper.writeHeader(transport, HotrodConstants.PING_REQUEST, DefaultCacheManager.DEFAULT_CACHE_NAME, topologyId);
+ short respStatus = HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, HotrodConstants.PING_RESPONSE, topologyId);
+ if (respStatus == HotrodConstants.NO_ERROR_STATUS) {
+ if (log.isTraceEnabled())
+ log.trace("Successfully validated transport: " + transport);
+ return true;
+ } else {
+ if (log.isTraceEnabled())
+ log.trace("Unknown response status: " + respStatus);
+ return false;
+ }
+ } catch (Exception e) {
+ if (log.isTraceEnabled())
+ log.trace("Failed to validate transport: " + transport, e);
+ return false;
}
- //todo implement
- return true;
}
@Override
More information about the infinispan-commits
mailing list