[infinispan-commits] Infinispan SVN: r1115 - in trunk/core/src: main/java/org/infinispan/interceptors and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Sun Nov 8 11:52:07 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-11-08 11:52:07 -0500 (Sun, 08 Nov 2009)
New Revision: 1115

Modified:
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   trunk/core/src/test/resources/log4j.xml
Log:
[ISPN-258] (Joiner doesn't see state when rehashing is in progress) fixed

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-11-08 16:38:31 UTC (rev 1114)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2009-11-08 16:52:07 UTC (rev 1115)
@@ -105,6 +105,8 @@
 
    boolean isRehashInProgress();
 
+   boolean isJoinComplete();
+
    void applyReceivedState(Map<Object, InternalCacheValue> state);
 }
 

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-11-08 16:38:31 UTC (rev 1114)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2009-11-08 16:52:07 UTC (rev 1115)
@@ -357,6 +357,10 @@
       }
    }
 
+   public boolean isJoinComplete() {
+      return joinComplete;
+   }
+
    void drainTransactionLog() {
       List<WriteCommand> c;
       while (transactionLogger.size() > 10) {

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-11-08 16:38:31 UTC (rev 1114)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-11-08 16:52:07 UTC (rev 1115)
@@ -86,11 +86,12 @@
 
    @Override
    public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
+      boolean isStillRehashingOnJoin = !dm.isJoinComplete();
       Object returnValue = invokeNextInterceptor(ctx, command);
       // need to check in the context as well since a null retval is not necessarily an indication of the entry not being
       // available.  It could just have been removed in the same tx beforehand.
       if (!ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && returnValue == null && ctx.lookupEntry(command.getKey()) == null)
-         returnValue = remoteGetAndStoreInL1(ctx, command.getKey());
+         returnValue = remoteGetAndStoreInL1(ctx, command.getKey(), isStillRehashingOnJoin);
       return returnValue;
    }
 
@@ -106,28 +107,40 @@
     * @return value of a remote get, or null
     * @throws Throwable if there are problems
     */
-   private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key) throws Throwable {
-      if (ctx.isOriginLocal() && !dm.isLocal(key) && isNotInL1(key)) {
-         if (trace) log.trace("Doing a remote get for key {0}", key);
-         // attempt a remote lookup
-         InternalCacheEntry ice = dm.retrieveFromRemoteSource(key);
+   private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key, boolean dmWasRehashingDuringLocalLookup) throws Throwable {
+      boolean isMappedToLocalNode = false;
+      if (ctx.isOriginLocal() && !(isMappedToLocalNode = dm.isLocal(key)) && isNotInL1(key)) {
+         return realRemoteGetAndStoreInL1(ctx, key);
+      } else {
+         // maybe we are still rehashing as a joiner? ISPN-258
+         if (isMappedToLocalNode && dmWasRehashingDuringLocalLookup) {
+            if (trace) log.trace("Key is mapped to local node, but a rehash is in progress so may need to look elsewhere");
+            // try a remote lookup all the same
+            return realRemoteGetAndStoreInL1(ctx, key);
+         } else {
+            if (trace)
+               log.trace("Not doing a remote get for key {0} since entry is mapped to current node, or is in L1", key);
+         }
+      }
+      return null;
+   }
 
-         if (ice != null) {
-            if (isL1CacheEnabled) {
-               if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
-               long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
-               PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
-               entryFactory.wrapEntryForWriting(ctx, key, true, false, ctx.hasLockedKey(key), false);
-               invokeNextInterceptor(ctx, put);
-            } else {
-               if (trace) log.trace("Not caching remotely retrieved entry for key {0} in L1", key);
-            }
-            return ice.getValue();
+   private Object realRemoteGetAndStoreInL1(InvocationContext ctx, Object key) throws Throwable {
+      if (trace) log.trace("Doing a remote get for key {0}", key);
+      // attempt a remote lookup
+      InternalCacheEntry ice = dm.retrieveFromRemoteSource(key);
+
+      if (ice != null) {
+         if (isL1CacheEnabled) {
+            if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
+            long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
+            PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
+            entryFactory.wrapEntryForWriting(ctx, key, true, false, ctx.hasLockedKey(key), false);
+            invokeNextInterceptor(ctx, put);
+         } else {
+            if (trace) log.trace("Not caching remotely retrieved entry for key {0} in L1", key);
          }
-
-      } else {
-         if (trace)
-            log.trace("Not doing a remote get for key {0} since entry is mapped to current node, or is in L1", key);
+         return ice.getValue();
       }
       return null;
    }
@@ -225,9 +238,9 @@
       // this should only happen if:
       //   a) unsafeUnreliableReturnValues is false
       //   b) unsafeUnreliableReturnValues is true, we are in a TX and the command is conditional
-
+      boolean isStillRehashingOnJoin = !dm.isJoinComplete();
       if (isNeedReliableReturnValues(ctx) || (isConditionalCommand && ctx.isInTxScope())) {
-         for (Object k : keygen.getKeys()) remoteGetAndStoreInL1(ctx, k);
+         for (Object k : keygen.getKeys()) remoteGetAndStoreInL1(ctx, k, isStillRehashingOnJoin);
       }
    }
 

Modified: trunk/core/src/test/resources/log4j.xml
===================================================================
--- trunk/core/src/test/resources/log4j.xml	2009-11-08 16:38:31 UTC (rev 1114)
+++ trunk/core/src/test/resources/log4j.xml	2009-11-08 16:52:07 UTC (rev 1115)
@@ -45,7 +45,7 @@
    <!-- ================ -->
 
    <category name="org.infinispan">
-      <priority value="INFO"/>
+      <priority value="TRACE"/>
    </category>
 
    <category name="org.infinispan.profiling">
@@ -66,8 +66,8 @@
 
    <root>
       <priority value="WARN"/>
-      <!--<appender-ref ref="CONSOLE"/>-->
-      <appender-ref ref="FILE"/>
+      <appender-ref ref="CONSOLE"/>
+      <!--<appender-ref ref="FILE"/>-->
    </root>
 
 </log4j:configuration>



More information about the infinispan-commits mailing list