[infinispan-commits] Infinispan SVN: r1651 - in trunk/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl/transport/netty and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Apr 1 10:46:37 EDT 2010
Author: mircea.markus
Date: 2010-04-01 10:46:36 -0400 (Thu, 01 Apr 2010)
New Revision: 1651
Modified:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
trunk/client/hotrod-client/src/test/resources/log4j.xml
Log:
ongoing work on hotrod client
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java 2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java 2010-04-01 14:46:36 UTC (rev 1651)
@@ -1,9 +1,5 @@
package org.infinispan.client.hotrod.impl;
-import org.infinispan.client.hotrod.impl.transport.TransportException;
-
-import java.io.IOException;
-
/**
* // TODO: Document this
*
@@ -12,25 +8,41 @@
*/
public abstract class AbstractTransport implements Transport {
- public byte[] readByteArray() {
- int responseLength = readVInt();
- byte[] bufferToFill = new byte[responseLength];
- readBuffer(bufferToFill);
- return bufferToFill;
+ public byte[] readArray() {
+ int responseLength = readVInt();
+ return readByteArray(responseLength);
}
@Override
public String readString() {
- byte[] strContent = readByteArray();
+ byte[] strContent = readArray();
return new String(strContent);//todo take care of encoding here
}
- protected abstract void readBuffer(byte[] bufferToFill);
+ @Override
+ public long readLong() {
+ byte[] longBytes = readByteArray(8);
+ long result = 0;
+ for (int i = 0; i < 8; i++) {
+ result <<= 8;
+ result ^= (long) longBytes[i] & 0xFF;
+ }
+ return result;
+ }
- public void writeByteArray(byte[] toAppend) {
+ @Override
+ public void writeLong(long longValue) {
+ byte[] b = new byte[8];
+ for (int i = 0; i < 8; i++) {
+ b[7 - i] = (byte) (longValue >>> (i * 8));
+ }
+ writeBytes(b);
+ }
+
+ public void writeArray(byte[] toAppend) {
writeVInt(toAppend.length);
- writeBuffer(toAppend);
+ writeBytes(toAppend);
}
- protected abstract void writeBuffer(byte[] toAppend);
+ protected abstract void writeBytes(byte[] toAppend);
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java 2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java 2010-04-01 14:46:36 UTC (rev 1651)
@@ -22,13 +22,9 @@
public static final byte REMOVE_REQUEST = 0x0B;
public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
public static final byte CONTAINS_KEY_REQUEST = 0x0F;
- public static final byte PUT_FOR_EXTERNAL_READ_REQUEST = 0x11;
- public static final byte GET_WITH_CAS_REQUEST = 0x13;
- public static final byte EVICT_REQUEST = 0x15;
- public static final byte CLEAR_REQUEST = 0x17;
- public static final byte STATS_REQUEST = 0x19;
- public static final byte QUIT_REQUEST = 0x1B;
- public static final byte EVENT_REGISTRATION_REQUEST = 0x1D;
+ public static final byte GET_WITH_VERSION = 0x11;
+ public static final byte CLEAR_REQUEST = 0x13;
+ public static final byte STATS_REQUEST = 0x15;
public static final byte PING_REQUEST = 0x17;
@@ -41,15 +37,11 @@
public static final byte REMOVE_RESPONSE = 0x0C;
public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
public static final byte CONTAINS_KEY_RESPONSE = 0x10;
- public static final byte PUT_FOR_EXTERNAL_READ_RESPONSE = 0x12;
- public static final byte GET_WITH_CAS_RESPONSE = 0x14;
- public static final byte EVICT_RESPONSE = 0x16;
- public static final byte CLEAR_RESPONSE = 0x18;
- public static final byte STATS_RESPONSE = 0x1A;
- public static final byte QUIT_RESPONSE = 0x1C;
- public static final byte EVENT_REGISTRATION_RESPONSE = 0x1E;
- public static final byte ERROR_RESPONSE = 0x50;
+ public static final byte GET_WITH_VERSION_RESPONSE = 0x12;
+ public static final byte CLEAR_RESPONSE = 0x14;
+ public static final byte STATS_RESPONSE = 0x16;
public static final byte PING_RESPONSE = 0x18;
+ public static final byte ERROR_RESPONSE = 0x50;
//response status
public static final byte NO_ERROR_STATUS = 0x00;
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-04-01 14:46:36 UTC (rev 1651)
@@ -40,7 +40,7 @@
return null;
}
if (status == NO_ERROR_STATUS) {
- return transport.readByteArray();
+ return transport.readArray();
}
} finally {
releaseTransport(transport);
@@ -81,13 +81,16 @@
public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
Transport transport = getTransport();
try {
- short status = sendKeyOperation(key, transport, GET_WITH_CAS_REQUEST, flags, GET_WITH_CAS_RESPONSE);
+ short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
return null;
}
if (status == NO_ERROR_STATUS) {
- long version = transport.readVLong();
- byte[] value = transport.readByteArray();
+ long version = transport.readLong();
+ if (log.isTraceEnabled()) {
+ log.trace("Received version: " + version);
+ }
+ byte[] value = transport.readArray();
return new BinaryVersionedValue(version, value);
}
} finally {
@@ -153,12 +156,12 @@
long messageId = writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, flags);
//2) write message body
- transport.writeByteArray(key);
+ transport.writeArray(key);
transport.writeVInt(lifespan);
transport.writeVInt(maxIdle);
- transport.writeVLong(version);
- transport.writeByteArray(value);
- return returnVersionedOperationResponse(transport, messageId, flags);
+ transport.writeLong(version);
+ transport.writeArray(value);
+ return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
} finally {
releaseTransport(transport);
}
@@ -174,11 +177,11 @@
long messageId = writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, flags);
//2) write message body
- transport.writeByteArray(key);
- transport.writeVLong(version);
+ transport.writeArray(key);
+ transport.writeLong(version);
//process response and return
- return returnVersionedOperationResponse(transport, messageId, flags);
+ return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
} finally {
releaseTransport(transport);
@@ -201,7 +204,7 @@
try {
// 1) write header
long messageId = writeHeader(transport, STATS_REQUEST);
- readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE);
+ readHeaderAndValidate(transport, messageId, STATS_RESPONSE);
int nrOfStats = transport.readVInt();
Map<String, Number> result = new HashMap<String, Number>();
for (int i = 0; i < nrOfStats; i++) {
@@ -247,10 +250,10 @@
long messageId = writeHeader(transport, opCode, flags);
// 2) write key and value
- transport.writeByteArray(key);
+ transport.writeArray(key);
transport.writeVInt(lifespan);
transport.writeVInt(maxIdle);
- transport.writeByteArray(value);
+ transport.writeArray(value);
transport.flush();
// 3) now read header
@@ -269,7 +272,10 @@
transport.writeVLong(messageId);
transport.writeByte(HOTROD_VERSION);
transport.writeByte(operationCode);
- transport.writeByteArray(cacheNameBytes);
+ transport.writeArray(cacheNameBytes);
+
+
+
int flagInt = 0;
if (flags != null) {
for (Flag flag : flags) {
@@ -279,6 +285,9 @@
transport.writeVInt(flagInt);
transport.writeByte(clientIntelligence);
transport.writeVInt(0);//this will be changed once smarter clients are supported
+ if (log.isTraceEnabled()) {
+ log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode);
+ }
return messageId;
}
@@ -288,12 +297,19 @@
private short readHeaderAndValidate(Transport transport, long messageId, short opRespCode) {
short magic = transport.readByte();
if (magic != RESPONSE_MAGIC) {
- throw new InvalidResponseException("Invalid magic number. Expected " + Integer.toHexString(RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic));
+ String message = "Invalid magic number. Expected " + Integer.toHexString(RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
+ log.error(message);
+ throw new InvalidResponseException(message);
}
long receivedMessageId = transport.readVLong();
if (receivedMessageId != messageId) {
- throw new InvalidResponseException("Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId));
+ String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
+ log.error(message);
+ throw new InvalidResponseException(message);
}
+ if (log.isTraceEnabled()) {
+ log.trace("Received response for message id: " + receivedMessageId);
+ }
short receivedOpCode = transport.readByte();
if (receivedOpCode != opRespCode) {
if (receivedOpCode == ERROR_RESPONSE) {
@@ -302,6 +318,9 @@
}
throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
}
+ if (log.isTraceEnabled()) {
+ log.trace("Received operation code is: " + receivedOpCode);
+ }
short status = transport.readByte();
transport.readByte(); //todo - this will be changed once we support smarter than basic clients
checkForErrorsInResponseStatus(status, messageId, transport);
@@ -309,19 +328,30 @@
}
private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
+ if (log.isTraceEnabled()) {
+ log.trace("Received operation status: " + status);
+ }
switch ((int) status) {
case INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
case REQUEST_PARSING_ERROR_STATUS:
case UNKNOWN_COMMAND_STATUS:
case SERVER_ERROR_STATUS:
case UNKNOWN_VERSION_STATUS: {
- throw new HotRodClientException(transport.readString(), messageId, status);
+ String msgFromServer = transport.readString();
+ if (log.isWarnEnabled()) {
+ log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
+ }
+ throw new HotRodClientException(msgFromServer, messageId, status);
}
case COMMAND_TIMEOUT_STATUS: {
+ if (log.isTraceEnabled()) {
+ log.trace("timeout message received from the server");
+ }
throw new TimeoutException();
}
case NO_ERROR_STATUS:
- case KEY_DOES_NOT_EXIST_STATUS: {
+ case KEY_DOES_NOT_EXIST_STATUS:
+ case NOT_PUT_REMOVED_REPLACED_STATUS: {
//don't do anything, these are correct responses
break;
}
@@ -342,7 +372,7 @@
private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
// 1) write [header][key length][key]
long messageId = writeHeader(transport, opCode, flags);
- transport.writeByteArray(key);
+ transport.writeArray(key);
transport.flush();
// 2) now read the header
@@ -350,7 +380,7 @@
}
private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
- return hasForceReturn(flags) ? transport.readByteArray() : null;
+ return hasForceReturn(flags) ? transport.readArray() : null;
}
private void releaseTransport(Transport transport) {
@@ -358,9 +388,9 @@
transport.release();
}
- private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, Flag[] flags) {
+ private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
//3) ...
- short respStatus = readHeaderAndValidate(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE);
+ short respStatus = readHeaderAndValidate(transport, messageId, response);
//4 ...
VersionedOperationResponse.RspCode code;
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java 2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java 2010-04-01 14:46:36 UTC (rev 1651)
@@ -8,7 +8,7 @@
*/
public interface Transport {
- public void writeByteArray(byte[] toAppend);
+ public void writeArray(byte[] toAppend);
public void writeByte(short toWrite);
@@ -29,7 +29,13 @@
/**
* reads an vint which is size; then an array having that size.
*/
- public byte[] readByteArray();
+ public byte[] readArray();
String readString();
+
+ byte[] readByteArray(int size);
+
+ long readLong();
+
+ void writeLong(long longValue);
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java 2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java 2010-04-01 14:46:36 UTC (rev 1651)
@@ -1,7 +1,6 @@
package org.infinispan.client.hotrod.impl.transport.netty;
import org.infinispan.client.hotrod.impl.AbstractTransport;
-import org.infinispan.client.hotrod.impl.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportException;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
@@ -50,7 +49,7 @@
}
@Override
- protected void writeBuffer(byte[] toAppend) {
+ protected void writeBytes(byte[] toAppend) {
channel.write(toAppend);
}
@@ -100,7 +99,9 @@
}
@Override
- protected void readBuffer(byte[] bufferToFill) {
- decoder.fillBuffer(bufferToFill);
+ public byte[] readByteArray(int size) {
+ byte[] bytes = new byte[size];
+ decoder.fillBuffer(bytes);
+ return bytes;
}
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-04-01 14:46:36 UTC (rev 1651)
@@ -1,7 +1,6 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
import org.infinispan.client.hotrod.impl.AbstractTransport;
-import org.infinispan.client.hotrod.impl.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportException;
import org.infinispan.client.hotrod.impl.transport.VHelper;
@@ -68,7 +67,7 @@
}
}
- protected void writeBuffer(byte[] toAppend) {
+ protected void writeBytes(byte[] toAppend) {
try {
socket.getOutputStream().write(toAppend);
} catch (IOException e) {
@@ -114,18 +113,19 @@
}
}
- protected void readBuffer(byte[] bufferToFill) {
- int size;
+ public byte[] readByteArray(int size) {
+ byte[] bytes = new byte[size];
try {
- size = socket.getInputStream().read(bufferToFill);
+ size = socket.getInputStream().read(bytes);
} catch (IOException e) {
throw new TransportException(e);
}
if (size == -1) {
throw new RuntimeException("End of stream reached!");
}
- if (size != bufferToFill.length) {
- throw new TransportException("Expected " + bufferToFill.length + " bytes but only could read " + size + " bytes!");
+ if (size != bytes.length) {
+ throw new TransportException("Expected " + bytes.length + " bytes but only could read " + size + " bytes!");
}
+ return bytes;
}
}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-04-01 14:46:36 UTC (rev 1651)
@@ -10,6 +10,8 @@
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@@ -127,12 +129,17 @@
assert remoteCache.containsKey("aKey");
}
+ private static Log log = LogFactory.getLog(HotRodIntegrationTest.class);
+
+ @Test (invocationCount = 10)
public void testGetVersionedCacheEntry() {
assert null == remoteCache.getVersioned("aKey");
remoteCache.put("aKey", "aValue");
+ assert remoteCache.get("aKey").equals("aValue");
RemoteCache.VersionedValue valueBinary = remoteCache.getVersioned("aKey");
assert valueBinary != null;
assertEquals(valueBinary.getValue(), "aValue");
+ log.info("Version is: " + valueBinary.getVersion());
//now put the same value
remoteCache.put("aKey", "aValue");
@@ -154,7 +161,7 @@
assert null == remoteCache.replace("aKey", "anotherValue");
remoteCache.put("aKey", "aValue");
assert null == remoteCache.replace("aKey", "anotherValue");
- assert get(cache, "aKey").equals("anotherValue");
+ assert remoteCache.get("aKey").equals("anotherValue");
}
public void testReplaceIfUnmodified() {
@@ -177,10 +184,10 @@
remoteCache.put("aKey", "aValue");
RemoteCache.VersionedValue valueBinary = remoteCache.getVersioned("aKey");
- assert !remoteCache.remove("aKey", valueBinary.getVersion());
+ assert remoteCache.remove("aKey", valueBinary.getVersion());
assert !cache.containsKey("aKey");
- remoteCache.put("aKey", "aValueNew");
+ remoteCache.put("aKey", "aNewValue");
RemoteCache.VersionedValue entry2 = remoteCache.getVersioned("aKey");
assert entry2.getVersion() != valueBinary.getVersion();
@@ -192,10 +199,10 @@
public void testPutIfAbsent() {
remoteCache.put("aKey", "aValue");
assert null == remoteCache.putIfAbsent("aKey", "anotherValue");
- assert get(cache, "aKey").equals("aValue");
+ assert remoteCache.get("aKey").equals("aValue");
- assert cache.remove("aKey").equals("aValue");
- assert !remoteCache.containsKey("aKey");
+ assert remoteCache.get("aKey").equals("aValue");
+ assert remoteCache.containsKey("aKey");
assert true : remoteCache.replace("aKey", "anotherValue");
}
Modified: trunk/client/hotrod-client/src/test/resources/log4j.xml
===================================================================
--- trunk/client/hotrod-client/src/test/resources/log4j.xml 2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/test/resources/log4j.xml 2010-04-01 14:46:36 UTC (rev 1651)
@@ -8,7 +8,7 @@
<!-- A time/date based rolling appender -->
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="jdbc_cache_store.log"/>
+ <param name="File" value="hotrod_client.log"/>
<param name="Append" value="false"/>
<!-- Rollover at midnight each day -->
@@ -52,6 +52,10 @@
<priority value="TRACE"/>
</category>
+ <category name="org.infinispan.client.hotrod">
+ <priority value="TRACE"/>
+ </category>
+
<category name="org.infinispan">
<priority value="INFO"/>
</category>
More information about the infinispan-commits
mailing list