[infinispan-commits] Infinispan SVN: r2052 - in branches/4.1.x: client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl and 5 other directories.

infinispan-commits at lists.jboss.org
Mon Jul 19 07:18:52 EDT 2010


Author: mircea.markus
Date: 2010-07-19 07:18:51 -0400 (Mon, 19 Jul 2010)
New Revision: 2052

Added:
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/BulkGetOperation.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/BulkGetSimpleTest.java
Modified:
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java
   branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodOperation.scala
   branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
Log:
ISPN-516 - HotRod: improve protocol to be able to transfer the entire state of the cache

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCache.java	2010-07-19 11:18:51 UTC (rev 2052)
@@ -14,42 +14,41 @@
  * Provides remote reference to a Hot Rod server/cluster. It implements {@link org.infinispan.Cache}, but given its
  * nature (remote) some operations are not supported. All these unsupported operations are being overridden within this
  * interface and documented as such.
- * <p>
+ * <p/>
  * <b>New operations</b>: besides the operations inherited from {@link org.infinispan.Cache}, RemoteCache also adds new
  * operations to optimize/reduce network traffic: e.g. versioned put operation.
- * <p>
+ * <p/>
  * <b>Concurrency</b>: implementors of this interface will support multi-threaded access, similar to the way {@link
  * org.infinispan.Cache} supports it.
- * <p>
- * <b>Return values</b>: previously existing values for certain {@link java.util.Map} operations are not returned,
- * null is returned instead. E.g. {@link java.util.Map#put(Object, Object)} returns the previous value
- * associated to the supplied key. In case of RemoteCache, this returns null.
- * <p>
- * <b>Synthetic operations</b>: aggregate operations are being implemented based on other Hot Rod operations.
- * E.g. all the {@link java.util.Map#putAll(java.util.Map)} is implemented through multiple individual puts. This means
- * that the these operations are not atomic and that they are costly, e.g. as the number of network round-trips is not
- * one, but the size of the added map. All these synthetic operations are documented as such.
- * <p>
- * <b>changing default behavior through {@link org.infinispan.client.hotrod.Flag}s</b>: it is possible to change de default cache behaviour by using
- * flags on an per invocation basis.
- * E.g.
+ * <p/>
+ * <b>Return values</b>: previously existing values for certain {@link java.util.Map} operations are not returned, null
+ * is returned instead. E.g. {@link java.util.Map#put(Object, Object)} returns the previous value associated to the
+ * supplied key. In case of RemoteCache, this returns null.
+ * <p/>
+ * <b>Synthetic operations</b>: aggregate operations are implemented on top of other Hot Rod operations. E.g.
+ * {@link java.util.Map#putAll(java.util.Map)} is implemented through multiple individual puts. This means that
+ * these operations are not atomic and that they are costly: the number of network round-trips is not one, but
+ * the size of the added map. All these synthetic operations are documented as such.
+ * <p/>
+ * <b>Changing default behavior through {@link org.infinispan.client.hotrod.Flag}s</b>: it is possible to change the
+ * default cache behavior by using flags on a per-invocation basis. E.g.
  * <pre>
  *      RemoteCache cache = getRemoteCache();
  *      Object value = cache.withFlags(Flag.FORCE_RETURN_VALUE).get(aKey);
  * </pre>
- * In the previous example, using {@link org.infinispan.client.hotrod.Flag#FORCE_RETURN_VALUE} will make the client to also return previously
- * existing value associated with <tt>aKey</tt>. If this flag would not be present, Infinispan would return (by default)
- * <tt>null</tt>. This is in order to avoid fetching a possibly large object from the remote server, which might not be
- * needed. The flags as set by the {@link org.infinispan.client.hotrod.RemoteCache#withFlags(Flag...)} operation only apply for the very next
- * operation executed <b>by the same thread</b> on the RemoteCache.
- * <p>
- * <b><a href="http://community.jboss.org/wiki/Eviction">Eviction and expiration</a></b>:
- * Unlike local {@link org.infinispan.Cache} cache, which allows specifying time values with any granularity (as defined by {@link TimeUnit}),
- * HotRod only supports seconds as time units. If a different time unit is used instead, HotRod will transparently convert it to
- * seconds, using {@link java.util.concurrent.TimeUnit#toSeconds(long)} method. This might result in loss of precision for
- * values specified as nanos or milliseconds. <br/>
- * Another fundamental difference is in the case of lifespan (naturally does NOT apply for max idle): If number of seconds is bigger than 30 days,
- * this number of seconds is treated as UNIX time and so, represents the number of seconds since 1/1/1970. <br/>
+ * In the previous example, using {@link org.infinispan.client.hotrod.Flag#FORCE_RETURN_VALUE} will make the client
+ * also return the previously existing value associated with <tt>aKey</tt>. If this flag were not present, Infinispan
+ * would return (by default) <tt>null</tt>. This is in order to avoid fetching a possibly large object from the remote
+ * server, which might not be needed. The flags as set by the {@link org.infinispan.client.hotrod.RemoteCache#withFlags(Flag...)}
+ * operation only apply to the very next operation executed <b>by the same thread</b> on the RemoteCache.
+ * <p/>
+ * <b><a href="http://community.jboss.org/wiki/Eviction">Eviction and expiration</a></b>: Unlike a local {@link
+ * org.infinispan.Cache}, which allows specifying time values with any granularity (as defined by {@link
+ * TimeUnit}), HotRod only supports seconds as time units. If a different time unit is used instead, HotRod will
+ * transparently convert it to seconds, using the {@link java.util.concurrent.TimeUnit#toSeconds(long)} method. This might
+ * result in loss of precision for values specified as nanos or milliseconds. <br/> Another fundamental difference is in
+ * the case of lifespan (naturally does NOT apply for max idle): if the number of seconds is bigger than 30 days, this
+ * number of seconds is treated as UNIX time and so represents the number of seconds since 1/1/1970. <br/>
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
@@ -63,7 +62,8 @@
     * //some processing
     * remoteCache.removeWithVersion(key, ve.getVersion();
     * </pre>
-    * Lat call (removeWithVersion) will make sure that the entry will only be removed if it hasn't been changed in between.
+    * The last call (removeWithVersion) makes sure that the entry is only removed if it hasn't been changed in
+    * between.
     *
     * @return true if the entry has been removed
     * @see VersionedValue
@@ -77,8 +77,8 @@
    NotifyingFuture<Boolean> removeWithVersionAsync(K key, long version);
 
    /**
-    * Removes the given value only if its version matches the supplied version. See {@link #removeWithVersion(Object, long)} for a
-    * sample usage.
+    * Removes the given value only if its version matches the supplied version. See {@link #removeWithVersion(Object,
+    * long)} for a sample usage.
     *
     * @return true if the method has been replaced
     * @see #getVersioned(Object)
@@ -249,8 +249,7 @@
    NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue);
 
    /**
-    * This operation is not supported. Consider using {@link #replaceAsync(K,V,long,int)}
-    * instead.
+    * This operation is not supported. Consider using {@link #replaceAsync(K,V,long,int)} instead.
     *
     * @throws UnsupportedOperationException
     */
@@ -339,10 +338,25 @@
 
    public ServerStatistics stats();
 
-   RemoteCache<K,V> withFlags(Flag... flags);
+   RemoteCache<K, V> withFlags(Flag... flags);
 
    /**
     * Returns the {@link org.infinispan.client.hotrod.RemoteCacheManager} that created this cache.
     */
    public RemoteCacheManager getRemoteCacheManager();
+
+   /**
+    * Bulk get operation: returns all the entries within the remote cache.
+    *
+    * @return the returned values depend on the configuration of the back-end Infinispan servers. Read <a
+    *         href="http://community.jboss.org/wiki/HotRodBulkGet-Design#Server_side">this</a> for more details. The
+    *         returned Map is unmodifiable.
+    */
+   public Map<K, V> getBulk();
+
+   /**
+    * Same as {@link #getBulk()}, but limits the returned set of values to the specified size. No ordering is guaranteed, and there is no
+    * guarantee that "size" elements are returned (e.g. if the number of elements in the back-end server is smaller than "size"). A size of 0 returns all entries.
+    */
+   public Map<K, V> getBulk(int size);
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	2010-07-19 11:18:51 UTC (rev 2052)
@@ -16,6 +16,8 @@
 import org.infinispan.util.logging.LogFactory;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -279,10 +281,33 @@
       byte[] keyBytes = obj2bytes(key, true);
       GetOperation gco = operationsFactory.newGetKeyOperation(keyBytes);
       byte[] bytes = (byte[]) gco.execute();
-      return (V) bytes2obj(bytes);
+      V result = (V) bytes2obj(bytes);
+      if (log.isTraceEnabled()) {
+         log.trace("For key(" + key + ") returning " + result);
+      }
+      return result;
    }
 
    @Override
+   public Map<K, V> getBulk() {
+      return getBulk(0);
+   }
+
+   @Override
+   public Map<K, V> getBulk(int size) {
+      assertRemoteCacheManagerIsStarted();
+      BulkGetOperation op = operationsFactory.newBulkGetOperation(size);
+      Map<byte[], byte[]> result = (Map) op.execute();
+      Map<K,V> toReturn = new HashMap<K,V>();
+      for (Map.Entry<byte[], byte[]> entry : result.entrySet()) {
+         V value = (V) bytes2obj(entry.getValue());
+         K key = (K) bytes2obj(entry.getKey());
+         toReturn.put(key, value);
+      }
+      return Collections.unmodifiableMap(toReturn);
+   }
+
+   @Override
    public V remove(Object key) {
       assertRemoteCacheManagerIsStarted();
       RemoveOperation removeOperation = operationsFactory.newRemoveOperation(obj2bytes(key, true));
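
The conversion step in getBulk(int) above can be illustrated in isolation. The following is a minimal stand-alone sketch, not Infinispan code: BulkGetResultSketch, fromBytes and toTypedMap are hypothetical names, and fromBytes is a UTF-8 stand-in for the real bytes2obj unmarshalling. It shows why byte[] keys are acceptable in the raw map (the map is only iterated, never looked up by key) and that the result handed to the caller is read-only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone model of the result conversion; not Infinispan code. */
final class BulkGetResultSketch {

   // Stand-in for bytes2obj: this sketch simply assumes UTF-8 strings.
   static String fromBytes(byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_8);
   }

   // The raw map is only iterated, so the identity-based hashing of byte[] keys does not matter.
   static Map<String, String> toTypedMap(Map<byte[], byte[]> raw) {
      Map<String, String> typed = new HashMap<>();
      for (Map.Entry<byte[], byte[]> entry : raw.entrySet()) {
         typed.put(fromBytes(entry.getKey()), fromBytes(entry.getValue()));
      }
      // Callers get a read-only view, as documented on RemoteCache.getBulk().
      return Collections.unmodifiableMap(typed);
   }

   public static void main(String[] args) {
      Map<byte[], byte[]> raw = new HashMap<>();
      raw.put("k".getBytes(StandardCharsets.UTF_8), "v".getBytes(StandardCharsets.UTF_8));
      Map<String, String> result = toTypedMap(raw);
      System.out.println(result.get("k")); // prints v
      try {
         result.put("other", "value");
      } catch (UnsupportedOperationException e) {
         System.out.println("read-only"); // prints read-only
      }
   }
}
```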

Added: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/BulkGetOperation.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/BulkGetOperation.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/BulkGetOperation.java	2010-07-19 11:18:51 UTC (rev 2052)
@@ -0,0 +1,42 @@
+package org.infinispan.client.hotrod.impl.operations;
+
+import org.infinispan.client.hotrod.Flag;
+import org.infinispan.client.hotrod.impl.transport.Transport;
+import org.infinispan.client.hotrod.impl.transport.TransportFactory;
+
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Reads multiple entries in a single operation. Specified <a href="http://community.jboss.org/wiki/HotRodBulkGet-Design">here</a>.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class BulkGetOperation extends RetryOnFailureOperation {
+
+   private final int entryCount;
+
+   public BulkGetOperation(TransportFactory transportFactory, byte[] cacheName, AtomicInteger topologyId, Flag[] flags, int entryCount) {
+      super(transportFactory, cacheName, topologyId, flags);
+      this.entryCount = entryCount;
+   }
+   
+   @Override
+   protected Transport getTransport(int retryCount) {
+      return transportFactory.getTransport();
+   }
+
+   @Override
+   protected Object executeOperation(Transport transport) {
+      long messageId = writeHeader(transport, BULK_GET_REQUEST);
+      transport.writeVInt(entryCount);
+      transport.flush();
+      readHeaderAndValidate(transport, messageId, BULK_GET_RESPONSE);
+      HashMap<byte[], byte[]> result = new HashMap<byte[], byte[]>();
+      while (transport.readByte() == 1) { // marker byte 1: another key/value pair follows
+         result.put(transport.readArray(), transport.readArray());
+      }
+      return result;
+   }
+}
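
The response body read by executeOperation above is a sequence of entries, each introduced by a marker byte 1 and followed by a key array and a value array, with a final marker byte 0. A minimal stand-alone sketch of that framing follows. It is not Infinispan code: BulkGetFramingSketch and its methods are hypothetical, plain java.io streams replace the Transport, and the length prefix of each array is assumed to be a variable-length int (7 payload bits per byte), which this diff does not show.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-alone model of the bulk-get response body; not Infinispan code. */
final class BulkGetFramingSketch {

   // Assumed variable-length int: 7 payload bits per byte, high bit set while more bytes follow.
   static void writeVInt(DataOutputStream out, int value) throws IOException {
      while ((value & ~0x7F) != 0) {
         out.writeByte((value & 0x7F) | 0x80);
         value >>>= 7;
      }
      out.writeByte(value);
   }

   static int readVInt(DataInputStream in) throws IOException {
      int result = 0;
      int shift = 0;
      int b;
      do {
         b = in.readUnsignedByte();
         result |= (b & 0x7F) << shift;
         shift += 7;
      } while ((b & 0x80) != 0);
      return result;
   }

   // Length-prefixed byte array, standing in for writeRangedBytes on the server.
   static void writeArray(DataOutputStream out, byte[] data) throws IOException {
      writeVInt(out, data.length);
      out.write(data);
   }

   // Counterpart of writeArray, standing in for Transport.readArray on the client.
   static byte[] readArray(DataInputStream in) throws IOException {
      byte[] data = new byte[readVInt(in)];
      in.readFully(data);
      return data;
   }

   /** Server side: marker byte 1 before every entry, marker byte 0 after the last one. */
   static byte[] encode(Map<String, String> entries) {
      try {
         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         DataOutputStream out = new DataOutputStream(bytes);
         for (Map.Entry<String, String> e : entries.entrySet()) {
            out.writeByte(1); // another entry follows
            writeArray(out, e.getKey().getBytes(StandardCharsets.UTF_8));
            writeArray(out, e.getValue().getBytes(StandardCharsets.UTF_8));
         }
         out.writeByte(0); // end of entries
         out.flush();
         return bytes.toByteArray();
      } catch (IOException e) {
         throw new UncheckedIOException(e);
      }
   }

   /** Client side: keep reading key/value pairs while the marker byte is 1. */
   static Map<String, String> decode(byte[] body) {
      try {
         DataInputStream in = new DataInputStream(new ByteArrayInputStream(body));
         Map<String, String> result = new LinkedHashMap<>();
         while (in.readByte() == 1) {
            String key = new String(readArray(in), StandardCharsets.UTF_8);
            String value = new String(readArray(in), StandardCharsets.UTF_8);
            result.put(key, value);
         }
         return result;
      } catch (IOException e) {
         throw new UncheckedIOException(e);
      }
   }

   public static void main(String[] args) {
      Map<String, String> entries = new LinkedHashMap<>();
      entries.put("k1", "v1");
      entries.put("k2", "v2");
      System.out.println(decode(encode(entries))); // prints {k1=v1, k2=v2}
   }
}
```

Because the terminator is a marker byte rather than an up-front entry count, the server can stream entries straight from its iterator without knowing in advance how many it will send.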

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/operations/OperationsFactory.java	2010-07-19 11:18:51 UTC (rev 2052)
@@ -80,6 +80,10 @@
       return new ClearOperation(transportFactory, cacheNameBytes, topologyId, flags());
    }
 
+   public BulkGetOperation newBulkGetOperation(int size) {
+      return new BulkGetOperation(transportFactory, cacheNameBytes, topologyId, flags(), size);
+   }
+
    private Flag[] flags() {
       Flag[] flags = this.flagsMap.get();
       this.flagsMap.remove();

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodConstants.java	2010-07-19 11:18:51 UTC (rev 2052)
@@ -28,6 +28,7 @@
    static final byte CLEAR_REQUEST = 0x13;
    static final byte STATS_REQUEST = 0x15;
    static final byte PING_REQUEST = 0x17;
+   static final byte BULK_GET_REQUEST = 0x19;
 
 
    //responses
@@ -43,6 +44,7 @@
    static final byte CLEAR_RESPONSE = 0x14;
    static final byte STATS_RESPONSE = 0x16;
    static final byte PING_RESPONSE = 0x18;
+   static final byte BULK_GET_RESPONSE = 0x20;
    static final byte ERROR_RESPONSE = 0x50;
 
    //response status

Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/BulkGetSimpleTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/BulkGetSimpleTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/BulkGetSimpleTest.java	2010-07-19 11:18:51 UTC (rev 2052)
@@ -0,0 +1,70 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.Properties;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(testName = "client.hotrod.BulkGetSimpleTest", groups = "functional")
+public class BulkGetSimpleTest extends SingleCacheManagerTest {
+   private HotRodServer hotRodServer;
+   private RemoteCacheManager remoteCacheManager;
+   private RemoteCache<Object, Object> remoteCache;
+
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      cacheManager = TestCacheManagerFactory.createLocalCacheManager();
+      cache = cacheManager.getCache();
+
+      hotRodServer = TestHelper.startHotRodServer(cacheManager);
+
+      Properties hotrodClientConf = new Properties();
+      hotrodClientConf.put("infinispan.client.hotrod.server_list", "localhost:" + hotRodServer.getPort());
+      remoteCacheManager = new RemoteCacheManager(hotrodClientConf);
+      remoteCache = remoteCacheManager.getCache();
+      populateCacheManager();
+      return cacheManager;
+   }
+
+   @AfterMethod
+   @Override
+   protected void clearContent() {
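+      // intentionally empty: keeps the entries added by populateCacheManager() available to every test method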
+   }
+
+   private void populateCacheManager() {
+      for (int i = 0; i < 100; i++) {
+         remoteCache.put(i, i);
+      }
+   }
+
+   public void testBulkGet() {
+      Map<Object,Object> map = remoteCache.getBulk();
+      assert map.size() == 100;
+      for (int i = 0; i < 100; i++) {
+         assert map.get(i).equals(i);
+      }
+   }
+
+   public void testBulkGetWithSize() {
+      Map<Object,Object> map = remoteCache.getBulk(50);
+      assertEquals(50, map.size());
+      for (int i = 0; i < 100; i++) {
+         if (map.containsKey(i)) {
+            Integer value = (Integer) map.get(i);
+            assertEquals((Integer)i, value);
+         }
+      }
+   }
+}

Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java	2010-07-19 11:18:51 UTC (rev 2052)
@@ -16,6 +16,7 @@
 import org.testng.annotations.Test;
 
 import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.testng.Assert.assertEquals;
@@ -99,6 +100,13 @@
       assertEquals(false, remoteCache.containsKey("k"));
    }
 
+   public void testBulkGet() {
+      validateSequenceAndStopServer();
+      resetStats();
+      Map map = remoteCache.getBulk();
+      assertEquals(3, map.size());
+   }
+
    private void validateSequenceAndStopServer() {
       resetStats();
       assertNoHits();

Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-07-19 11:18:51 UTC (rev 2052)
@@ -40,6 +40,7 @@
          case 0x13 => ClearRequest
          case 0x15 => StatsRequest
          case 0x17 => PingRequest
+         case 0x19 => BulkGetRequest
          case _ => throw new UnknownOperationException("Unknown operation: " + streamOp)
       }
       if (isTraceEnabled) trace("Operation code: {0} has been matched to {1}", streamOp, op)
@@ -145,6 +146,11 @@
             new Response(h.messageId, h.cacheName, h.clientIntel, ClearResponse, Success, h.topologyId)
          }
          case PingRequest => new Response(h.messageId, h.cacheName, h.clientIntel, PingResponse, Success, h.topologyId)
+         case BulkGetRequest => {
+            val count = buffer.readUnsignedInt
+            if (isTraceEnabled) trace("About to create bulk response, count = " + count)
+            new BulkGetResponse(h.messageId, h.cacheName, h.clientIntel, BulkGetResponse, Success, h.topologyId, cache, count)
+         }
       }
    }
 
@@ -195,6 +201,7 @@
          case ClearRequest => ClearResponse
          case StatsRequest => StatsResponse
          case PingRequest => PingResponse
+         case BulkGetRequest => BulkGetResponse
       }
    }
 
@@ -214,6 +221,7 @@
    val ClearResponse = Value(0x14)
    val StatsResponse = Value(0x16)
    val PingResponse = Value(0x18)
+   val BulkGetResponse = Value(0x20)
    val ErrorResponse = Value(0x50)
 }
 

Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-07-19 11:18:51 UTC (rev 2052)
@@ -102,9 +102,13 @@
    override def createMultiGetResponse(h: HotRodHeader, pairs: Map[ByteArrayKey, CacheValue]): AnyRef =
       null // Unsupported
 
-   override def handleCustomRequest(h: HotRodHeader, b: ChannelBuffer, cache: Cache[ByteArrayKey, CacheValue]): AnyRef =
+   override def handleCustomRequest(h: HotRodHeader, b: ChannelBuffer, cache: Cache[ByteArrayKey, CacheValue]): AnyRef = {
-      h.decoder.handleCustomRequest(h, b, cache)
+      val result = h.decoder.handleCustomRequest(h, b, cache)
+      if (isTrace) trace("About to return: " + result)
+      result
+   }
 
+
    override def createStatsResponse(h: HotRodHeader, stats: Stats): AnyRef =
       h.decoder.createStatsResponse(h, stats)
 

Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-07-19 11:18:51 UTC (rev 2052)
@@ -8,6 +8,8 @@
 import org.infinispan.Cache
 import org.infinispan.server.core.{CacheValue, Logging}
 import org.infinispan.util.ByteArrayKey
+import java.util.Iterator
+import org.infinispan.container.entries.{InternalCacheValue, InternalCacheEntry}
 
 /**
  * // TODO: Document this
@@ -50,6 +52,27 @@
                buffer.writeRangedBytes(g.data.get)
             }
          }
+         case g: BulkGetResponse => {
+            if (isTrace) trace("About to respond to bulk get request")
+            if (g.status == Success) {
+               val dataContainer = g.cache.getAdvancedCache().getDataContainer()
+               var iterator: Iterator[InternalCacheEntry] = dataContainer.iterator()
+               val count = g.count
+               var written:Int = 0;
+               if (isTrace) trace("About to write (max) " + count + " messages to the client. Is written <= count ?" + (written <= count))
+               while (iterator.hasNext() && ((written < count) || (count == 0)) ) {
+                  if (isTrace) trace("About to write message number " + written)
+                  buffer.writeByte(1) //not done
+                  written = written + 1
+                  var ice: InternalCacheEntry = iterator.next()
+                  val key:ByteArrayKey = ice.getKey().asInstanceOf[ByteArrayKey]
+                  buffer.writeRangedBytes(key.getData)
+                  val cacheValue : CacheValue = ice.getValue().asInstanceOf[CacheValue]
+                  buffer.writeRangedBytes(cacheValue.data)
+               }
+               buffer.writeByte(0)
+            }
+         }
          case g: GetResponse => if (g.status == Success) buffer.writeRangedBytes(g.data.get)
          case e: ErrorResponse => buffer.writeString(e.msg)
          case _ => if (buffer == null) throw new IllegalArgumentException("Response received is unknown: " + msg);         
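
The loop condition in the BulkGetResponse case above treats a count of 0 as "no limit", which is how getBulk() (sent as getBulk(0)) retrieves every entry. A minimal stand-alone sketch of just that condition follows, written in Java for consistency with the client code. BulkGetLimitSketch and take are hypothetical names, and a plain Iterator stands in for the data container iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-alone model of the server-side write loop; not Infinispan code. */
final class BulkGetLimitSketch {

   /** Takes at most count elements from the source; count == 0 means "no limit". */
   static <T> List<T> take(Iterator<T> source, int count) {
      List<T> written = new ArrayList<>();
      // Same shape as the encoder loop: stop when exhausted or when the limit is reached.
      while (source.hasNext() && (written.size() < count || count == 0)) {
         written.add(source.next());
      }
      return written;
   }

   public static void main(String[] args) {
      List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
      System.out.println(take(data.iterator(), 2));  // prints [1, 2]
      System.out.println(take(data.iterator(), 0));  // prints [1, 2, 3, 4, 5]
      System.out.println(take(data.iterator(), 10)); // prints [1, 2, 3, 4, 5]
   }
}
```

The third call shows the case mentioned in the RemoteCache javadoc: asking for more entries than exist simply returns what is there.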

Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodOperation.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodOperation.scala	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodOperation.scala	2010-07-19 11:18:51 UTC (rev 2052)
@@ -14,5 +14,6 @@
    val ClearRequest = Value
    val QuitRequest = Value
    val PingRequest = Value
+   val BulkGetRequest = Value
 
 }
\ No newline at end of file

Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-07-19 10:44:24 UTC (rev 2051)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-07-19 11:18:51 UTC (rev 2052)
@@ -2,7 +2,9 @@
 
 import OperationStatus._
 import OperationResponse._
-import org.infinispan.util.Util
+import org.infinispan.Cache
+import org.infinispan.server.core.CacheValue
+import org.infinispan.util.{ByteArrayKey, Util}
 
 /**
  * // TODO: Document this
@@ -51,6 +53,19 @@
          .append("}").toString
    }
 }
+class BulkGetResponse(override val messageId: Long, override val cacheName: String, override val clientIntel: Short,
+                  override val operation: OperationResponse, override val status: OperationStatus,
+                  override val topologyId: Int,
+                  val cache: Cache[ByteArrayKey, CacheValue], val count: Int)
+      extends Response(messageId, cacheName, clientIntel, operation, status, topologyId) {
+   override def toString = {
+      new StringBuilder().append("BulkGetResponse").append("{")
+         .append("messageId=").append(messageId)
+         .append(", operation=").append(operation)
+         .append(", status=").append(status)
+         .append(", count=").append(count).append("}").toString
+   }
+}
 
 class GetWithVersionResponse(override val messageId: Long, override val cacheName: String,
                              override val clientIntel: Short, override val operation: OperationResponse,


