[infinispan-commits] Infinispan SVN: r1115 - in trunk/core/src: main/java/org/infinispan/interceptors and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Sun Nov 8 11:52:07 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-11-08 11:52:07 -0500 (Sun, 08 Nov 2009)
New Revision: 1115
Modified:
trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
trunk/core/src/test/resources/log4j.xml
Log:
[ISPN-258] (Joiner doesn't see state when rehashing is in progress) fixed
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-11-08 16:38:31 UTC (rev 1114)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-11-08 16:52:07 UTC (rev 1115)
@@ -105,6 +105,8 @@
boolean isRehashInProgress();
+ boolean isJoinComplete();
+
void applyReceivedState(Map<Object, InternalCacheValue> state);
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-11-08 16:38:31 UTC (rev 1114)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-11-08 16:52:07 UTC (rev 1115)
@@ -357,6 +357,10 @@
}
}
+ public boolean isJoinComplete() {
+ return joinComplete;
+ }
+
void drainTransactionLog() {
List<WriteCommand> c;
while (transactionLogger.size() > 10) {
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-11-08 16:38:31 UTC (rev 1114)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-11-08 16:52:07 UTC (rev 1115)
@@ -86,11 +86,12 @@
@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
+ boolean isStillRehashingOnJoin = !dm.isJoinComplete();
Object returnValue = invokeNextInterceptor(ctx, command);
// need to check in the context as well since a null retval is not necessarily an indication of the entry not being
// available. It could just have been removed in the same tx beforehand.
if (!ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && returnValue == null && ctx.lookupEntry(command.getKey()) == null)
- returnValue = remoteGetAndStoreInL1(ctx, command.getKey());
+ returnValue = remoteGetAndStoreInL1(ctx, command.getKey(), isStillRehashingOnJoin);
return returnValue;
}
@@ -106,28 +107,40 @@
* @return value of a remote get, or null
* @throws Throwable if there are problems
*/
- private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key) throws Throwable {
- if (ctx.isOriginLocal() && !dm.isLocal(key) && isNotInL1(key)) {
- if (trace) log.trace("Doing a remote get for key {0}", key);
- // attempt a remote lookup
- InternalCacheEntry ice = dm.retrieveFromRemoteSource(key);
+ private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key, boolean dmWasRehashingDuringLocalLookup) throws Throwable {
+ boolean isMappedToLocalNode = false;
+ if (ctx.isOriginLocal() && !(isMappedToLocalNode = dm.isLocal(key)) && isNotInL1(key)) {
+ return realRemoteGetAndStoreInL1(ctx, key);
+ } else {
+ // maybe we are still rehashing as a joiner? ISPN-258
+ if (isMappedToLocalNode && dmWasRehashingDuringLocalLookup) {
+ if (trace) log.trace("Key is mapped to local node, but a rehash is in progress so may need to look elsewhere");
+ // try a remote lookup all the same
+ return realRemoteGetAndStoreInL1(ctx, key);
+ } else {
+ if (trace)
+ log.trace("Not doing a remote get for key {0} since entry is mapped to current node, or is in L1", key);
+ }
+ }
+ return null;
+ }
- if (ice != null) {
- if (isL1CacheEnabled) {
- if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
- long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
- PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
- entryFactory.wrapEntryForWriting(ctx, key, true, false, ctx.hasLockedKey(key), false);
- invokeNextInterceptor(ctx, put);
- } else {
- if (trace) log.trace("Not caching remotely retrieved entry for key {0} in L1", key);
- }
- return ice.getValue();
+ private Object realRemoteGetAndStoreInL1(InvocationContext ctx, Object key) throws Throwable {
+ if (trace) log.trace("Doing a remote get for key {0}", key);
+ // attempt a remote lookup
+ InternalCacheEntry ice = dm.retrieveFromRemoteSource(key);
+
+ if (ice != null) {
+ if (isL1CacheEnabled) {
+ if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
+ long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
+ PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
+ entryFactory.wrapEntryForWriting(ctx, key, true, false, ctx.hasLockedKey(key), false);
+ invokeNextInterceptor(ctx, put);
+ } else {
+ if (trace) log.trace("Not caching remotely retrieved entry for key {0} in L1", key);
}
-
- } else {
- if (trace)
- log.trace("Not doing a remote get for key {0} since entry is mapped to current node, or is in L1", key);
+ return ice.getValue();
}
return null;
}
@@ -225,9 +238,9 @@
// this should only happen if:
// a) unsafeUnreliableReturnValues is false
// b) unsafeUnreliableReturnValues is true, we are in a TX and the command is conditional
-
+ boolean isStillRehashingOnJoin = !dm.isJoinComplete();
if (isNeedReliableReturnValues(ctx) || (isConditionalCommand && ctx.isInTxScope())) {
- for (Object k : keygen.getKeys()) remoteGetAndStoreInL1(ctx, k);
+ for (Object k : keygen.getKeys()) remoteGetAndStoreInL1(ctx, k, isStillRehashingOnJoin);
}
}
Modified: trunk/core/src/test/resources/log4j.xml
===================================================================
--- trunk/core/src/test/resources/log4j.xml 2009-11-08 16:38:31 UTC (rev 1114)
+++ trunk/core/src/test/resources/log4j.xml 2009-11-08 16:52:07 UTC (rev 1115)
@@ -45,7 +45,7 @@
<!-- ================ -->
<category name="org.infinispan">
- <priority value="INFO"/>
+ <priority value="TRACE"/>
</category>
<category name="org.infinispan.profiling">
@@ -66,8 +66,8 @@
<root>
<priority value="WARN"/>
- <!--<appender-ref ref="CONSOLE"/>-->
- <appender-ref ref="FILE"/>
+ <appender-ref ref="CONSOLE"/>
+ <!--<appender-ref ref="FILE"/>-->
</root>
</log4j:configuration>
More information about the infinispan-commits mailing list