[exo-jcr-commits] exo-jcr SVN: r3263 - jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Oct 8 08:39:12 EDT 2010
Author: nzamosenchuk
Date: 2010-10-08 08:39:11 -0400 (Fri, 08 Oct 2010)
New Revision: 3263
Removed:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexMerger.java
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java
Log:
EXOJCR-987 Getting rid of Merger and actions inside MultiIndex
Deleted: jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexMerger.java
===================================================================
--- jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexMerger.java 2010-10-08 12:04:24 UTC (rev 3262)
+++ jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexMerger.java 2010-10-08 12:39:11 UTC (rev 3263)
@@ -1,572 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.exoplatform.services.jcr.impl.core.query.lucene;
-
-import EDU.oswego.cs.dl.util.concurrent.Mutex;
-import EDU.oswego.cs.dl.util.concurrent.Sync;
-
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUtils;
-import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.Term;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Merges indexes in a separate daemon thread.
- */
-class IndexMerger extends Thread implements IndexListener
-{
-
- /**
- * Logger instance for this class.
- */
- private static final Logger log = LoggerFactory.getLogger("exo.jcr.component.core.IndexMerger");
-
- /**
- * Marker task to signal the background thread to quit.
- */
- private static final Merge QUIT = new Merge(new Index[0]);
-
- /**
- * minMergeDocs config parameter.
- */
- private int minMergeDocs = SearchIndex.DEFAULT_MIN_MERGE_DOCS;
-
- /**
- * maxMergeDocs config parameter
- */
- private int maxMergeDocs = SearchIndex.DEFAULT_MAX_MERGE_DOCS;
-
- /**
- * mergeFactor config parameter
- */
- private int mergeFactor = SearchIndex.DEFAULT_MERGE_FACTOR;
-
- /**
- * Queue of merge Tasks
- */
- private final Buffer mergeTasks = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
-
- /**
- * List of id <code>Term</code> that identify documents that were deleted
- * while a merge was running.
- */
- private final List deletedDocuments = Collections.synchronizedList(new ArrayList());
-
- /**
- * List of <code>IndexBucket</code>s in ascending document limit.
- */
- private final List indexBuckets = new ArrayList();
-
- /**
- * The <code>MultiIndex</code> this index merger is working on.
- */
- private final MultiIndex multiIndex;
-
- /**
- * Monitor object to synchronize merge calculation.
- */
- private final Object lock = new Object();
-
- /**
- * Mutex that is acquired when replacing indexes on MultiIndex.
- */
- private final Sync indexReplacement = new Mutex();
-
- /**
- * When released, indicates that this index merger is idle.
- */
- private final Sync mergerIdle = new Mutex();
-
- /**
- * Creates an <code>IndexMerger</code>.
- *
- * @param multiIndex the <code>MultiIndex</code>.
- */
- IndexMerger(MultiIndex multiIndex)
- {
- this.multiIndex = multiIndex;
- setName("IndexMerger");
- setDaemon(true);
- try
- {
- mergerIdle.acquire();
- }
- catch (InterruptedException e)
- {
- // will never happen, lock is free upon construction
- throw new InternalError("Unable to acquire mutex after construction");
- }
- }
-
- /**
- * Informs the index merger that an index was added / created.
- *
- * @param name the name of the index.
- * @param numDocs the number of documents it contains.
- */
- void indexAdded(String name, int numDocs)
- {
- if (numDocs < 0)
- {
- throw new IllegalArgumentException("numDocs must be positive");
- }
- // multiple threads may enter this method:
- // - the background thread of this IndexMerger, when it replaces indexes
- // after a successful merge
- // - a regular thread that updates the workspace
- //
- // therefore we have to synchronize this block
- synchronized (lock)
- {
- // initially create buckets
- if (indexBuckets.size() == 0)
- {
- long lower = 0;
- long upper = minMergeDocs;
- while (upper < maxMergeDocs)
- {
- indexBuckets.add(new IndexBucket(lower, upper, true));
- lower = upper + 1;
- upper *= mergeFactor;
- }
- // one with upper = maxMergeDocs
- indexBuckets.add(new IndexBucket(lower, maxMergeDocs, false));
- // and another one as overflow, just in case...
- indexBuckets.add(new IndexBucket(maxMergeDocs + 1, Long.MAX_VALUE, false));
- }
-
- // put index in bucket
- IndexBucket bucket = (IndexBucket)indexBuckets.get(indexBuckets.size() - 1);
- for (int i = 0; i < indexBuckets.size(); i++)
- {
- bucket = (IndexBucket)indexBuckets.get(i);
- if (bucket.fits(numDocs))
- {
- break;
- }
- }
- bucket.add(new Index(name, numDocs));
-
- if (log.isDebugEnabled())
- {
- log.debug("index added: name=" + name + ", numDocs=" + numDocs);
- }
-
- // if bucket does not allow merge, we don't have to continue
- if (!bucket.allowsMerge())
- {
- return;
- }
-
- // check if we need a merge
- if (bucket.size() >= mergeFactor)
- {
- long targetMergeDocs = bucket.upper;
- targetMergeDocs = Math.min(targetMergeDocs * mergeFactor, maxMergeDocs);
- // sum up docs in bucket
- List indexesToMerge = new ArrayList();
- int mergeDocs = 0;
- for (Iterator it = bucket.iterator(); it.hasNext() && mergeDocs <= targetMergeDocs;)
- {
- indexesToMerge.add(it.next());
- }
- if (indexesToMerge.size() > 2)
- {
- // found merge
- Index[] idxs = (Index[])indexesToMerge.toArray(new Index[indexesToMerge.size()]);
- bucket.removeAll(indexesToMerge);
- if (log.isDebugEnabled())
- {
- log.debug("requesting merge for " + indexesToMerge);
- }
- mergeTasks.add(new Merge(idxs));
- log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
- }
- }
- }
- }
-
- /**
- * @inheritDoc
- */
- public void documentDeleted(Term id)
- {
- log.debug("document deleted: " + id.text());
- deletedDocuments.add(id);
- }
-
- /**
- * When the calling thread returns this index merger will be idle, that is
- * there will be no merge tasks pending anymore. The method returns immediately
- * if there are currently no tasks pending at all.
- */
- void waitUntilIdle() throws InterruptedException
- {
- mergerIdle.acquire();
- // and immediately release again
- mergerIdle.release();
- }
-
- /**
- * Signals this <code>IndexMerger</code> to stop and waits until it
- * has terminated.
- */
- void dispose()
- {
- log.debug("dispose IndexMerger");
- // get mutex for index replacements
- try
- {
- indexReplacement.acquire();
- }
- catch (InterruptedException e)
- {
- log.warn("Interrupted while acquiring index replacement sync: " + e);
- // try to stop IndexMerger without the sync
- }
-
- // clear task queue
- mergeTasks.clear();
-
- // send quit
- mergeTasks.add(QUIT);
- log.debug("quit sent");
-
- try
- {
- // give the merger thread some time to quit,
- // it is possible that the merger is busy working on a large index.
- // if that is the case we will just ignore it and the daemon will
- // die without being able to finish the merge.
- // at this point it is not possible anymore to replace indexes
- // on the MultiIndex because we hold the indexReplacement Sync.
- this.join(500);
- if (isAlive())
- {
- log.info("Unable to stop IndexMerger. Daemon is busy.");
- }
- else
- {
- log.debug("IndexMerger thread stopped");
- }
- log.debug("merge queue size: " + mergeTasks.size());
- }
- catch (InterruptedException e)
- {
- log.warn("Interrupted while waiting for IndexMerger thread to terminate.");
- }
- }
-
- /**
- * Implements the index merging.
- */
- public void run()
- {
- for (;;)
- {
- boolean isIdle = false;
- if (mergeTasks.size() == 0)
- {
- mergerIdle.release();
- isIdle = true;
- }
- Merge task = (Merge)mergeTasks.remove();
- if (task == QUIT)
- {
- mergerIdle.release();
- break;
- }
- if (isIdle)
- {
- try
- {
- mergerIdle.acquire();
- }
- catch (InterruptedException e)
- {
- Thread.interrupted();
- log.warn("Unable to acquire mergerIdle sync");
- }
- }
-
- log.debug("accepted merge request");
-
- // reset deleted documents
- deletedDocuments.clear();
-
- // get readers
- String[] names = new String[task.indexes.length];
- for (int i = 0; i < task.indexes.length; i++)
- {
- names[i] = task.indexes[i].name;
- }
- try
- {
- log.debug("create new index");
- PersistentIndex index = multiIndex.getOrCreateIndex(null);
- boolean success = false;
- try
- {
-
- log.debug("get index readers from MultiIndex");
- IndexReader[] readers = multiIndex.getIndexReaders(names, this);
- try
- {
- // do the merge
- long time = System.currentTimeMillis();
- index.addIndexes(readers);
- time = System.currentTimeMillis() - time;
- int docCount = 0;
- for (int i = 0; i < readers.length; i++)
- {
- docCount += readers[i].numDocs();
- }
- log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
- }
- finally
- {
- for (int i = 0; i < readers.length; i++)
- {
- try
- {
- Util.closeOrRelease(readers[i]);
- }
- catch (IOException e)
- {
- log.warn("Unable to close IndexReader: " + e);
- }
- }
- }
-
- // inform multi index
- // if we cannot get the sync immediately we have to quit
- if (!indexReplacement.attempt(0))
- {
- log.debug("index merging canceled");
- break;
- }
- try
- {
- log.debug("replace indexes");
- multiIndex.replaceIndexes(names, index, deletedDocuments);
- }
- finally
- {
- indexReplacement.release();
- }
-
- success = true;
-
- }
- finally
- {
- if (!success)
- {
- // delete index
- log.debug("deleting index " + index.getName());
- multiIndex.deleteIndex(index);
- }
- }
- }
- catch (Throwable e)
- {
- log.error("Error while merging indexes: ", e);
- }
- }
- log.info("IndexMerger terminated");
- }
-
- //-----------------------< merge properties >-------------------------------
-
- /**
- * The merge factor.
- */
- public void setMergeFactor(int mergeFactor)
- {
- this.mergeFactor = mergeFactor;
- }
-
- /**
- * The initial threshold for number of documents to merge to a new index.
- */
- public void setMinMergeDocs(int minMergeDocs)
- {
- this.minMergeDocs = minMergeDocs;
- }
-
- /**
- * The maximum number of document to merge.
- */
- public void setMaxMergeDocs(int maxMergeDocs)
- {
- this.maxMergeDocs = maxMergeDocs;
- }
-
- //------------------------------< internal >--------------------------------
-
- /**
- * Implements a simple struct that holds the name of an index and how
- * many document it contains. <code>Index</code> is comparable using the
- * number of documents it contains.
- */
- private static final class Index implements Comparable
- {
-
- /**
- * The name of the index.
- */
- private final String name;
-
- /**
- * The number of documents the index contains.
- */
- private final int numDocs;
-
- /**
- * Creates a new index struct.
- *
- * @param name name of an index.
- * @param numDocs number of documents it contains.
- */
- Index(String name, int numDocs)
- {
- this.name = name;
- this.numDocs = numDocs;
- }
-
- /**
- * Indexes are first ordered by {@link #numDocs} and then by {@link
- * #name}.
- *
- * @param o the other <code>Index</code>.
- * @return a negative integer, zero, or a positive integer as this
- * Index is less than, equal to, or greater than the specified
- * Index.
- */
- public int compareTo(Object o)
- {
- Index other = (Index)o;
- int val = numDocs < other.numDocs ? -1 : (numDocs == other.numDocs ? 0 : 1);
- if (val != 0)
- {
- return val;
- }
- else
- {
- return name.compareTo(other.name);
- }
- }
-
- /**
- * @inheritDoc
- */
- public String toString()
- {
- return name + ":" + numDocs;
- }
- }
-
- /**
- * Defines a merge task, to merge a couple of indexes into a new index.
- */
- private static final class Merge
- {
-
- private final Index[] indexes;
-
- /**
- * Merge task, to merge <code>indexes</code> into a new index with
- * <code>name</code>.
- *
- * @param indexes the indexes to merge.
- */
- Merge(Index[] indexes)
- {
- this.indexes = new Index[indexes.length];
- System.arraycopy(indexes, 0, this.indexes, 0, indexes.length);
- }
- }
-
- /**
- * Implements a <code>List</code> with a document limit value. An
- * <code>IndexBucket</code> contains {@link Index}es with documents less
- * or equal the document limit of the bucket.
- */
- private static final class IndexBucket extends ArrayList
- {
-
- /**
- * The lower document limit.
- */
- private final long lower;
-
- /**
- * The upper document limit.
- */
- private final long upper;
-
- /**
- * Flag indicating if indexes in this bucket can be merged.
- */
- private final boolean allowMerge;
-
- /**
- * Creates a new <code>IndexBucket</code>. Limits are both inclusive.
- *
- * @param lower document limit.
- * @param upper document limit.
- * @param allowMerge if indexes in this bucket can be merged.
- */
- IndexBucket(long lower, long upper, boolean allowMerge)
- {
- this.lower = lower;
- this.upper = upper;
- this.allowMerge = allowMerge;
- }
-
- /**
- * Returns <code>true</code> if the number of documents fit in this
- * <code>IndexBucket</code>; otherwise <code>false</code>
- *
- * @param numDocs the number of documents.
- * @return <code>true</code> if <code>numDocs</code> fit.
- */
- boolean fits(long numDocs)
- {
- return numDocs >= lower && numDocs <= upper;
- }
-
- /**
- * Returns <code>true</code> if indexes in this bucket can be merged.
- *
- * @return <code>true</code> if indexes in this bucket can be merged.
- */
- boolean allowsMerge()
- {
- return allowMerge;
- }
- }
-}
Modified: jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
--- jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2010-10-08 12:04:24 UTC (rev 3262)
+++ jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2010-10-08 12:39:11 UTC (rev 3263)
@@ -171,11 +171,6 @@
private long lastFileSystemFlushTime;
/**
- * The <code>IndexMerger</code> for this <code>MultiIndex</code>.
- */
- private final IndexMerger merger;
-
- /**
* Timer to schedule flushes of this index after some idle time.
*/
private static final Timer FLUSH_TIMER = new Timer(true);
@@ -265,12 +260,6 @@
// as of 1.5 deletable file is not used anymore
removeDeletable();
- // initialize IndexMerger
- merger = new IndexMerger(this);
- merger.setMaxMergeDocs(handler.getMaxMergeDocs());
- merger.setMergeFactor(handler.getMergeFactor());
- merger.setMinMergeDocs(handler.getMinMergeDocs());
-
// this method is run in privileged mode internally
IndexingQueueStore store = new IndexingQueueStore(indexDir);
@@ -299,7 +288,6 @@
index.setUseCompoundFile(handler.getUseCompoundFile());
index.setTermInfosIndexDivisor(handler.getTermInfosIndexDivisor());
indexes.add(index);
- merger.indexAdded(index.getName(), index.getNumDocuments());
}
// init volatile index
@@ -398,10 +386,16 @@
{
long count = 0;
// traverse and index workspace
- executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+
+ // TODO: this was removed
+ //executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+
// NodeData rootState = (NodeData) stateMgr.getItemData(rootId);
count = createIndex(indexingTree.getIndexingRoot(), stateMgr, count);
- executeAndLog(new Commit(getTransactionId()));
+
+ // TODO : this was replaced
+ //executeAndLog(new Commit(getTransactionId()));
+
log.info("Created initial index for {} nodes", new Long(count));
releaseMultiReader();
scheduleFlushTask();
@@ -467,23 +461,67 @@
try
{
long transactionId = nextTransactionId++;
- executeAndLog(new Start(transactionId));
+
+ // TODO: this was removed
+ //executeAndLog(new Start(transactionId));
for (Iterator it = remove.iterator(); it.hasNext();)
{
- executeAndLog(new DeleteNode(transactionId, (String)it.next()));
+ // TODO this was replaced
+ //executeAndLog(new DeleteNode(transactionId, (String)it.next()));
+
+ String uuidString = (String)it.next();
+ // check if indexing queue is still working on
+ // this node from a previous update
+ Document doc = indexingQueue.removeDocument(uuidString);
+ if (doc != null)
+ {
+ Util.disposeDocument(doc);
+ }
+ Term idTerm = new Term(FieldNames.UUID, uuidString);
+ // if the document cannot be deleted from the volatile index
+ // delete it from one of the persistent indexes.
+ int num = volatileIndex.removeDocument(idTerm);
+ if (num == 0)
+ {
+ for (int i = indexes.size() - 1; i >= 0; i--)
+ {
+ // only look in registered indexes
+ PersistentIndex idx = (PersistentIndex)indexes.get(i);
+ if (indexNames.contains(idx.getName()))
+ {
+ num = idx.removeDocument(idTerm);
+ if (num > 0)
+ {
+ break;
+ }
+ }
+ }
+ }
+
+
+
}
for (Iterator it = add.iterator(); it.hasNext();)
{
Document doc = (Document)it.next();
if (doc != null)
{
- executeAndLog(new AddNode(transactionId, doc));
+ // TODO: ths is replaced
+ //executeAndLog(new AddNode(transactionId, doc));
+
+ if (doc != null)
+ {
+ volatileIndex.addDocuments(new Document[]{doc});
+ }
+
// commit volatile index if needed
flush |= checkVolatileCommit();
}
}
- executeAndLog(new Commit(transactionId));
+
+ // TODO : this was replaced
+ //executeAndLog(new Commit(transactionId));
// flush whole index when volatile index has been commited.
if (flush)
@@ -558,11 +596,15 @@
try
{
Term idTerm = new Term(FieldNames.UUID, uuid.toString());
- executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+
+ // TODO: this was removed
+ //executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+
num = volatileIndex.removeDocument(idTerm);
if (num > 0)
{
- redoLog.append(new DeleteNode(getTransactionId(), uuid));
+ // TODO: removed line
+ //redoLog.append(new DeleteNode(getTransactionId(), uuid));
}
for (int i = 0; i < indexes.size(); i++)
{
@@ -573,12 +615,14 @@
int removed = index.removeDocument(idTerm);
if (removed > 0)
{
- redoLog.append(new DeleteNode(getTransactionId(), uuid));
+ // TODO: removed line
+ //redoLog.append(new DeleteNode(getTransactionId(), uuid));
}
num += removed;
}
}
- executeAndLog(new Commit(getTransactionId()));
+ // TODO : this was replaced
+ //executeAndLog(new Commit(getTransactionId()));
}
finally
{
@@ -792,7 +836,8 @@
// if we are reindexing there is already an active transaction
if (!reindexing)
{
- executeAndLog(new Start(Action.INTERNAL_TRANS_REPL_INDEXES));
+ // TODO: this was removed
+ //executeAndLog(new Start(Action.INTERNAL_TRANS_REPL_INDEXES));
}
// delete obsolete indexes
Set names = new HashSet(Arrays.asList(obsoleteIndexes));
@@ -802,16 +847,37 @@
String indexName = (String)it.next();
if (indexNames.contains(indexName))
{
- executeAndLog(new DeleteIndex(getTransactionId(), indexName));
+ // TODO this was replaced
+ //executeAndLog(new DeleteIndex(getTransactionId(), indexName));
+
+ for (Iterator it2 = indexes.iterator(); it2.hasNext();)
+ {
+ PersistentIndex idx = (PersistentIndex)it2.next();
+ if (idx.getName().equals(indexName))
+ {
+ idx.close();
+ deleteIndex(idx);
+ break;
+ }
+ }
+
}
}
// Index merger does not log an action when it creates the
// target
// index of the merge. We have to do this here.
- executeAndLog(new CreateIndex(getTransactionId(), index.getName()));
+ // TODO this was replaced
+ ///executeAndLog(new CreateIndex(getTransactionId(), index.getName()));
- executeAndLog(new AddIndex(getTransactionId(), index.getName()));
+ PersistentIndex idx = getOrCreateIndex(index.getName());
+
+ // TODO: this is replaced
+ //executeAndLog(new AddIndex(getTransactionId(), index.getName()));
+ if (!indexNames.contains(index.getName()))
+ {
+ indexNames.addName(index.getName());
+ }
// delete documents in index
for (Iterator it = deleted.iterator(); it.hasNext();)
@@ -825,7 +891,9 @@
{
// only commit if we are not reindexing
// when reindexing the final commit is done at the very end
- executeAndLog(new Commit(getTransactionId()));
+
+ // TODO : this was replaced
+ //executeAndLog(new Commit(getTransactionId()));
}
}
finally
@@ -954,7 +1022,6 @@
// stop index merger
// when calling this method we must not lock this MultiIndex, otherwise
// a deadlock might occur
- merger.dispose();
synchronized (this)
{
@@ -1108,7 +1175,8 @@
synchronized (this)
{
// commit volatile index
- executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+ // TODO: this was removed
+ //executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
commitVolatileIndex();
// commit persistent indexes
@@ -1126,11 +1194,17 @@
// check if index still contains documents
if (index.getNumDocuments() == 0)
{
- executeAndLog(new DeleteIndex(getTransactionId(), index.getName()));
+ // TODO THIS was replaced
+ //executeAndLog(new DeleteIndex(getTransactionId(), index.getName()));
+
+ index.close();
+ deleteIndex(index);
+
}
}
}
- executeAndLog(new Commit(getTransactionId()));
+ // TODO : this was replaced
+ //executeAndLog(new Commit(getTransactionId()));
indexNames.write();
@@ -1329,16 +1403,31 @@
long time = System.currentTimeMillis();
// create index
- CreateIndex create = new CreateIndex(getTransactionId(), null);
- executeAndLog(create);
+
+ //TODO this was replaced
+// CreateIndex create = new CreateIndex(getTransactionId(), null);
+// executeAndLog(create);
+
+ PersistentIndex idx = getOrCreateIndex(null);
// commit volatile index
- executeAndLog(new VolatileCommit(getTransactionId(), create.getIndexName()));
+
+ // TODO THIS IS REPLACED
+ //executeAndLog(new VolatileCommit(getTransactionId(), idx.getName()));
+ idx.copyIndex(volatileIndex);
+ resetVolatileIndex();
+
+ // TODO this is replaced
// add new index
- AddIndex add = new AddIndex(getTransactionId(), create.getIndexName());
- executeAndLog(add);
+// AddIndex add = new AddIndex(getTransactionId(), create.getIndexName());
+// executeAndLog(add);
+ if (!indexNames.contains(idx.getName()))
+ {
+ indexNames.addName(idx.getName());
+ }
+
// create new volatile index
resetVolatileIndex();
@@ -1376,7 +1465,11 @@
{
return count;
}
- executeAndLog(new AddNode(getTransactionId(), node.getIdentifier()));
+
+ // TODO: this is replaced
+ //executeAndLog(new AddNode(getTransactionId(), node.getIdentifier()));
+ volatileIndex.addDocuments(new Document[]{createDocument(node)});
+
if (++count % 100 == 0)
{
@@ -1552,11 +1645,45 @@
{
for (Iterator it = finished.keySet().iterator(); it.hasNext();)
{
- executeAndLog(new DeleteNode(getTransactionId(), (String)it.next()));
+ // TODO this was replaced
+ //executeAndLog(new DeleteNode(getTransactionId(), (String)it.next()));
+
+ String uuidString = (String)it.next();
+ // check if indexing queue is still working on
+ // this node from a previous update
+ Document doc = indexingQueue.removeDocument(uuidString);
+ if (doc != null)
+ {
+ Util.disposeDocument(doc);
+ }
+ Term idTerm = new Term(FieldNames.UUID, uuidString);
+ // if the document cannot be deleted from the volatile index
+ // delete it from one of the persistent indexes.
+ int num = volatileIndex.removeDocument(idTerm);
+ if (num == 0)
+ {
+ for (int i = indexes.size() - 1; i >= 0; i--)
+ {
+ // only look in registered indexes
+ PersistentIndex idx = (PersistentIndex)indexes.get(i);
+ if (indexNames.contains(idx.getName()))
+ {
+ num = idx.removeDocument(idTerm);
+ if (num > 0)
+ {
+ return;
+ }
+ }
+ }
+ }
+
+
}
for (Iterator it = finished.values().iterator(); it.hasNext();)
{
- executeAndLog(new AddNode(getTransactionId(), (Document)it.next()));
+ // TODO this was replaced
+ //executeAndLog(new AddNode(getTransactionId(), (Document)it.next()));
+ volatileIndex.addDocuments(new Document[]{(Document)it.next()});
}
}
else
@@ -1790,35 +1917,35 @@
Action a;
if (actionLabel.equals(Action.ADD_NODE))
{
- a = AddNode.fromString(transactionId, arguments);
+ a = AddNode_.fromString(transactionId, arguments);
}
else if (actionLabel.equals(Action.ADD_INDEX))
{
- a = AddIndex.fromString(transactionId, arguments);
+ a = AddIndex_.fromString(transactionId, arguments);
}
else if (actionLabel.equals(Action.COMMIT))
{
- a = Commit.fromString(transactionId, arguments);
+ a = Commit_.fromString(transactionId, arguments);
}
else if (actionLabel.equals(Action.CREATE_INDEX))
{
- a = CreateIndex.fromString(transactionId, arguments);
+ a = CreateIndex_.fromString(transactionId, arguments);
}
else if (actionLabel.equals(Action.DELETE_INDEX))
{
- a = DeleteIndex.fromString(transactionId, arguments);
+ a = DeleteIndex_.fromString(transactionId, arguments);
}
else if (actionLabel.equals(Action.DELETE_NODE))
{
- a = DeleteNode.fromString(transactionId, arguments);
+ a = DeleteNode_.fromString(transactionId, arguments);
}
else if (actionLabel.equals(Action.START))
{
- a = Start.fromString(transactionId, arguments);
+ a = Start_.fromString(transactionId, arguments);
}
else if (actionLabel.equals(Action.VOLATILE_COMMIT))
{
- a = VolatileCommit.fromString(transactionId, arguments);
+ a = VolatileCommit_.fromString(transactionId, arguments);
}
else
{
@@ -1831,7 +1958,7 @@
/**
* Adds an index to the MultiIndex's active persistent index list.
*/
- private static class AddIndex extends Action
+ private static class AddIndex_ extends Action
{
/**
@@ -1848,7 +1975,7 @@
* the name of the index to add, or <code>null</code> if an
* index with a new name should be created.
*/
- AddIndex(long transactionId, String indexName)
+ AddIndex_(long transactionId, String indexName)
{
super(transactionId, Action.TYPE_ADD_INDEX);
this.indexName = indexName;
@@ -1865,9 +1992,9 @@
* @throws IllegalArgumentException
* if the arguments are malformed.
*/
- static AddIndex fromString(long transactionId, String arguments)
+ static AddIndex_ fromString(long transactionId, String arguments)
{
- return new AddIndex(transactionId, arguments);
+ return new AddIndex_(transactionId, arguments);
}
/**
@@ -1882,9 +2009,6 @@
if (!index.indexNames.contains(indexName))
{
index.indexNames.addName(indexName);
- // now that the index is in the active list let the merger know
- // about it
- index.merger.indexAdded(indexName, idx.getNumDocuments());
}
}
@@ -1907,7 +2031,7 @@
/**
* Adds a node to the index.
*/
- private static class AddNode extends Action
+ private static class AddNode_ extends Action
{
/**
@@ -1935,7 +2059,7 @@
* @param uuid
* the uuid of the node to add.
*/
- AddNode(long transactionId, String uuid)
+ AddNode_(long transactionId, String uuid)
{
super(transactionId, Action.TYPE_ADD_NODE);
this.uuid = uuid;
@@ -1949,7 +2073,7 @@
* @param doc
* the document to add.
*/
- AddNode(long transactionId, Document doc)
+ AddNode_(long transactionId, Document doc)
{
this(transactionId, doc.get(FieldNames.UUID));
this.doc = doc;
@@ -1966,14 +2090,14 @@
* @throws IllegalArgumentException
* if the arguments are malformed. Not a UUID.
*/
- static AddNode fromString(long transactionId, String arguments) throws IllegalArgumentException
+ static AddNode_ fromString(long transactionId, String arguments) throws IllegalArgumentException
{
// simple length check
if (arguments.length() != Constants.UUID_FORMATTED_LENGTH)
{
throw new IllegalArgumentException("arguments is not a uuid");
}
- return new AddNode(transactionId, arguments);
+ return new AddNode_(transactionId, arguments);
}
/**
@@ -2021,7 +2145,7 @@
/**
* Commits a transaction.
*/
- private static class Commit extends Action
+ private static class Commit_ extends Action
{
/**
@@ -2030,7 +2154,7 @@
* @param transactionId
* the id of the transaction that is committed.
*/
- Commit(long transactionId)
+ Commit_(long transactionId)
{
super(transactionId, Action.TYPE_COMMIT);
}
@@ -2044,9 +2168,9 @@
* ignored by this method.
* @return the Commit action.
*/
- static Commit fromString(long transactionId, String arguments)
+ static Commit_ fromString(long transactionId, String arguments)
{
- return new Commit(transactionId);
+ return new Commit_(transactionId);
}
/**
@@ -2074,7 +2198,7 @@
* Creates an new sub index but does not add it to the active persistent
* index list.
*/
- private static class CreateIndex extends Action
+ private static class CreateIndex_ extends Action
{
/**
@@ -2091,7 +2215,7 @@
* the name of the index to add, or <code>null</code> if an
* index with a new name should be created.
*/
- CreateIndex(long transactionId, String indexName)
+ CreateIndex_(long transactionId, String indexName)
{
super(transactionId, Action.TYPE_CREATE_INDEX);
this.indexName = indexName;
@@ -2108,10 +2232,10 @@
* @throws IllegalArgumentException
* if the arguments are malformed.
*/
- static CreateIndex fromString(long transactionId, String arguments)
+ static CreateIndex_ fromString(long transactionId, String arguments)
{
// when created from String, this action is executed as redo action
- return new CreateIndex(transactionId, arguments);
+ return new CreateIndex_(transactionId, arguments);
}
/**
@@ -2171,7 +2295,7 @@
/**
* Closes and deletes an index that is no longer in use.
*/
- private static class DeleteIndex extends Action
+ private static class DeleteIndex_ extends Action
{
/**
@@ -2187,7 +2311,7 @@
* @param indexName
* the name of the index to delete.
*/
- DeleteIndex(long transactionId, String indexName)
+ DeleteIndex_(long transactionId, String indexName)
{
super(transactionId, Action.TYPE_DELETE_INDEX);
this.indexName = indexName;
@@ -2204,9 +2328,9 @@
* @throws IllegalArgumentException
* if the arguments are malformed.
*/
- static DeleteIndex fromString(long transactionId, String arguments)
+ static DeleteIndex_ fromString(long transactionId, String arguments)
{
- return new DeleteIndex(transactionId, arguments);
+ return new DeleteIndex_(transactionId, arguments);
}
/**
@@ -2249,7 +2373,7 @@
/**
* Deletes a node from the index.
*/
- private static class DeleteNode extends Action
+ private static class DeleteNode_ extends Action
{
/**
@@ -2271,7 +2395,7 @@
* @param uuid
* the uuid of the node to delete.
*/
- DeleteNode(long transactionId, String uuid)
+ DeleteNode_(long transactionId, String uuid)
{
super(transactionId, Action.TYPE_DELETE_NODE);
this.uuid = uuid;
@@ -2288,14 +2412,14 @@
* @throws IllegalArgumentException
* if the arguments are malformed. Not a UUID.
*/
- static DeleteNode fromString(long transactionId, String arguments)
+ static DeleteNode_ fromString(long transactionId, String arguments)
{
// simple length check
if (arguments.length() != Constants.UUID_FORMATTED_LENGTH)
{
throw new IllegalArgumentException("arguments is not a uuid");
}
- return new DeleteNode(transactionId, arguments);
+ return new DeleteNode_(transactionId, arguments);
}
/**
@@ -2355,7 +2479,7 @@
/**
* Starts a transaction.
*/
- private static class Start extends Action
+ private static class Start_ extends Action
{
/**
@@ -2364,7 +2488,7 @@
* @param transactionId
* the id of the transaction that started.
*/
- Start(long transactionId)
+ Start_(long transactionId)
{
super(transactionId, Action.TYPE_START);
}
@@ -2378,9 +2502,9 @@
* ignored by this method.
* @return the Start action.
*/
- static Start fromString(long transactionId, String arguments)
+ static Start_ fromString(long transactionId, String arguments)
{
- return new Start(transactionId);
+ return new Start_(transactionId);
}
/**
@@ -2407,7 +2531,7 @@
/**
* Commits the volatile index to disk.
*/
- private static class VolatileCommit extends Action
+ private static class VolatileCommit_ extends Action
{
/**
@@ -2421,7 +2545,7 @@
* @param transactionId
* the id of the transaction that executes this action.
*/
- VolatileCommit(long transactionId, String targetIndex)
+ VolatileCommit_(long transactionId, String targetIndex)
{
super(transactionId, Action.TYPE_VOLATILE_COMMIT);
this.targetIndex = targetIndex;
@@ -2436,9 +2560,9 @@
* ignored by this implementation.
* @return the VolatileCommit action.
*/
- static VolatileCommit fromString(long transactionId, String arguments)
+ static VolatileCommit_ fromString(long transactionId, String arguments)
{
- return new VolatileCommit(transactionId, arguments);
+ return new VolatileCommit_(transactionId, arguments);
}
/**
@@ -2500,7 +2624,6 @@
protected void setReadOny()
{
// try to stop merger in safe way
- merger.dispose();
flushTask.cancel();
FLUSH_TIMER.purge();
this.redoLog = null;
@@ -2532,18 +2655,8 @@
attemptDelete();
// now that we are ready, start index merger
- merger.start();
if (redoLogApplied)
{
- // wait for the index merge to finish pending jobs
- try
- {
- merger.waitUntilIdle();
- }
- catch (InterruptedException e)
- {
- // move on
- }
flush();
}
Modified: jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java
===================================================================
--- jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java 2010-10-08 12:04:24 UTC (rev 3262)
+++ jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java 2010-10-08 12:39:11 UTC (rev 3263)
@@ -193,7 +193,10 @@
out.close();
out = null;
}
- dir.deleteFile(REDO_LOG);
+ if (dir.fileExists(REDO_LOG))
+ {
+ dir.deleteFile(REDO_LOG);
+ }
entryCount = 0;
return null;
}
@@ -208,17 +211,17 @@
private void initOut() throws IOException
{
SecurityHelper.doPriviledgedIOExceptionAction(new PrivilegedExceptionAction<Object>()
+ {
+ public Object run() throws Exception
{
- public Object run() throws Exception
- {
if (out == null)
{
OutputStream os = new IndexOutputStream(dir.createOutput(REDO_LOG));
out = new BufferedWriter(new OutputStreamWriter(os));
}
- return null;
- }
- });
+ return null;
+ }
+ });
}
/**
@@ -231,9 +234,9 @@
private void read(final ActionCollector collector) throws IOException
{
SecurityHelper.doPriviledgedIOExceptionAction(new PrivilegedExceptionAction<Object>()
+ {
+ public Object run() throws Exception
{
- public Object run() throws Exception
- {
if (!dir.fileExists(REDO_LOG))
{
return null;
@@ -269,9 +272,9 @@
}
}
}
- return null;
- }
- });
+ return null;
+ }
+ });
}
//-----------------------< internal >---------------------------------------
More information about the exo-jcr-commits
mailing list