[infinispan-commits] Infinispan SVN: r1002 - trunk/lucene-directory/src/main/java/org/infinispan/lucene.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Sun Oct 25 17:45:06 EDT 2009


Author: sannegrinovero
Date: 2009-10-25 17:45:06 -0400 (Sun, 25 Oct 2009)
New Revision: 1002

Modified:
   trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanLockFactory.java
Log:
reverting changeset 990 from lucene-directory only: was a core-only change, should not have affected lucene-directory

Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanLockFactory.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanLockFactory.java	2009-10-25 13:26:07 UTC (rev 1001)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanLockFactory.java	2009-10-25 21:45:06 UTC (rev 1002)
@@ -21,19 +21,21 @@
  */
 package org.infinispan.lucene;
 
-import org.apache.lucene.index.IndexWriter;
+import java.io.IOException;
+import javax.transaction.RollbackException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockFactory;
 import org.infinispan.Cache;
+import org.infinispan.CacheException;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
-import javax.transaction.TransactionManager;
-import java.io.IOException;
-
 /**
  * Factory for locks obtained in <code>InfinispanDirectory</code>
  * 
- * @author Sanne Grinovero
  * @since 4.0
  * @author Lukasz Moren
  * @see org.infinispan.lucene.InfinispanDirectory
@@ -42,60 +44,183 @@
 public class InfinispanLockFactory extends LockFactory {
 
    private static final Log log = LogFactory.getLog(InfinispanLockFactory.class);
-   static final String DEF_LOCK_NAME = IndexWriter.WRITE_LOCK_NAME;
 
-   private final Cache<CacheKey, Object> cache;
-   private final String indexName;
-   private final TransactionManager tm;
-   private final InfinispanLock defLock;
+   private Cache<CacheKey, Object> cache;
+   private String indexName;
 
    public InfinispanLockFactory(Cache<CacheKey, Object> cache, String indexName) {
       this.cache = cache;
       this.indexName = indexName;
-//      tm = cache.getAdvancedCache().getComponentRegistry().getComponent(TransactionManager.class);
-//      if (tm == null) {
-//         throw new CacheException(
-//                  "Failed looking up TransactionManager, check if any transaction manager is associated with Infinispan cache: "
-//                           + cache.getName());
-//      }
-      tm = null;
-      defLock = new InfinispanLock(cache, indexName, DEF_LOCK_NAME, tm);
    }
 
    /**
     * {@inheritDoc}
     */
