[infinispan-commits] Infinispan SVN: r1651 - in trunk/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl/transport/netty and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Apr 1 10:46:37 EDT 2010


Author: mircea.markus
Date: 2010-04-01 10:46:36 -0400 (Thu, 01 Apr 2010)
New Revision: 1651

Modified:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
   trunk/client/hotrod-client/src/test/resources/log4j.xml
Log:
ongoing work on hotrod client

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java	2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java	2010-04-01 14:46:36 UTC (rev 1651)
@@ -1,9 +1,5 @@
 package org.infinispan.client.hotrod.impl;
 
-import org.infinispan.client.hotrod.impl.transport.TransportException;
-
-import java.io.IOException;
-
 /**
  * // TODO: Document this
  *
@@ -12,25 +8,41 @@
  */
 public abstract class AbstractTransport implements Transport {
 
-   public byte[] readByteArray() {
-         int responseLength = readVInt();
-         byte[] bufferToFill = new byte[responseLength];
-         readBuffer(bufferToFill);
-         return bufferToFill;
+   public byte[] readArray() {
+      int responseLength = readVInt();
+      return readByteArray(responseLength);
    }
 
    @Override
    public String readString() {
-      byte[] strContent = readByteArray();
+      byte[] strContent = readArray();
       return new String(strContent);//todo take care of encoding here
    }
 
-   protected abstract void readBuffer(byte[] bufferToFill);
+   @Override
+   public long readLong() {
+      byte[] longBytes = readByteArray(8);
+      long result = 0;
+      for (int i = 0; i < 8; i++) {
+         result <<= 8;
+         result ^= (long) longBytes[i] & 0xFF;
+      }
+      return result;
+   }
 
-   public void writeByteArray(byte[] toAppend) {
+   @Override
+   public void writeLong(long longValue) {
+      byte[] b = new byte[8];
+      for (int i = 0; i < 8; i++) {
+         b[7 - i] = (byte) (longValue >>> (i * 8));
+      }
+      writeBytes(b);
+   }
+
+   public void writeArray(byte[] toAppend) {
       writeVInt(toAppend.length);
-      writeBuffer(toAppend);
+      writeBytes(toAppend);
    }
 
-   protected abstract void writeBuffer(byte[] toAppend);
+   protected abstract void writeBytes(byte[] toAppend);
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java	2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodConstants.java	2010-04-01 14:46:36 UTC (rev 1651)
@@ -22,13 +22,9 @@
    public static final byte REMOVE_REQUEST = 0x0B;
    public static final byte REMOVE_IF_UNMODIFIED_REQUEST = 0x0D;
    public static final byte CONTAINS_KEY_REQUEST = 0x0F;
-   public static final byte PUT_FOR_EXTERNAL_READ_REQUEST = 0x11;
-   public static final byte GET_WITH_CAS_REQUEST = 0x13;
-   public static final byte EVICT_REQUEST = 0x15;
-   public static final byte CLEAR_REQUEST = 0x17;
-   public static final byte STATS_REQUEST = 0x19;
-   public static final byte QUIT_REQUEST = 0x1B;
-   public static final byte EVENT_REGISTRATION_REQUEST = 0x1D;
+   public static final byte GET_WITH_VERSION = 0x11;
+   public static final byte CLEAR_REQUEST = 0x13;
+   public static final byte STATS_REQUEST = 0x15;
    public static final byte PING_REQUEST = 0x17;
 
 
@@ -41,15 +37,11 @@
    public static final byte REMOVE_RESPONSE = 0x0C;
    public static final byte REMOVE_IF_UNMODIFIED_RESPONSE = 0x0E;
    public static final byte CONTAINS_KEY_RESPONSE = 0x10;
-   public static final byte PUT_FOR_EXTERNAL_READ_RESPONSE = 0x12;
-   public static final byte GET_WITH_CAS_RESPONSE = 0x14;
-   public static final byte EVICT_RESPONSE = 0x16;
-   public static final byte CLEAR_RESPONSE = 0x18;
-   public static final byte STATS_RESPONSE = 0x1A;
-   public static final byte QUIT_RESPONSE = 0x1C;
-   public static final byte EVENT_REGISTRATION_RESPONSE = 0x1E;
-   public static final byte ERROR_RESPONSE = 0x50;
+   public static final byte GET_WITH_VERSION_RESPONSE = 0x12;
+   public static final byte CLEAR_RESPONSE = 0x14;
+   public static final byte STATS_RESPONSE = 0x16;
    public static final byte PING_RESPONSE = 0x18;
+   public static final byte ERROR_RESPONSE = 0x50;
 
    //response status
    public static final byte NO_ERROR_STATUS = 0x00;

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-04-01 14:46:36 UTC (rev 1651)
@@ -40,7 +40,7 @@
             return null;
          }
          if (status == NO_ERROR_STATUS) {
-            return transport.readByteArray();
+            return transport.readArray();
          }
       } finally {
          releaseTransport(transport);
@@ -81,13 +81,16 @@
    public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
       Transport transport = getTransport();
       try {
-         short status = sendKeyOperation(key, transport, GET_WITH_CAS_REQUEST, flags, GET_WITH_CAS_RESPONSE);
+         short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
          if (status == KEY_DOES_NOT_EXIST_STATUS) {
             return null;
          }
          if (status == NO_ERROR_STATUS) {
-            long version = transport.readVLong();
-            byte[] value = transport.readByteArray();
+            long version = transport.readLong();
+            if (log.isTraceEnabled()) {
+               log.trace("Received version: " + version);
+            }
+            byte[] value = transport.readArray();
             return new BinaryVersionedValue(version, value);
          }
       } finally {
@@ -153,12 +156,12 @@
          long messageId = writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, flags);
 
          //2) write message body
-         transport.writeByteArray(key);
+         transport.writeArray(key);
          transport.writeVInt(lifespan);
          transport.writeVInt(maxIdle);
-         transport.writeVLong(version);
-         transport.writeByteArray(value);
-         return returnVersionedOperationResponse(transport, messageId, flags);
+         transport.writeLong(version);
+         transport.writeArray(value);
+         return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
       } finally {
          releaseTransport(transport);
       }
@@ -174,11 +177,11 @@
          long messageId = writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, flags);
 
          //2) write message body
