[infinispan-commits] Infinispan SVN: r1823 - in trunk/server: hotrod/src/main/scala/org/infinispan/server/hotrod and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed May 19 02:31:39 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-05-19 02:31:38 -0400 (Wed, 19 May 2010)
New Revision: 1823

Modified:
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
Log:
[ISPN-437] (Hotrod server optimisation: use SKIP_REMOTE_LOOKUP if no return values are needed) Done.

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-05-18 22:51:43 UTC (rev 1822)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-05-19 06:31:38 UTC (rev 1823)
@@ -72,20 +72,22 @@
       }
    }
 
-   private def put(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+   private def put(h: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
       val p = params.get
-      val cache = getCache(header)
-      val v = createValue(header, p, generateVersion(cache))
-      val prev = cache.put(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
-      createSuccessResponse(header, params, prev)
+      val v = createValue(h, p, generateVersion(c))
+      // Get an optimised cache in case we can make the operation more efficient
+      val prev = getOptimizedCache(h, c).put(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
+      createSuccessResponse(h, params, prev)
    }
 
-   private def putIfAbsent(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+   protected def getOptimizedCache(header: SuitableHeader, c: Cache[K, V]): Cache[K, V] = c
+
+   private def putIfAbsent(header: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
       val p = params.get
-      val prev = cache.get(k)
-      if (prev == null) { // Generate new version only if key not present      
-         val v = createValue(header, p, generateVersion(cache))
-         cache.putIfAbsent(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
+      var prev = c.get(k)
+      if (prev == null) { // Generate new version only if key not present
+         val v = createValue(header, p, generateVersion(c))
+         prev = c.putIfAbsent(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
       }
       if (prev == null)
          createSuccessResponse(header, params, prev)
@@ -93,12 +95,12 @@
          createNotExecutedResponse(header, params, prev)
    }
 
-   private def replace(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+   private def replace(header: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
       val p = params.get
-      val prev = cache.get(k)
+      var prev = c.get(k)
       if (prev != null) { // Generate new version only if key present
-         val v = createValue(header, p, generateVersion(cache))
-         cache.replace(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
+         val v = createValue(header, p, generateVersion(c))
+         prev = c.replace(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
       }
       if (prev != null)
          createSuccessResponse(header, params, prev)
@@ -106,14 +108,14 @@
          createNotExecutedResponse(header, params, prev)
    }
 
-   private def replaceIfUmodified(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+   private def replaceIfUmodified(header: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
       val p = params.get
-      val prev = cache.get(k)
+      val prev = c.get(k)
       if (prev != null) {
          if (prev.version == p.streamVersion) {
             // Generate new version only if key present and version has not changed, otherwise it's wasteful
-            val v = createValue(header, p, generateVersion(cache))
-            val replaced = cache.replace(k, prev, v);
+            val v = createValue(header, p, generateVersion(c))
+            val replaced = c.replace(k, prev, v);
             if (replaced)
                createSuccessResponse(header, params, prev)
             else
@@ -124,8 +126,8 @@
       } else createNotExistResponse(header, params)
    }
 
-   private def remove(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
-      val prev = cache.remove(k)
+   private def remove(header: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
+      val prev = c.remove(k)
       if (prev != null)
          createSuccessResponse(header, params, prev)
       else

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala	2010-05-18 22:51:43 UTC (rev 1822)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala	2010-05-19 06:31:38 UTC (rev 1823)
@@ -37,4 +37,5 @@
 
    def createErrorResponse(header: HotRodHeader, t: Throwable): AnyRef
 
+   def getOptimizedCache(h: HotRodHeader, c: Cache[CacheKey, CacheValue]): Cache[CacheKey, CacheValue]
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-05-18 22:51:43 UTC (rev 1822)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-05-19 06:31:38 UTC (rev 1823)
@@ -12,14 +12,14 @@
 import collection.immutable
 import org.infinispan.util.concurrent.TimeoutException
 import java.io.IOException
-import org.infinispan.distribution.DefaultConsistentHash
+import org.infinispan.context.Flag.SKIP_REMOTE_LOOKUP
 
 /**
- * // TODO: Document this
+ * HotRod protocol decoder specific for specification version 1.0.
+ *
  * @author Galder Zamarreño
- * @since
+ * @since 4.1
  */
-
 class Decoder10(cacheManager: CacheManager) extends AbstractVersionedDecoder {
    import RequestResolver._
    import ResponseResolver._
@@ -137,7 +137,8 @@
          }
          case ClearRequest => {
             val topologyResponse = getTopologyResponse(h)
-            cache.clear
+            // Get an optimised cache in case we can make the operation more efficient
+            getOptimizedCache(h, cache).clear
             new Response(h.messageId, h.cacheName, h.clientIntel, ClearResponse, Success, topologyResponse)
          }
          case PingRequest => {
@@ -197,6 +198,13 @@
       } else None
    }
 
+   override def getOptimizedCache(h: HotRodHeader, c: Cache[CacheKey, CacheValue]): Cache[CacheKey, CacheValue] = {
+      // ISPN-437: skip the remote lookup only when the client has NOT asked for the previous value back
+      if (c.getConfiguration.getCacheMode.isDistributed && h.flag != ForceReturnPreviousValue)
+         c.getAdvancedCache.withFlags(SKIP_REMOTE_LOOKUP)
+      else
+         c
+   }
 }
 
 object RequestResolver extends Logging {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-05-18 22:51:43 UTC (rev 1822)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-05-19 06:31:38 UTC (rev 1823)
@@ -67,11 +67,12 @@
    override def getCache(header: HotRodHeader): Cache[CacheKey, CacheValue] = {
       val cacheName = header.cacheName
       if (cacheName == TopologyCacheName)
-         throw new CacheException("Remote requests are not allowed to topology cache. Do no send remote requests to cache " + TopologyCacheName)
+         throw new CacheException("Remote requests are not allowed to topology cache. Do no send remote requests to cache "
+               + TopologyCacheName)
 
       if (cacheName != DefaultCacheManager.DEFAULT_CACHE_NAME && !cacheManager.getCacheNames.contains(cacheName))
          throw new CacheNotFoundException("Cache with name '" + cacheName + "' not found amongst the configured caches")
-      
+
       if (cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME) cacheManager.getCache[CacheKey, CacheValue]
       else cacheManager.getCache(cacheName)
    }
@@ -125,6 +126,9 @@
       }
    }
 
+   override protected def getOptimizedCache(h: HotRodHeader, c: Cache[CacheKey, CacheValue]): Cache[CacheKey, CacheValue] = {
+      h.decoder.getOptimizedCache(h, c)
+   }
 }
 
 object HotRodDecoder extends Logging {



More information about the infinispan-commits mailing list