Author: skabashnyuk
Date: 2010-02-26 03:32:46 -0500 (Fri, 26 Feb 2010)
New Revision: 1955
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexUpdateMonitor.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/DefaultIndexUpdateMonitor.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexUpdateMonitor.java
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/lab/cluster/prepare/TestIndexUpdateMonitor.java
Log:
EXOJCR-540 : Changed the logic of system workspace handling in the
JBossCacheIndexUpdateMonitor class in the same way as was done in the
JBossCacheIndexInfos class
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java 2010-02-25
20:36:58 UTC (rev 1954)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java 2010-02-26
08:32:46 UTC (rev 1955)
@@ -90,7 +90,8 @@
// try to get pushState parameters, since they are set programmatically only
Boolean pushState =
config.getParameterBoolean(QueryHandlerParams.PARAM_JBOSSCACHE_PUSHSTATE, false);
- Integer pushStateTimeOut =
config.getParameterInteger(QueryHandlerParams.PARAM_JBOSSCACHE_PUSHSTATE_TIMEOUT, 10000);
+ Integer pushStateTimeOut =
+
config.getParameterInteger(QueryHandlerParams.PARAM_JBOSSCACHE_PUSHSTATE_TIMEOUT, 10000);
singletonStoreProperties.setProperty("pushStateWhenCoordinator",
pushState.toString());
singletonStoreProperties.setProperty("pushStateWhenCoordinatorTimeout",
pushStateTimeOut.toString());
@@ -126,13 +127,13 @@
if (!parentHandler.isInitialized())
{
parentHandler.setIndexInfos(new JBossCacheIndexInfos(cache, true,
modeHandler));
- parentHandler.setIndexUpdateMonitor(new JBossCacheIndexUpdateMonitor(cache,
modeHandler));
+ parentHandler.setIndexUpdateMonitor(new JBossCacheIndexUpdateMonitor(cache,
true, modeHandler));
parentHandler.init();
}
if (!handler.isInitialized())
{
handler.setIndexInfos(new JBossCacheIndexInfos(cache, false, modeHandler));
- handler.setIndexUpdateMonitor(new JBossCacheIndexUpdateMonitor(cache,
modeHandler));
+ handler.setIndexUpdateMonitor(new JBossCacheIndexUpdateMonitor(cache, false,
modeHandler));
handler.init();
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexUpdateMonitor.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexUpdateMonitor.java 2010-02-25
20:36:58 UTC (rev 1954)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexUpdateMonitor.java 2010-02-26
08:32:46 UTC (rev 1955)
@@ -21,17 +21,15 @@
import org.exoplatform.services.jcr.impl.core.query.IndexerIoMode;
import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeListener;
+import org.exoplatform.services.jcr.impl.core.query.lucene.IndexInfos;
import org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitor;
import org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitorListener;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
-import org.jboss.cache.lock.LockManager;
-import org.jboss.cache.lock.LockType;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.NodeModified;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
@@ -56,8 +54,10 @@
private final Cache<Serializable, Object> cache;
- private final static Fqn PARAMETER_ROOT =
Fqn.fromString("INDEX_UPDATE_MONITOR");
+ private static final String INDEX_PARAMETERS =
"$index_parameters".intern();
+ private static final String SYSINDEX_PARAMETERS =
"$sysindex_parameters".intern();
+
private final static String PARAMETER_NAME = "index-update-in-progress";
public final IndexerIoModeHandler modeHandler;
@@ -68,25 +68,34 @@
private final List<IndexUpdateMonitorListener> listeners;
/**
+ * This FQN points to the cache node where the update-in-progress parameter of this monitor is stored;
the node is selected by the system flag, as is done for the JBossCache {@link IndexInfos}.
+ */
+ private final Fqn parametersFqn;
+
+ /**
* @param cache instance of JbossCache that is used to deliver index names
*/
- public JBossCacheIndexUpdateMonitor(Cache<Serializable, Object> cache,
IndexerIoModeHandler modeHandler)
+ public JBossCacheIndexUpdateMonitor(Cache<Serializable, Object> cache, boolean
system,
+ IndexerIoModeHandler modeHandler)
{
this.cache = cache;
this.modeHandler = modeHandler;
this.listeners = new CopyOnWriteArrayList<IndexUpdateMonitorListener>();
+ // store the parsed FQN to avoid parsing it each time a cache event is generated
+ this.parametersFqn = Fqn.fromString(system ? INDEX_PARAMETERS :
SYSINDEX_PARAMETERS);
+
modeHandler.addIndexerIoModeListener(this);
Node<Serializable, Object> cacheRoot = cache.getRoot();
// prepare cache structures
- if (!cacheRoot.hasChild(PARAMETER_ROOT))
+ if (!cacheRoot.hasChild(parametersFqn))
{
cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
- cacheRoot.addChild(PARAMETER_ROOT).setResident(true);
+ cacheRoot.addChild(parametersFqn).setResident(true);
}
else
{
- cache.getNode(PARAMETER_ROOT).setResident(true);
+ cache.getNode(parametersFqn).setResident(true);
}
if (IndexerIoMode.READ_WRITE == modeHandler.getMode())
@@ -98,6 +107,7 @@
// Currently READ_ONLY is set, so new lists should be fired to multiIndex.
cache.addCacheListener(this);
}
+
}
/**
@@ -124,44 +134,12 @@
*/
public boolean getUpdateInProgress()
{
- Object value = cache.get(PARAMETER_ROOT, PARAMETER_NAME);
+
+ Object value = cache.get(parametersFqn, PARAMETER_NAME);
return value != null ? (Boolean)value : false;
}
/**
- * Returns true if the node is locked (either for reading or writing) by anyone, and
false otherwise.
- * @param name
- * @return
- */
- public boolean isLocked(String name)
- {
- LockManager lm = ((CacheSPI<Serializable,
Object>)cache).getComponentRegistry().getComponent(LockManager.class);
- return lm.isLocked(Fqn.fromRelativeFqn(PARAMETER_ROOT, Fqn.fromString(name)));
- }
-
- /**
- * Acquires a lock of type lockType, for a given owner
- * @param name
- * @param lockType
- * @return
- * @throws InterruptedException
- */
- public boolean lock(String name, LockType lockType)
- {
-
- LockManager lm = ((CacheSPI<Serializable,
Object>)cache).getComponentRegistry().getComponent(LockManager.class);
- try
- {
- return lm.lock(Fqn.fromRelativeFqn(PARAMETER_ROOT, Fqn.fromString(name)),
lockType, Integer.MAX_VALUE);
- }
- catch (InterruptedException e)
- {
- log.warn("An error occurs while tryning to lock the node " + name,
e);
- }
- return false;
- }
-
- /**
* @see
org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitor#setUpdateInProgress(boolean)
*/
public void setUpdateInProgress(boolean updateInProgress)
@@ -172,7 +150,7 @@
}
try
{
- cache.put(PARAMETER_ROOT, PARAMETER_NAME, new Boolean(updateInProgress));
+ cache.put(parametersFqn, PARAMETER_NAME, new Boolean(updateInProgress));
for (IndexUpdateMonitorListener listener : listeners)
{
listener.onUpdateInProgressChange(updateInProgress);
@@ -186,17 +164,6 @@
}
/**
- * Releases the lock passed in
- * @param name
- */
- public void unlock(String name)
- {
- LockManager lm = ((CacheSPI<Serializable,
Object>)cache).getComponentRegistry().getComponent(LockManager.class);
- lm.unlock(Fqn.fromRelativeFqn(PARAMETER_ROOT, Fqn.fromString(name)),
cache.getInvocationContext()
- .getGlobalTransaction());
- }
-
- /**
* @see
org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitor#addIndexUpdateMonitorListener(org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitorListener)
*/
public void addIndexUpdateMonitorListener(IndexUpdateMonitorListener listener)
@@ -212,7 +179,7 @@
@NodeModified
public void cacheNodeModified(NodeModifiedEvent event)
{
- if (!event.isPre() && event.getFqn().equals(PARAMETER_ROOT))
+ if (!event.isPre() && event.getFqn().equals(parametersFqn))
{
Object value = null;
Map<?, ?> data = event.getData();
@@ -227,7 +194,7 @@
if (value == null)
{
log.warn("The data cannot be found, we will try to get it from the
cache");
- value = cache.get(PARAMETER_ROOT, PARAMETER_NAME);
+ value = cache.get(parametersFqn, PARAMETER_NAME);
}
boolean updateInProgress = value != null ? (Boolean)value : false;
for (IndexUpdateMonitorListener listener : listeners)
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/DefaultIndexUpdateMonitor.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/DefaultIndexUpdateMonitor.java 2010-02-25
20:36:58 UTC (rev 1954)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/DefaultIndexUpdateMonitor.java 2010-02-26
08:32:46 UTC (rev 1955)
@@ -18,8 +18,6 @@
*/
package org.exoplatform.services.jcr.impl.core.query.lucene;
-import org.jboss.cache.lock.LockType;
-
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,12 +30,12 @@
public class DefaultIndexUpdateMonitor implements IndexUpdateMonitor
{
private final AtomicBoolean updateInProgress;
-
+
/**
* The list of all the listeners
*/
private final List<IndexUpdateMonitorListener> listeners;
-
+
/**
* @param semaphore
*/
@@ -67,33 +65,10 @@
for (IndexUpdateMonitorListener listener : listeners)
{
listener.onUpdateInProgressChange(updateInProgress);
- }
+ }
}
/**
- * @see
org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitor#unlock(java.lang.String)
- */
- public void unlock(String name)
- {
- }
-
- /**
- * @see
org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitor#lock(java.lang.String,
org.jboss.cache.lock.LockType)
- */
- public boolean lock(String name, LockType lockType)
- {
- return false;
- }
-
- /**
- * @see
org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitor#isLocked(java.lang.String)
- */
- public boolean isLocked(String name)
- {
- return false;
- }
-
- /**
* @see
org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitor#addIndexUpdateMonitorListener(org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitorListener)
*/
public void addIndexUpdateMonitorListener(IndexUpdateMonitorListener listener)
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexUpdateMonitor.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexUpdateMonitor.java 2010-02-25
20:36:58 UTC (rev 1954)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexUpdateMonitor.java 2010-02-26
08:32:46 UTC (rev 1955)
@@ -18,7 +18,6 @@
*/
package org.exoplatform.services.jcr.impl.core.query.lucene;
-import org.jboss.cache.lock.LockType;
/**
* @author <a href="mailto:Sergey.Kabashnyuk@exoplatform.org">Sergey
Kabashnyuk</a>
@@ -42,27 +41,5 @@
* @param listener the listener to add
*/
void addIndexUpdateMonitorListener(IndexUpdateMonitorListener listener);
-
- /**
- * Returns true if the node is locked (either for reading or writing) by anyone, and
false otherwise.
- * @param name
- * @return
- */
- public boolean isLocked(String name);
- /**
- * Acquires a lock of type lockType, for a given owner
- * @param name
- * @param lockType
- * @return
- * @throws InterruptedException
- */
- public boolean lock(String name, LockType lockType);
-
- /**
- * Releases the lock passed in
- * @param name
- */
- public void unlock(String name);
-
}
\ No newline at end of file
Modified:
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/lab/cluster/prepare/TestIndexUpdateMonitor.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/lab/cluster/prepare/TestIndexUpdateMonitor.java 2010-02-25
20:36:58 UTC (rev 1954)
+++
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/lab/cluster/prepare/TestIndexUpdateMonitor.java 2010-02-26
08:32:46 UTC (rev 1955)
@@ -29,10 +29,8 @@
import org.jboss.cache.Cache;
import org.jboss.cache.CacheFactory;
import org.jboss.cache.DefaultCacheFactory;
-import org.jboss.cache.lock.LockType;
import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author <a href="mailto:Sergey.Kabashnyuk@exoplatform.org">Sergey
Kabashnyuk</a>
@@ -59,7 +57,8 @@
// TODO Auto-generated method stub
super.setUp();
cache = createCache();
- indexUpdateMonitor = new JBossCacheIndexUpdateMonitor(cache, new
IndexerIoModeHandler(IndexerIoMode.READ_WRITE));
+ indexUpdateMonitor =
+ new JBossCacheIndexUpdateMonitor(cache, false, new
IndexerIoModeHandler(IndexerIoMode.READ_WRITE));
}
/**
@@ -92,177 +91,6 @@
}
- public void testLock() throws Exception
- {
- String lockName = "testLock";
- assertFalse(indexUpdateMonitor.isLocked(lockName));
- assertTrue(indexUpdateMonitor.lock(lockName, LockType.WRITE));
- assertTrue(indexUpdateMonitor.isLocked(lockName));
- LockChecker checker = new LockChecker(indexUpdateMonitor, lockName);
- Thread lockThread = new Thread(checker);
- assertFalse(checker.isWaiting());
- lockThread.start();
- assertTrue(checker.isWaiting());
- indexUpdateMonitor.unlock(lockName);
- assertFalse(checker.isWaiting());
-
- }
-
- public void _testMultiThread() throws Exception
- {
- AtomicBoolean atomicBoolean = new AtomicBoolean();
- ThreadGroup chengers = new ThreadGroup("Changers");
- ThreadGroup checkers = new ThreadGroup("Checkers");
- Thread[] changersArray = new Thread[10];
- Thread[] checkerArray = new Thread[10];
-
- for (int i = 0; i < changersArray.length; i++)
- {
- changersArray[i] = new Thread(chengers, new
UpdateMonitorChanger(atomicBoolean));
- changersArray[i].start();
- }
-
- for (int i = 0; i < checkerArray.length; i++)
- {
- checkerArray[i] = new Thread(checkers, new
UpdateMonitorChecker(atomicBoolean));
- checkerArray[i].start();
- }
-
- // Thread changer = new Thread(new UpdateMonitorChanger(atomicBoolean));
- // changer.start();
- // Thread checker = new Thread(new UpdateMonitorChecker(atomicBoolean));
- // checker.start();
-
- Thread.sleep(4 * 60 * 1000);
- chengers.destroy();
- checkers.destroy();
- }
-
- private class LockChecker implements Runnable
- {
- private final String lockName;
-
- private boolean waiting = false;
-
- /**
- * @return the waiting
- */
- protected boolean isWaiting()
- {
- return waiting;
- }
-
- /**
- * @param indexUpdateMonitor
- */
- public LockChecker(IndexUpdateMonitor indexUpdateMonitor, String lockName)
- {
- super();
- this.indexUpdateMonitor = indexUpdateMonitor;
- this.lockName = lockName;
- }
-
- private final IndexUpdateMonitor indexUpdateMonitor;
-
- /**
- * @see java.lang.Runnable#run()
- */
- public void run()
- {
- waiting = true;
- assertTrue(indexUpdateMonitor.lock(lockName, LockType.WRITE));
- waiting = false;
- }
- }
-
- private class UpdateMonitorChanger implements Runnable
- {
- private AtomicBoolean atomicBoolean;
-
- /**
- * @param atomicBoolean
- */
- public UpdateMonitorChanger(AtomicBoolean atomicBoolean)
- {
- super();
- this.atomicBoolean = atomicBoolean;
- }
-
- /**
- * @see java.lang.Runnable#run()
- */
- public void run()
- {
- while (!Thread.currentThread().isInterrupted())
- {
-
- synchronized (atomicBoolean)
- {
- assertEquals(atomicBoolean.get(),
indexUpdateMonitor.getUpdateInProgress());
- boolean oldValue = atomicBoolean.get();
-
- indexUpdateMonitor.setUpdateInProgress(!oldValue);
-
- if (!atomicBoolean.compareAndSet(oldValue, !oldValue))
- {
- log.warn("Fail to change monitor");
- }
- }
- }
-
- }
- }
-
- private class UpdateMonitorChecker implements Runnable
- {
-
- /**
- * @param atomicBoolean
- */
- public UpdateMonitorChecker(AtomicBoolean atomicBoolean)
- {
- super();
- this.atomicBoolean = atomicBoolean;
- }
-
- private final AtomicBoolean atomicBoolean;
-
- /**
- * @see java.lang.Runnable#run()
- */
- public void run()
- {
-
- while (!Thread.currentThread().isInterrupted())
- {
-
- synchronized (atomicBoolean)
- {
- //assertEquals(atomicBoolean.get(),
indexUpdateMonitor.getUpdateInProgress());
- if (atomicBoolean.get() == indexUpdateMonitor.getUpdateInProgress())
- {
- System.out.println("check ok");
- }
- else
- {
- System.out.println("check fail");
- }
-
- }
-
- try
- {
- Thread.sleep(100);
- }
- catch (InterruptedException e)
- {
- return;
- }
- }
-
- }
- }
-
private Cache<Serializable, Object> createCache()
{
String jbcConfig =
"conf/cluster/test-jbosscache-indexer-config-exoloader_db1_ws.xml";