-         transport.writeByteArray(key);
-         transport.writeVLong(version);
+         transport.writeArray(key);
+         transport.writeLong(version);
 
          //process response and return
-         return returnVersionedOperationResponse(transport, messageId, flags);
+         return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
 
       } finally {
          releaseTransport(transport);
@@ -201,7 +204,7 @@
       try {
          // 1) write header
          long messageId = writeHeader(transport, STATS_REQUEST);
-         readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE);
+         readHeaderAndValidate(transport, messageId, STATS_RESPONSE);
          int nrOfStats = transport.readVInt();
          Map<String, Number> result = new HashMap<String, Number>();
          for (int i = 0; i < nrOfStats; i++) {
@@ -247,10 +250,10 @@
       long messageId = writeHeader(transport, opCode, flags);
 
       // 2) write key and value
-      transport.writeByteArray(key);
+      transport.writeArray(key);
       transport.writeVInt(lifespan);
       transport.writeVInt(maxIdle);
-      transport.writeByteArray(value);
+      transport.writeArray(value);
       transport.flush();
 
       // 3) now read header
@@ -269,7 +272,10 @@
       transport.writeVLong(messageId);
       transport.writeByte(HOTROD_VERSION);
       transport.writeByte(operationCode);
-      transport.writeByteArray(cacheNameBytes);
+      transport.writeArray(cacheNameBytes);
+
+
+
       int flagInt = 0;
       if (flags != null) {
          for (Flag flag : flags) {
@@ -279,6 +285,9 @@
       transport.writeVInt(flagInt);
       transport.writeByte(clientIntelligence);
       transport.writeVInt(0);//this will be changed once smarter clients are supported
+      if (log.isTraceEnabled()) {
+         log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode);
+      }
       return messageId;
    }
 
@@ -288,12 +297,19 @@
    private short readHeaderAndValidate(Transport transport, long messageId, short opRespCode) {
       short magic = transport.readByte();
       if (magic != RESPONSE_MAGIC) {
-         throw new InvalidResponseException("Invalid magic number. Expected " + Integer.toHexString(RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic));
+         String message = "Invalid magic number. Expected " + Integer.toHexString(RESPONSE_MAGIC) + " and received " + Integer.toHexString(magic);
+         log.error(message);
+         throw new InvalidResponseException(message);
       }
       long receivedMessageId = transport.readVLong();
       if (receivedMessageId != messageId) {
-         throw new InvalidResponseException("Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId));
+         String message = "Invalid message id. Expected " + Long.toHexString(messageId) + " and received " + Long.toHexString(receivedMessageId);
+         log.error(message);
+         throw new InvalidResponseException(message);
       }
+      if (log.isTraceEnabled()) {
+         log.trace("Received response for message id: " + receivedMessageId);
+      }
       short receivedOpCode = transport.readByte();
       if (receivedOpCode != opRespCode) {
          if (receivedOpCode == ERROR_RESPONSE) {
@@ -302,6 +318,9 @@
          }
          throw new InvalidResponseException("Invalid response operation. Expected " + Integer.toHexString(opRespCode) + " and received " + Integer.toHexString(receivedOpCode));
       }
+      if (log.isTraceEnabled()) {
+         log.trace("Received operation code is: " + receivedOpCode);
+      }
       short status = transport.readByte();
       transport.readByte(); //todo - this will be changed once we support smarter than basic clients
       checkForErrorsInResponseStatus(status, messageId, transport);
@@ -309,19 +328,30 @@
    }
 
    private void checkForErrorsInResponseStatus(short status, long messageId, Transport transport) {
+      if (log.isTraceEnabled()) {
+         log.trace("Received operation status: " + status);
+      }
       switch ((int) status) {
          case INVALID_MAGIC_OR_MESSAGE_ID_STATUS:
          case REQUEST_PARSING_ERROR_STATUS:
          case UNKNOWN_COMMAND_STATUS:
          case SERVER_ERROR_STATUS:
          case UNKNOWN_VERSION_STATUS: {
-            throw new HotRodClientException(transport.readString(), messageId, status);
+            String msgFromServer = transport.readString();
+            if (log.isWarnEnabled()) {
+               log.warn("Error status received from the server:" + msgFromServer + " for message id " + messageId);
+            }
+            throw new HotRodClientException(msgFromServer, messageId, status);
          }
          case COMMAND_TIMEOUT_STATUS: {
+            if (log.isTraceEnabled()) {
+               log.trace("timeout message received from the server");
+            }
             throw new TimeoutException();
          }
          case NO_ERROR_STATUS:
-         case KEY_DOES_NOT_EXIST_STATUS: {
+         case KEY_DOES_NOT_EXIST_STATUS:
+         case NOT_PUT_REMOVED_REPLACED_STATUS: {
             //don't do anything, these are correct responses
             break;
          }
@@ -342,7 +372,7 @@
    private short sendKeyOperation(byte[] key, Transport transport, byte opCode, Flag[] flags, byte opRespCode) {
       // 1) write [header][key length][key]
       long messageId = writeHeader(transport, opCode, flags);
-      transport.writeByteArray(key);
+      transport.writeArray(key);
       transport.flush();
 
       // 2) now read the header
@@ -350,7 +380,7 @@
    }
 
    private byte[] returnPossiblePrevValue(Transport transport, Flag[] flags) {
-      return hasForceReturn(flags) ? transport.readByteArray() : null;
+      return hasForceReturn(flags) ? transport.readArray() : null;
    }
 
    private void releaseTransport(Transport transport) {
@@ -358,9 +388,9 @@
          transport.release();
    }
 
-   private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, Flag[] flags) {
+   private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
       //3) ...
-      short respStatus = readHeaderAndValidate(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE);
+      short respStatus = readHeaderAndValidate(transport, messageId, response);
 
       //4 ...
       VersionedOperationResponse.RspCode code;

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	2010-04-01 14:46:36 UTC (rev 1651)
@@ -8,7 +8,7 @@
  */
 public interface Transport {
 
-   public void writeByteArray(byte[] toAppend);
+   public void writeArray(byte[] toAppend);
 
    public void writeByte(short toWrite);
 
@@ -29,7 +29,13 @@
    /**
     * reads an vint which is size; then an array having that size.
     */
-   public byte[] readByteArray();
+   public byte[] readArray();
 
    String readString();
+
+   byte[] readByteArray(int size);
+
+   long readLong();
+
+   void writeLong(long longValue);
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	2010-04-01 14:46:36 UTC (rev 1651)
@@ -1,7 +1,6 @@
 package org.infinispan.client.hotrod.impl.transport.netty;
 
 import org.infinispan.client.hotrod.impl.AbstractTransport;
-import org.infinispan.client.hotrod.impl.Transport;
 import org.infinispan.client.hotrod.impl.transport.TransportException;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
@@ -50,7 +49,7 @@
    }
 
    @Override
-   protected void writeBuffer(byte[] toAppend) {
+   protected void writeBytes(byte[] toAppend) {
       channel.write(toAppend);
    }
 
@@ -100,7 +99,9 @@
    }
 
    @Override
-   protected void readBuffer(byte[] bufferToFill) {
-      decoder.fillBuffer(bufferToFill);
+   public byte[] readByteArray(int size) {
+      byte[] bytes = new byte[size];
+      decoder.fillBuffer(bytes);
+      return bytes;
    }
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-04-01 14:46:36 UTC (rev 1651)
@@ -1,7 +1,6 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
 import org.infinispan.client.hotrod.impl.AbstractTransport;
-import org.infinispan.client.hotrod.impl.Transport;
 import org.infinispan.client.hotrod.impl.transport.TransportException;
 import org.infinispan.client.hotrod.impl.transport.VHelper;
 
@@ -68,7 +67,7 @@
       }
    }
 
-   protected void writeBuffer(byte[] toAppend) {
+   protected void writeBytes(byte[] toAppend) {
       try {
          socket.getOutputStream().write(toAppend);
       } catch (IOException e) {
@@ -114,18 +113,19 @@
       }
    }
 
-   protected void readBuffer(byte[] bufferToFill)  {
-      int size;
+   public byte[] readByteArray(int size)  {
+      byte[] bytes = new byte[size];
       try {
-         size = socket.getInputStream().read(bufferToFill);
+         size = socket.getInputStream().read(bytes);
       } catch (IOException e) {
          throw new TransportException(e);
       }
       if (size == -1) {
          throw new RuntimeException("End of stream reached!");
       }
-      if (size != bufferToFill.length) {
-         throw new TransportException("Expected " + bufferToFill.length + " bytes but only could read " + size + " bytes!");
+      if (size != bytes.length) {
+         throw new TransportException("Expected " + bytes.length + " bytes but only could read " + size + " bytes!");
       }
+      return bytes;
    }
 }

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java	2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java	2010-04-01 14:46:36 UTC (rev 1651)
@@ -10,6 +10,8 @@
 import org.infinispan.server.hotrod.test.HotRodTestingUtil;
 import org.infinispan.test.SingleCacheManagerTest;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
@@ -127,12 +129,17 @@
       assert remoteCache.containsKey("aKey");
    }
 
+   private static Log log = LogFactory.getLog(HotRodIntegrationTest.class);
+
+   @Test (invocationCount = 10)
    public void testGetVersionedCacheEntry() {
       assert null == remoteCache.getVersioned("aKey");
       remoteCache.put("aKey", "aValue");
+      assert remoteCache.get("aKey").equals("aValue");
       RemoteCache.VersionedValue valueBinary = remoteCache.getVersioned("aKey");
       assert valueBinary != null;
       assertEquals(valueBinary.getValue(), "aValue");
+      log.info("Version is: " + valueBinary.getVersion());
 
       //now put the same value
       remoteCache.put("aKey", "aValue");
@@ -154,7 +161,7 @@
       assert null == remoteCache.replace("aKey", "anotherValue");
       remoteCache.put("aKey", "aValue");
       assert null == remoteCache.replace("aKey", "anotherValue");
-      assert get(cache, "aKey").equals("anotherValue");
+      assert remoteCache.get("aKey").equals("anotherValue");
    }
 
    public void testReplaceIfUnmodified() {
@@ -177,10 +184,10 @@
 
       remoteCache.put("aKey", "aValue");
       RemoteCache.VersionedValue valueBinary = remoteCache.getVersioned("aKey");
-      assert !remoteCache.remove("aKey", valueBinary.getVersion());
+      assert remoteCache.remove("aKey", valueBinary.getVersion());
       assert !cache.containsKey("aKey");
 
-      remoteCache.put("aKey", "aValueNew");
+      remoteCache.put("aKey", "aNewValue");
 
       RemoteCache.VersionedValue entry2 = remoteCache.getVersioned("aKey");
       assert entry2.getVersion() != valueBinary.getVersion();
@@ -192,10 +199,10 @@
    public void testPutIfAbsent() {
       remoteCache.put("aKey", "aValue");
       assert null == remoteCache.putIfAbsent("aKey", "anotherValue");
-      assert get(cache, "aKey").equals("aValue");
+      assert remoteCache.get("aKey").equals("aValue");
 
-      assert cache.remove("aKey").equals("aValue");
-      assert !remoteCache.containsKey("aKey");
+      assert remoteCache.get("aKey").equals("aValue");
+      assert remoteCache.containsKey("aKey");
 
       assert true : remoteCache.replace("aKey", "anotherValue");
    }

Modified: trunk/client/hotrod-client/src/test/resources/log4j.xml
===================================================================
--- trunk/client/hotrod-client/src/test/resources/log4j.xml	2010-04-01 10:21:39 UTC (rev 1650)
+++ trunk/client/hotrod-client/src/test/resources/log4j.xml	2010-04-01 14:46:36 UTC (rev 1651)
@@ -8,7 +8,7 @@
 
    <!-- A time/date based rolling appender -->
    <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
-      <param name="File" value="jdbc_cache_store.log"/>
+      <param name="File" value="hotrod_client.log"/>
       <param name="Append" value="false"/>
 
       <!-- Rollover at midnight each day -->
@@ -52,6 +52,10 @@
       <priority value="TRACE"/>
    </category>
 
+   <category name="org.infinispan.client.hotrod">
+      <priority value="TRACE"/>
+   </category>
+
    <category name="org.infinispan">
       <priority value="INFO"/>
    </category>



More information about the infinispan-commits mailing list