Author: nzamosenchuk
Date: 2012-03-26 08:55:32 -0400 (Mon, 26 Mar 2012)
New Revision: 5967
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/dataflow/persistent/CacheableWorkspaceDataManager.java
Log:
EXOJCR-1825 :
- Make the latchers final
- Convert isResponsibleForResuming in SearchManager and CacheableWorkspaceDataManager into
a final AtomicBoolean
- Convert closed and allowQuery in SearchIndex into a final AtomicBoolean
- Convert online and stopped in MultiIndex into a final AtomicBoolean
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java 2012-03-26
12:50:39 UTC (rev 5966)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java 2012-03-26
12:55:32 UTC (rev 5967)
@@ -219,7 +219,7 @@
/**
* Indicates that node keep responsible for resuming.
*/
- protected Boolean isResponsibleForResuming = false;
+ protected final AtomicBoolean isResponsibleForResuming = new AtomicBoolean(false);
/**
* Suspend remote command.
@@ -1203,7 +1203,7 @@
{
if (rpcService != null)
{
- isResponsibleForResuming = true;
+ isResponsibleForResuming.set(true);
try
{
@@ -1276,7 +1276,7 @@
throw new ResumeException(e);
}
- isResponsibleForResuming = false;
+ isResponsibleForResuming.set(false);
}
else
{
@@ -1322,7 +1322,7 @@
hotReindexingState = "Running. Started at " +
sdf.format(Calendar.getInstance().getTime());
try
{
- isResponsibleForResuming = true;
+ isResponsibleForResuming.set(true);
// set offline cluster wide (will make merger disposed and volatile
flushed)
if (rpcService != null && changesFilter.isShared())
{
@@ -1400,7 +1400,7 @@
hotReindexingState = "Stopped with errors at " +
sdf.format(Calendar.getInstance().getTime());
LOG.info("Reindexing halted with errors.");
}
- isResponsibleForResuming = false;
+ isResponsibleForResuming.set(false);
}
}
}, "HotReindexing-" + handler.getContext().getRepositoryName() +
"-"
@@ -1513,7 +1513,7 @@
public Serializable execute(Serializable[] args) throws Throwable
{
- return isResponsibleForResuming;
+ return isResponsibleForResuming.get();
}
});
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2012-03-26
12:50:39 UTC (rev 5966)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2012-03-26
12:55:32 UTC (rev 5967)
@@ -66,6 +66,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -238,12 +239,12 @@
* Or for any other reason it should be switched
* to offline mode.
*/
- private boolean online = true;
+ private final AtomicBoolean online = new AtomicBoolean(true);
/**
* Flag indicating whether the index is stopped.
*/
- private volatile boolean stopped;
+ private final AtomicBoolean stopped = new AtomicBoolean();
/**
* The index format version of this multi index.
@@ -268,7 +269,7 @@
@Override
public void run()
{
- stopped = true;
+ stopped.set(true);
}
};
@@ -621,12 +622,12 @@
*/
public void reindex(ItemDataConsumer stateMgr) throws IOException,
RepositoryException
{
- if (stopped)
+ if (stopped.get())
{
throw new IllegalStateException("Can't invoke reindexing on closed
index.");
}
- if (online)
+ if (online.get())
{
throw new IllegalStateException("Can't invoke reindexing while index
still online.");
}
@@ -674,7 +675,7 @@
*/
synchronized void update(final Collection<String> remove, final
Collection<Document> add) throws IOException
{
- if (!online)
+ if (!online.get())
{
doUpdateOffline(remove, add);
}
@@ -1247,7 +1248,7 @@
try
{
// if we are reindexing there is already an active transaction
- if (online)
+ if (online.get())
{
executeAndLog(new Start(Action.INTERNAL_TRANS_REPL_INDEXES));
}
@@ -1278,7 +1279,7 @@
}
index.commit();
- if (online)
+ if (online.get())
{
// only commit if we are not reindexing
// when reindexing the final commit is done at the very end
@@ -1303,7 +1304,7 @@
}
}
}
- if (!online)
+ if (!online.get())
{
// do some cleanup right away when reindexing
attemptDelete();
@@ -1367,7 +1368,7 @@
// if index in offline mode, due to hot async reindexing,
// need to return the reader containing only stale indexes (old),
// without newly created.
- List<PersistentIndex> persistedIndexesList = online ? indexes :
staleIndexes;
+ List<PersistentIndex> persistedIndexesList = online.get() ?
indexes : staleIndexes;
List<ReadOnlyIndexReader> readerList = new
ArrayList<ReadOnlyIndexReader>();
for (int i = 0; i < persistedIndexesList.size(); i++)
{
@@ -1465,7 +1466,7 @@
}
modeHandler.removeIndexerIoModeListener(this);
indexUpdateMonitor.removeIndexUpdateMonitorListener(this);
- this.stopped = true;
+ this.stopped.set(true);
// Remove the hook that will stop the threads if they are still running
SecurityHelper.doPrivilegedAction(new PrivilegedAction<Object>()
{
@@ -1489,7 +1490,7 @@
}
});
}
-
+
// stop index merger
// when calling this method we must not lock this MultiIndex, otherwise
// a deadlock might occur
@@ -1891,7 +1892,7 @@
final AtomicLong count, final AtomicLong processed) throws IOException,
RepositoryException, InterruptedException
{
processed.incrementAndGet();
- if (stopped)
+ if (stopped.get())
{
throw new InterruptedException();
}
@@ -2046,7 +2047,7 @@
for (NodeDataIndexing node : iterator.next())
{
processed.incrementAndGet();
- if (stopped)
+ if (stopped.get())
{
throw new InterruptedException();
}
@@ -2178,7 +2179,7 @@
private synchronized void checkFlush()
{
// avoid frequent flushes during reindexing;
- long idleTime = online ? System.currentTimeMillis() - lastFlushTime : 0;
+ long idleTime = online.get() ? System.currentTimeMillis() - lastFlushTime : 0;
long volatileTime = System.currentTimeMillis() - lastFileSystemFlushTime;
// do not flush if volatileIdleTime is zero or negative
if ((handler.getVolatileIdleTime() > 0 && idleTime >
handler.getVolatileIdleTime() * 1000)
@@ -3431,7 +3432,7 @@
*/
public boolean isOnline()
{
- return online;
+ return online.get();
}
/**
@@ -3439,7 +3440,7 @@
*/
public boolean isStopped()
{
- return stopped;
+ return stopped.get();
}
/**
@@ -3451,7 +3452,7 @@
public synchronized void setOnline(boolean isOnline, boolean dropStaleIndexes) throws
IOException
{
// if mode really changed
- if (online != isOnline)
+ if (online.get() != isOnline)
{
// switching to ONLINE
if (isOnline)
@@ -3460,7 +3461,7 @@
if (modeHandler.getMode() == IndexerIoMode.READ_WRITE)
{
offlineIndex.commit(true);
- online = true;
+ online.set(true);
// cleaning stale indexes
for (PersistentIndex staleIndex : staleIndexes)
{
@@ -3473,7 +3474,7 @@
}
else
{
- online = true;
+ online.set(true);
staleIndexes.clear();
}
}
@@ -3498,10 +3499,10 @@
staleIndexes.addAll(indexes);
}
- online = false;
+ online.set(false);
}
}
- else if (!online)
+ else if (!online.get())
{
throw new IOException("Index is already in OFFLINE mode.");
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2012-03-26
12:50:39 UTC (rev 5966)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2012-03-26
12:55:32 UTC (rev 5967)
@@ -468,12 +468,12 @@
* Indicates if this <code>SearchIndex</code> is closed and cannot be
used
* anymore.
*/
- private boolean closed = false;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
/**
* Allows or denies queries while index is offline.
*/
- private boolean allowQuery = true;
+ private final AtomicBoolean allowQuery = new AtomicBoolean(true);
/**
* Text extractor for extracting text content of binary properties.
@@ -520,7 +520,7 @@
/**
* Waiting query execution until resume.
*/
- protected AtomicReference<CountDownLatch> latcher = new
AtomicReference<CountDownLatch>();
+ protected final AtomicReference<CountDownLatch> latcher = new
AtomicReference<CountDownLatch>();
/**
* Indicates if component suspended or not.
@@ -1253,7 +1253,7 @@
*/
public void close()
{
- if (!closed)
+ if (!closed.get())
{
// cleanup resources obtained by filters
if (recoveryFilters != null)
@@ -1284,7 +1284,7 @@
errorLog.close();
index.close();
getContext().destroy();
- closed = true;
+ closed.set(true);
log.info("Index closed: " + path);
}
}
@@ -1560,7 +1560,7 @@
protected IndexReader getIndexReader(boolean includeSystemIndex) throws IOException
{
// deny query execution if index in offline mode and allowQuery is false
- if (!index.isOnline() && !allowQuery)
+ if (!index.isOnline() && !allowQuery.get())
{
throw new IndexOfflineIOException("Index is offline");
}
@@ -3181,7 +3181,7 @@
*/
private void checkOpen() throws IOException
{
- if (closed)
+ if (closed.get())
{
throw new IOException("query handler closed and cannot be used
anymore.");
}
@@ -3337,11 +3337,11 @@
checkOpen();
if (isOnline)
{
- this.allowQuery = true;
+ this.allowQuery.set(true);
}
else
{
- this.allowQuery = allowQuery;
+ this.allowQuery.set(allowQuery);
}
index.setOnline(isOnline, dropStaleIndexes);
}
@@ -3372,7 +3372,7 @@
{
try
{
- closed = false;
+ closed.set(false);
doInit();
latcher.get().countDown();
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/dataflow/persistent/CacheableWorkspaceDataManager.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/dataflow/persistent/CacheableWorkspaceDataManager.java 2012-03-26
12:50:39 UTC (rev 5966)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/dataflow/persistent/CacheableWorkspaceDataManager.java 2012-03-26
12:55:32 UTC (rev 5967)
@@ -147,12 +147,12 @@
/**
* Allows to make all threads waiting until resume.
*/
- protected AtomicReference<CountDownLatch> latcher = new
AtomicReference<CountDownLatch>();
+ protected final AtomicReference<CountDownLatch> latcher = new
AtomicReference<CountDownLatch>();
/**
* Indicates that node keep responsible for resuming.
*/
- protected Boolean isResponsibleForResuming = false;
+ protected final AtomicBoolean isResponsibleForResuming = new AtomicBoolean(false);
/**
* Request to all nodes to check if there is someone who responsible for resuming.
@@ -1922,7 +1922,7 @@
{
if (rpcService != null)
{
- isResponsibleForResuming = true;
+ isResponsibleForResuming.set(true);
try
{
@@ -1963,7 +1963,7 @@
throw new ResumeException(e);
}
- isResponsibleForResuming = false;
+ isResponsibleForResuming.set(false);
}
else
{
@@ -2116,7 +2116,7 @@
public Serializable execute(Serializable[] args) throws Throwable
{
- return isResponsibleForResuming;
+ return isResponsibleForResuming.get();
}
});