[infinispan-commits] Infinispan SVN: r1823 - in trunk/server: hotrod/src/main/scala/org/infinispan/server/hotrod and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed May 19 02:31:39 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-05-19 02:31:38 -0400 (Wed, 19 May 2010)
New Revision: 1823
Modified:
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
Log:
[ISPN-437] (Hotrod server optimisation: use SKIP_REMOTE_LOOKUP if no return values are needed) Done.
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala 2010-05-18 22:51:43 UTC (rev 1822)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala 2010-05-19 06:31:38 UTC (rev 1823)
@@ -72,20 +72,22 @@
}
}
- private def put(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+ private def put(h: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
val p = params.get
- val cache = getCache(header)
- val v = createValue(header, p, generateVersion(cache))
- val prev = cache.put(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
- createSuccessResponse(header, params, prev)
+ val v = createValue(h, p, generateVersion(c))
+ // Get an optimised cache in case we can make the operation more efficient
+ val prev = getOptimizedCache(h, c).put(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
+ createSuccessResponse(h, params, prev)
}
- private def putIfAbsent(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+ protected def getOptimizedCache(header: SuitableHeader, c: Cache[K, V]): Cache[K, V] = c
+
+ private def putIfAbsent(header: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
val p = params.get
- val prev = cache.get(k)
- if (prev == null) { // Generate new version only if key not present
- val v = createValue(header, p, generateVersion(cache))
- cache.putIfAbsent(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
+ var prev = c.get(k)
+ if (prev == null) { // Generate new version only if key not present
+ val v = createValue(header, p, generateVersion(c))
+ prev = c.putIfAbsent(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
}
if (prev == null)
createSuccessResponse(header, params, prev)
@@ -93,12 +95,12 @@
createNotExecutedResponse(header, params, prev)
}
- private def replace(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+ private def replace(header: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
val p = params.get
- val prev = cache.get(k)
+ var prev = c.get(k)
if (prev != null) { // Generate new version only if key present
- val v = createValue(header, p, generateVersion(cache))
- cache.replace(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
+ val v = createValue(header, p, generateVersion(c))
+ prev = c.replace(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
}
if (prev != null)
createSuccessResponse(header, params, prev)
@@ -106,14 +108,14 @@
createNotExecutedResponse(header, params, prev)
}
- private def replaceIfUmodified(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+ private def replaceIfUmodified(header: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
val p = params.get
- val prev = cache.get(k)
+ val prev = c.get(k)
if (prev != null) {
if (prev.version == p.streamVersion) {
// Generate new version only if key present and version has not changed, otherwise it's wasteful
- val v = createValue(header, p, generateVersion(cache))
- val replaced = cache.replace(k, prev, v);
+ val v = createValue(header, p, generateVersion(c))
+ val replaced = c.replace(k, prev, v);
if (replaced)
createSuccessResponse(header, params, prev)
else
@@ -124,8 +126,8 @@
} else createNotExistResponse(header, params)
}
- private def remove(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
- val prev = cache.remove(k)
+ private def remove(header: SuitableHeader, k: K, params: Option[SuitableParameters], c: Cache[K, V]): AnyRef = {
+ val prev = c.remove(k)
if (prev != null)
createSuccessResponse(header, params, prev)
else
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala 2010-05-18 22:51:43 UTC (rev 1822)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala 2010-05-19 06:31:38 UTC (rev 1823)
@@ -37,4 +37,5 @@
def createErrorResponse(header: HotRodHeader, t: Throwable): AnyRef
+ def getOptimizedCache(h: HotRodHeader, c: Cache[CacheKey, CacheValue]): Cache[CacheKey, CacheValue]
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-05-18 22:51:43 UTC (rev 1822)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-05-19 06:31:38 UTC (rev 1823)
@@ -12,14 +12,14 @@
import collection.immutable
import org.infinispan.util.concurrent.TimeoutException
import java.io.IOException
-import org.infinispan.distribution.DefaultConsistentHash
+import org.infinispan.context.Flag.SKIP_REMOTE_LOOKUP
/**
- * // TODO: Document this
+ * HotRod protocol decoder specific for specification version 1.0.
+ *
* @author Galder Zamarreño
- * @since
+ * @since 4.1
*/
-
class Decoder10(cacheManager: CacheManager) extends AbstractVersionedDecoder {
import RequestResolver._
import ResponseResolver._
@@ -137,7 +137,8 @@
}
case ClearRequest => {
val topologyResponse = getTopologyResponse(h)
- cache.clear
+ // Get an optimised cache in case we can make the operation more efficient
+ getOptimizedCache(h, cache).clear
new Response(h.messageId, h.cacheName, h.clientIntel, ClearResponse, Success, topologyResponse)
}
case PingRequest => {
@@ -197,6 +198,13 @@
} else None
}
+ override def getOptimizedCache(h: HotRodHeader, c: Cache[CacheKey, CacheValue]): Cache[CacheKey, CacheValue] = {
+ if (c.getConfiguration.getCacheMode.isDistributed && h.flag == ForceReturnPreviousValue) {
+ c.getAdvancedCache.withFlags(SKIP_REMOTE_LOOKUP)
+ } else {
+ c
+ }
+ }
}
object RequestResolver extends Logging {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-05-18 22:51:43 UTC (rev 1822)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-05-19 06:31:38 UTC (rev 1823)
@@ -67,11 +67,12 @@
override def getCache(header: HotRodHeader): Cache[CacheKey, CacheValue] = {
val cacheName = header.cacheName
if (cacheName == TopologyCacheName)
- throw new CacheException("Remote requests are not allowed to topology cache. Do no send remote requests to cache " + TopologyCacheName)
+ throw new CacheException("Remote requests are not allowed to topology cache. Do no send remote requests to cache "
+ + TopologyCacheName)
if (cacheName != DefaultCacheManager.DEFAULT_CACHE_NAME && !cacheManager.getCacheNames.contains(cacheName))
throw new CacheNotFoundException("Cache with name '" + cacheName + "' not found amongst the configured caches")
-
+
if (cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME) cacheManager.getCache[CacheKey, CacheValue]
else cacheManager.getCache(cacheName)
}
@@ -125,6 +126,9 @@
}
}
+ override protected def getOptimizedCache(h: HotRodHeader, c: Cache[CacheKey, CacheValue]): Cache[CacheKey, CacheValue] = {
+ h.decoder.getOptimizedCache(h, c)
+ }
}
object HotRodDecoder extends Logging {
More information about the infinispan-commits mailing list