[infinispan-commits] Infinispan SVN: r2183 - branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Aug 9 17:06:13 EDT 2010


Author: sannegrinovero
Date: 2010-08-09 17:06:12 -0400 (Mon, 09 Aug 2010)
New Revision: 2183

Modified:
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
Log:
[ISPN-583] (Lucene readlocks have to avoid being persisted in a store) branch 4.1

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java	2010-08-09 21:03:44 UTC (rev 2182)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java	2010-08-09 21:06:12 UTC (rev 2183)
@@ -88,13 +88,20 @@
       int newValue = 0;
       // spinning as we currently don't mandate transactions, so no proper lock support available
       boolean done = false;
+      Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
       while (done == false) {
-         Object lockValue = cache.get(readLockKey);
-         if (lockValue == null)
-            return; // no special locking for some core files
-         int refCount = (Integer) lockValue;
-         newValue = refCount - 1;
-         done = cache.replace(readLockKey, refCount, newValue);
+         if (lockValue == null) {
+            lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(0));
+            done = (null == lockValue);
+         }
+         else {
+            int refCount = (Integer) lockValue;
+            newValue = refCount - 1;
+            done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, refCount, newValue);
+            if (!done) {
+               lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+            }
+         }
       }
       if (newValue == 0) {
          realFileDelete(readLockKey, cache);
@@ -121,9 +128,9 @@
          removed = cache.withFlags(Flag.SKIP_LOCKING).remove(chunkKey);
          chunkKey = new ChunkCacheKey(indexName, filename, ++i);
       } while (removed != null);
+      FileCacheKey key = new FileCacheKey(indexName, filename);
       cache.startBatch();
-      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).remove(readLockKey);
-      FileCacheKey key = new FileCacheKey(indexName, filename);
+      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).remove(readLockKey);
       cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).remove(key);
       cache.endBatch(true);
    }
@@ -137,18 +144,26 @@
    private void aquireReadLock() throws FileNotFoundException {
       // spinning as we currently don't mandate transactions, so no proper lock support is available
       boolean done = false;
+      Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
       while (done == false) {
-         Object lockValue = cache.withFlags(Flag.SKIP_LOCKING).get(readLockKey);
-         if (lockValue == null)
-            return; // no special locking for some core files
-         int refCount = (Integer) lockValue;
-         if (refCount == 0) {
-            // too late: in case refCount==0 the delete process already triggered chunk deletion.
-            // safest reaction is to tell this file doesn't exist anymore.
-            throw new FileNotFoundException("segment file was deleted");
+         if (lockValue != null) {
+            int refCount = (Integer) lockValue;
+            if (refCount == 0) {
+               // too late: in case refCount==0 the delete process already triggered chunk deletion.
+               // safest reaction is to tell this file doesn't exist anymore.
+               throw new FileNotFoundException("segment file was deleted");
+            }
+            Integer newValue = Integer.valueOf(refCount + 1);
+            done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, refCount, newValue);
+            if (!done) {
+               lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+            }
+         } else {
+            // readLocks are not stored, so if there's no value assume it's ==1, which means
+            // existing file, not deleted, nobody else owning a read lock
+            lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(2));
+            done = (null == lockValue);
          }
-         Integer newValue = Integer.valueOf(refCount + 1);
-         done = cache.replace(readLockKey, refCount, newValue);
       }
    }
 



More information about the infinispan-commits mailing list