[infinispan-commits] Infinispan SVN: r1002 - trunk/lucene-directory/src/main/java/org/infinispan/lucene.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Sun Oct 25 17:45:06 EDT 2009
Author: sannegrinovero
Date: 2009-10-25 17:45:06 -0400 (Sun, 25 Oct 2009)
New Revision: 1002
Modified:
trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanLockFactory.java
Log:
Reverting changeset 990 in lucene-directory only: it was a core-only change and should not have affected lucene-directory.
Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanLockFactory.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanLockFactory.java 2009-10-25 13:26:07 UTC (rev 1001)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanLockFactory.java 2009-10-25 21:45:06 UTC (rev 1002)
@@ -21,19 +21,21 @@
*/
package org.infinispan.lucene;
-import org.apache.lucene.index.IndexWriter;
+import java.io.IOException;
+import javax.transaction.RollbackException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockFactory;
import org.infinispan.Cache;
+import org.infinispan.CacheException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import javax.transaction.TransactionManager;
-import java.io.IOException;
-
/**
* Factory for locks obtained in <code>InfinispanDirectory</code>
*
- * @author Sanne Grinovero
* @since 4.0
* @author Lukasz Moren
* @see org.infinispan.lucene.InfinispanDirectory
@@ -42,60 +44,183 @@
public class InfinispanLockFactory extends LockFactory {
private static final Log log = LogFactory.getLog(InfinispanLockFactory.class);
- static final String DEF_LOCK_NAME = IndexWriter.WRITE_LOCK_NAME;
- private final Cache<CacheKey, Object> cache;
- private final String indexName;
- private final TransactionManager tm;
- private final InfinispanLock defLock;
+ private Cache<CacheKey, Object> cache;
+ private String indexName;
public InfinispanLockFactory(Cache<CacheKey, Object> cache, String indexName) {
this.cache = cache;
this.indexName = indexName;
-// tm = cache.getAdvancedCache().getComponentRegistry().getComponent(TransactionManager.class);
-// if (tm == null) {
-// throw new CacheException(
-// "Failed looking up TransactionManager, check if any transaction manager is associated with Infinispan cache: "
-// + cache.getName());
-// }
- tm = null;
- defLock = new InfinispanLock(cache, indexName, DEF_LOCK_NAME, tm);
}
/**
* {@inheritDoc}
*/
- public InfinispanLock makeLock(String lockName) {
- InfinispanLock lock;
- //It appears Lucene always uses the same name so we give locks
- //having this name a special treatment:
- if (DEF_LOCK_NAME.equals(lockName)) {
- lock = defLock;
+ public Lock makeLock(String lockName) {
+ try {
+ return new InfinispanLock(cache, indexName, lockName);
+ } finally {
+ if (log.isTraceEnabled()) {
+ log.trace("Created new lock: {0} for index {1}", lockName, indexName);
+ }
}
- else {
- // this branch is never taken with current Lucene version.
- lock = new InfinispanLock(cache, indexName, lockName, tm);
- }
- if (log.isTraceEnabled()) {
- log.trace("Lock prepared, not acquired: {0} for index {1}", lockName, indexName);
- }
- return lock;
}
/**
* {@inheritDoc}
*/
public void clearLock(String lockName) throws IOException {
- //Same special care as above for locks named DEF_LOCK_NAME:
- if (DEF_LOCK_NAME.equals(lockName)) {
- defLock.clearLock();
+ try {
+ cache.remove(new FileCacheKey(indexName, lockName, true));
+ } finally {
+ if (log.isTraceEnabled()) {
+ log.trace("Removed lock: {0} for index {1}", lockName, indexName);
+ }
}
- else {
- new InfinispanLock(cache, indexName, lockName, tm).clearLock();
+ }
+
+ /**
+ * Interprocess Lucene index lock
+ *
+ * @see org.apache.lucene.store.Directory#makeLock(String)
+ */
+ public static class InfinispanLock extends Lock {
+
+ private static final Log log = LogFactory.getLog(InfinispanLock.class);
+
+ private final Cache<CacheKey, Object> cache;
+ private String lockName;
+ private String indexName;
+
+ final TransactionManager tm;
+
+ InfinispanLock(Cache<CacheKey, Object> cache, String indexName, String lockName) {
+ this.cache = cache;
+ this.lockName = lockName;
+ this.indexName = indexName;
+
+ tm = cache.getAdvancedCache().getComponentRegistry().getComponent(TransactionManager.class);
+ if (tm == null) {
+ throw new CacheException(
+ "Failed looking up TransactionManager, check if any transaction manager is associated with Infinispan cache: "
+ + cache.getName());
+ }
}
- if (log.isTraceEnabled()) {
- log.trace("Removed lock: {0} for index {1}", lockName, indexName);
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean obtain() throws IOException {
+ boolean acquired = false;
+
+ synchronized (cache) {
+ try {
+ // begin transaction for lock obtaining
+ tm.begin();
+ CacheKey lock = new FileCacheKey(indexName, lockName, true);
+ if (!cache.containsKey(lock)) {
+ cache.put(lock, lock);
+ acquired = true;
+ }
+ } catch (Exception e) {
+ log.error("Cannot obtain lock for: " + indexName, e);
+ } finally {
+ try {
+ if (tm.getTransaction() != null) {
+ if (acquired) {
+ tm.commit();
+ if (log.isTraceEnabled()) {
+ log.trace("Lock: {0} acquired for index: {1} ", lockName, indexName);
+ }
+ } else {
+ tm.rollback();
+ }
+ }
+ } catch (RollbackException e) {
+ log.error("Cannot obtain lock for: " + indexName, e);
+ acquired = false;
+ } catch (Exception e) {
+ throw new CacheException(e);
+ }
+ }
+
+ if (acquired) {
+ try {
+ // begin a new transaction to batch all changes; the tx is committed when the lock is released.
+ tm.begin();
+ if (log.isTraceEnabled()) {
+ log.trace("Batch transaction started for index: {0}", indexName);
+ }
+ } catch (Exception e) {
+ log.error("Unable to start transaction", e);
+ }
+ }
+ }
+
+ return acquired;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public void release() throws IOException {
+ boolean removed = false;
+ synchronized (cache) {
+ try {
+ // commit changes in batch, transaction was started when lock was acquired
+ tm.commit();
+ if (log.isTraceEnabled()) {
+ log.trace("Batch transaction commited for index: {0}", indexName);
+ }
+
+ tm.begin();
+ removed = cache.remove(new FileCacheKey(indexName, lockName, true)) != null;
+ } catch (Exception e) {
+ throw new CacheException("Unable to commit work done or release lock!", e);
+ } finally {
+ try {
+ if (removed) {
+ tm.commit();
+ if (log.isTraceEnabled()) {
+ log.trace("Lock: {0} removed for index: {1} ", lockName, indexName);
+ }
+ } else {
+ tm.rollback();
+ }
+ } catch (Exception e) {
+ throw new CacheException("Unable to release lock!", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isLocked() {
+ boolean locked = false;
+ synchronized (cache) {
+ Transaction tx = null;
+ try {
+ // if there is an ongoing transaction we need to suspend it
+ if ((tx = tm.getTransaction()) != null) {
+ tm.suspend();
+ }
+ locked = cache.containsKey(new FileCacheKey(indexName, lockName, true));
+ } catch (Exception e) {
+ log.error("Error in suspending transaction", e);
+ } finally {
+ if (tx != null) {
+ try {
+ tm.resume(tx);
+ } catch (Exception e) {
+ throw new CacheException("Unable to resume suspended transaction " + tx, e);
+ }
+ }
+ }
+ }
+ return locked;
+ }
}
-
+
}
More information about the infinispan-commits
mailing list