-   public InfinispanLock makeLock(String lockName) {
-      InfinispanLock lock;
-      //It appears Lucene always uses the same name so we give locks
-      //having this name a special treatment:
-      if (DEF_LOCK_NAME.equals(lockName)) {
-         lock = defLock;
+   public Lock makeLock(String lockName) {
+      try {
+         return new InfinispanLock(cache, indexName, lockName);
+      } finally {
+         if (log.isTraceEnabled()) {
+            log.trace("Created new lock: {0} for index {1}", lockName, indexName);
+         }
       }
-      else {
-         // this branch is never taken with current Lucene version.
-         lock = new InfinispanLock(cache, indexName, lockName, tm);
-      }
-      if (log.isTraceEnabled()) {
-         log.trace("Lock prepared, not acquired: {0} for index {1}", lockName, indexName);
-      }
-      return lock;
    }
 
    /**
     * {@inheritDoc}
     */
    public void clearLock(String lockName) throws IOException {
-      //Same special care as above for locks named DEF_LOCK_NAME:
-      if (DEF_LOCK_NAME.equals(lockName)) {
-         defLock.clearLock();
+      try {
+         cache.remove(new FileCacheKey(indexName, lockName, true));
+      } finally {
+         if (log.isTraceEnabled()) {
+            log.trace("Removed lock: {0} for index {1}", lockName, indexName);
+         }
       }
-      else {
-         new InfinispanLock(cache, indexName, lockName, tm).clearLock();
+   }
+
+   /**
+    * Interprocess Lucene index lock
+    * 
+    * @see org.apache.lucene.store.Directory#makeLock(String)
+    */
+   public static class InfinispanLock extends Lock {
+
+      private static final Log log = LogFactory.getLog(InfinispanLock.class);
+
+      private final Cache<CacheKey, Object> cache;
+      private String lockName;
+      private String indexName;
+
+      final TransactionManager tm;
+
+      InfinispanLock(Cache<CacheKey, Object> cache, String indexName, String lockName) {
+         this.cache = cache;
+         this.lockName = lockName;
+         this.indexName = indexName;
+
+         tm = cache.getAdvancedCache().getComponentRegistry().getComponent(TransactionManager.class);
+         if (tm == null) {
+            throw new CacheException(
+                     "Failed looking up TransactionManager, check if any transaction manager is associated with Infinispan cache: "
+                              + cache.getName());
+         }
       }
-      if (log.isTraceEnabled()) {
-         log.trace("Removed lock: {0} for index {1}", lockName, indexName);
+
+      /**
+       * {@inheritDoc}
+       */
+      public boolean obtain() throws IOException {
+         boolean acquired = false;
+
+         synchronized (cache) {
+            try {
+               // begin transaction for lock obtaining
+               tm.begin();
+               CacheKey lock = new FileCacheKey(indexName, lockName, true);
+               if (!cache.containsKey(lock)) {
+                  cache.put(lock, lock);
+                  acquired = true;
+               }
+            } catch (Exception e) {
+               log.error("Cannot obtain lock for: " + indexName, e);
+            } finally {
+               try {
+                  if (tm.getTransaction() != null) {
+                     if (acquired) {
+                        tm.commit();
+                        if (log.isTraceEnabled()) {
+                           log.trace("Lock: {0} acquired for index: {1} ", lockName, indexName);
+                        }
+                     } else {
+                        tm.rollback();
+                     }
+                  }
+               } catch (RollbackException e) {
+                  log.error("Cannot obtain lock for: " + indexName, e);
+                  acquired = false;
+               } catch (Exception e) {
+                  throw new CacheException(e);
+               }
+            }
+
+            if (acquired) {
+               try {
+                  // begin new transaction to batch all changes, tx commited when lock is released.
+                  tm.begin();
+                  if (log.isTraceEnabled()) {
+                     log.trace("Batch transaction started for index: {0}", indexName);
+                  }
+               } catch (Exception e) {
+                  log.error("Unable to start transaction", e);
+               }
+            }
+         }
+
+         return acquired;
       }
+
+      /**
+       * {@inheritDoc}
+       */
+      public void release() throws IOException {
+         boolean removed = false;
+         synchronized (cache) {
+            try {
+               // commit changes in batch, transaction was started when lock was acquired
+               tm.commit();
+               if (log.isTraceEnabled()) {
+                  log.trace("Batch transaction commited for index: {0}", indexName);
+               }
+
+               tm.begin();
+               removed = cache.remove(new FileCacheKey(indexName, lockName, true)) != null;
+            } catch (Exception e) {
+               throw new CacheException("Unable to commit work done or release lock!", e);
+            } finally {
+               try {
+                  if (removed) {
+                     tm.commit();
+                     if (log.isTraceEnabled()) {
+                        log.trace("Lock: {0} removed for index: {1} ", lockName, indexName);
+                     }
+                  } else {
+                     tm.rollback();
+                  }
+               } catch (Exception e) {
+                  throw new CacheException("Unable to release lock!", e);
+               }
+            }
+         }
+      }
+
+      /**
+       * {@inheritDoc}
+       */
+      public boolean isLocked() {
+         boolean locked = false;
+         synchronized (cache) {
+            Transaction tx = null;
+            try {
+               // if there is an ongoing transaction we need to suspend it
+               if ((tx = tm.getTransaction()) != null) {
+                  tm.suspend();
+               }
+               locked = cache.containsKey(new FileCacheKey(indexName, lockName, true));
+            } catch (Exception e) {
+               log.error("Error in suspending transaction", e);
+            } finally {
+               if (tx != null) {
+                  try {
+                     tm.resume(tx);
+                  } catch (Exception e) {
+                     throw new CacheException("Unable to resume suspended transaction " + tx, e);
+                  }
+               }
+            }
+         }
+         return locked;
+      }
    }
-   
+
 }



More information about the infinispan-commits mailing list