JBoss Cache SVN: r7909 - in core/trunk/src: main/java/org/jboss/cache and 6 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-18 07:36:28 -0400 (Wed, 18 Mar 2009)
New Revision: 7909
Added:
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/AbstractSharedLockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantSharedLockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementLockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementOwnableReentrantLockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementReentrantLockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantSharedLockContainer.java
core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockPerFqnTest.java
Removed:
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java
Modified:
core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java
core/trunk/src/main/resources/schema/jbosscache-config-3.1.xsd
Log:
JBCACHE-1494 - Lock-per-Fqn option in addition to lock striping, for MVCC
Modified: core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
===================================================================
--- core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml 2009-03-18 09:45:27 UTC (rev 7908)
+++ core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml 2009-03-18 11:36:28 UTC (rev 7909)
@@ -23,6 +23,7 @@
lockAcquisitionTimeout="20000"
nodeLockingScheme="mvcc"
writeSkewCheck="false"
+ useLockStriping="true"
concurrencyLevel="500"/>
<!--
@@ -477,6 +478,19 @@
is <literal>mvcc</literal> and <literal>isolationLevel</literal> is <literal>REPEATABLE_READ</literal>.
See the <link linkend="mvcc.writeskew">section on write skews</link> for a more detailed discussion.</entry>
</row>
+ <row>
+ <entry><emphasis role="bold">useLockStriping</emphasis></entry>
+ <entry>useLockStriping</entry>
+ <entry>true, false</entry>
+ <entry>true</entry>
+
+ <entry>Specifies whether lock striping is used. Only used if <literal>nodeLockingScheme</literal>
+ is <literal>mvcc</literal>. Lock striping usually offers greater performance and lower memory usage,
+ although deadlocks may occur in certain cases where several Fqns map to the same shared lock. This
+ risk can be reduced by increasing <literal>concurrencyLevel</literal>, though the only way to eliminate it is to
+ disable lock striping altogether.
+ </entry>
+ </row>
<row>
<entry><emphasis role="bold">concurrencyLevel</emphasis></entry>
<entry>concurrencyLevel</entry>
@@ -484,7 +498,8 @@
<entry>500</entry>
<entry>Specifies the number of shared locks to use for write locks acquired. Only used if <literal>nodeLockingScheme</literal>
- is <literal>mvcc</literal>. See the <link linkend="mvcc.impl">section on JBoss Cache's MVCC implementation</link> for a more detailed discussion.</entry>
+ is <literal>mvcc</literal>, and is ignored if <literal>useLockStriping</literal> is <literal>false</literal>.
+ See the <link linkend="mvcc.impl">section on JBoss Cache's MVCC implementation</link> for a more detailed discussion.</entry>
</row>
</tbody>
</tgroup>
Modified: core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2009-03-18 09:45:27 UTC (rev 7908)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -43,7 +43,7 @@
import org.jboss.cache.jmx.annotations.ManagedOperation;
import org.jboss.cache.lock.LockManager;
import org.jboss.cache.util.concurrent.locks.LockContainer;
-import org.jboss.cache.util.concurrent.locks.ReentrantLockContainer;
+import org.jboss.cache.util.concurrent.locks.ReentrantSharedLockContainer;
import org.jgroups.Address;
import java.util.ArrayList;
@@ -77,7 +77,7 @@
private EvictionConfig evictionConfig;
private final EvictionTimerTask evictionTimerTask = new EvictionTimerTask();
- private final LockContainer<Fqn> regionLocks = new ReentrantLockContainer<Fqn>(4);
+ private final LockContainer<Fqn> regionLocks = new ReentrantSharedLockContainer<Fqn>(4);
protected Configuration configuration;
protected RPCManager rpcManager;
protected LockManager lockManager;
@@ -93,12 +93,12 @@
protected final void lock(Fqn fqn)
{
- regionLocks.getLock(fqn).lock();
+ regionLocks.acquireLock(fqn);
}
protected final void unlock(Fqn fqn)
{
- regionLocks.getLock(fqn).unlock();
+ regionLocks.releaseLock(fqn);
}
@Inject
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2009-03-18 09:45:27 UTC (rev 7908)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -44,12 +44,7 @@
public class Configuration extends ConfigurationComponent
{
private static final long serialVersionUID = 5553791890144997466L;
-
- private Marshaller marshaller;
-
private transient JGroupsStackParser jGroupsStackParser = new JGroupsStackParser();
- private boolean invocationBatchingEnabled;
- private URL jgroupsConfigFile;
/**
* Behavior of the JVM shutdown hook registered by the cache
@@ -228,6 +223,10 @@
private int listenerAsyncQueueSize = 50000;
private int serializationExecutorPoolSize = 0;
private int serializationExecutorQueueSize = 50000;
+ private Marshaller marshaller;
+ private boolean invocationBatchingEnabled;
+ private boolean useLockStriping = true;
+ private URL jgroupsConfigFile;
@Start(priority = 1)
void correctIsolationLevels()
@@ -267,12 +266,23 @@
return writeSkewCheck;
}
+ public boolean isUseLockStriping()
+ {
+ return useLockStriping;
+ }
+
public void setWriteSkewCheck(boolean writeSkewCheck)
{
testImmutability("writeSkewCheck");
this.writeSkewCheck = writeSkewCheck;
}
+ public void setUseLockStriping(boolean useLockStriping)
+ {
+ testImmutability("useLockStriping");
+ this.useLockStriping = useLockStriping;
+ }
+
public int getConcurrencyLevel()
{
return concurrencyLevel;
Modified: core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java 2009-03-18 09:45:27 UTC (rev 7908)
+++ core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -395,19 +395,20 @@
private void configureLocking(Element element)
{
- String isolationLevel = getAttributeValue(element, "isolationLevel");
- if (existsAttribute(isolationLevel)) config.setIsolationLevel(IsolationLevel.valueOf(isolationLevel));
- String lockParentForChildInsertRemove = getAttributeValue(element, "lockParentForChildInsertRemove");
- if (existsAttribute(lockParentForChildInsertRemove))
- config.setLockParentForChildInsertRemove(getBoolean(lockParentForChildInsertRemove));
- String lockAcquisitionTimeout = getAttributeValue(element, "lockAcquisitionTimeout");
- if (existsAttribute(lockAcquisitionTimeout)) config.setLockAcquisitionTimeout(getLong(lockAcquisitionTimeout));
- String nodeLockingScheme = getAttributeValue(element, "nodeLockingScheme");
- if (existsAttribute(nodeLockingScheme)) config.setNodeLockingScheme(nodeLockingScheme);
- String writeSkewCheck = getAttributeValue(element, "writeSkewCheck");
- if (existsAttribute(writeSkewCheck)) config.setWriteSkewCheck(getBoolean(writeSkewCheck));
- String concurrencyLevel = getAttributeValue(element, "concurrencyLevel");
- if (existsAttribute(concurrencyLevel)) config.setConcurrencyLevel(getInt(concurrencyLevel));
+ String tmp = getAttributeValue(element, "isolationLevel");
+ if (existsAttribute(tmp)) config.setIsolationLevel(IsolationLevel.valueOf(tmp));
+ tmp = getAttributeValue(element, "lockParentForChildInsertRemove");
+ if (existsAttribute(tmp)) config.setLockParentForChildInsertRemove(getBoolean(tmp));
+ tmp = getAttributeValue(element, "lockAcquisitionTimeout");
+ if (existsAttribute(tmp)) config.setLockAcquisitionTimeout(getLong(tmp));
+ tmp = getAttributeValue(element, "nodeLockingScheme");
+ if (existsAttribute(tmp)) config.setNodeLockingScheme(tmp);
+ tmp = getAttributeValue(element, "writeSkewCheck");
+ if (existsAttribute(tmp)) config.setWriteSkewCheck(getBoolean(tmp));
+ tmp = getAttributeValue(element, "useLockStriping");
+ if (existsAttribute(tmp)) config.setUseLockStriping(getBoolean(tmp));
+ tmp = getAttributeValue(element, "concurrencyLevel");
+ if (existsAttribute(tmp)) config.setConcurrencyLevel(getInt(tmp));
}
private Element getSingleElement(String elementName)
Modified: core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java 2009-03-18 09:45:27 UTC (rev 7908)
+++ core/trunk/src/main/java/org/jboss/cache/lock/MVCCLockManager.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -34,12 +34,13 @@
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.invocation.InvocationContextContainer;
import org.jboss.cache.jmx.annotations.ManagedAttribute;
-import org.jboss.cache.jmx.annotations.ManagedOperation;
import static org.jboss.cache.lock.LockType.READ;
import org.jboss.cache.util.concurrent.locks.LockContainer;
import org.jboss.cache.util.concurrent.locks.OwnableReentrantLock;
-import org.jboss.cache.util.concurrent.locks.OwnableReentrantLockContainer;
-import org.jboss.cache.util.concurrent.locks.ReentrantLockContainer;
+import org.jboss.cache.util.concurrent.locks.OwnableReentrantSharedLockContainer;
+import org.jboss.cache.util.concurrent.locks.ReentrantSharedLockContainer;
+import org.jboss.cache.util.concurrent.locks.PerElementReentrantLockContainer;
+import org.jboss.cache.util.concurrent.locks.PerElementOwnableReentrantLockContainer;
import javax.transaction.TransactionManager;
import java.util.Collection;
@@ -92,7 +93,10 @@
@Start
public void startLockManager()
{
- lockContainer = transactionManager == null ? new ReentrantLockContainer<Fqn>(configuration.getConcurrencyLevel()) : new OwnableReentrantLockContainer<Fqn>(configuration.getConcurrencyLevel(), invocationContextContainer);
+ lockContainer =
+ configuration.isUseLockStriping() ?
+ transactionManager == null ? new ReentrantSharedLockContainer<Fqn>(configuration.getConcurrencyLevel()) : new OwnableReentrantSharedLockContainer<Fqn>(configuration.getConcurrencyLevel(), invocationContextContainer) :
+ transactionManager == null ? new PerElementReentrantLockContainer<Fqn>() : new PerElementOwnableReentrantLockContainer<Fqn>(invocationContextContainer);
}
@Start
@@ -106,8 +110,7 @@
if (lockType == READ) return true; // we don't support read locks. TODO: enforce this with an assertion
if (trace) log.trace("Attempting to lock " + fqn);
- Lock lock = lockContainer.getLock(fqn);
- return lock.tryLock(lockAcquisitionTimeout, MILLISECONDS);
+ return lockContainer.acquireLock(fqn, lockAcquisitionTimeout, MILLISECONDS);
}
public boolean lock(Fqn fqn, LockType lockType, Object owner, long timeoutMillis) throws InterruptedException
@@ -115,8 +118,7 @@
if (lockType == READ) return true; // we don't support read locks. TODO: enforce this with an assertion
if (trace) log.trace("Attempting to lock " + fqn);
- Lock lock = lockContainer.getLock(fqn);
- return lock.tryLock(timeoutMillis, MILLISECONDS);
+ return lockContainer.acquireLock(fqn, lockAcquisitionTimeout, MILLISECONDS);
}
public boolean lockAndRecord(Fqn fqn, LockType lockType, InvocationContext ctx) throws InterruptedException
@@ -124,8 +126,7 @@
if (lockType == READ) return true; // we don't support read locks. TODO: enforce this with an assertion
if (trace) log.trace("Attempting to lock " + fqn);
- Lock lock = lockContainer.getLock(fqn);
- if (lock.tryLock(ctx.getLockAcquisitionTimeout(lockAcquisitionTimeout), MILLISECONDS))
+ if (lockContainer.acquireLock(fqn, lockAcquisitionTimeout, MILLISECONDS))
{
ctx.addLock(fqn);
return true;
@@ -138,10 +139,9 @@
public void unlock(Fqn fqn, Object owner)
{
if (trace) log.trace("Attempting to unlock " + fqn);
- Lock lock = lockContainer.getLock(fqn);
try
{
- lock.unlock();
+ lockContainer.releaseLock(fqn);
}
catch (IllegalMonitorStateException imse)
{
@@ -163,7 +163,7 @@
if (trace) log.trace("Attempting to unlock " + f);
try
{
- lockContainer.getLock(f).unlock();
+ lockContainer.releaseLock(f);
}
catch (IllegalMonitorStateException imse)
{
@@ -336,10 +336,4 @@
{
return lockContainer.size() - lockContainer.getNumLocksHeld();
}
-
- @ManagedOperation(description = "Tests the spreading of locks across Fqns. For a given (String based) Fqn, this method returns the index in the lock array that it maps to.")
- public int testHashing(String fqn)
- {
- return lockContainer.hashToIndex(Fqn.fromString(fqn));
- }
}
Copied: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/AbstractSharedLockContainer.java (from rev 7906, core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/AbstractSharedLockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/AbstractSharedLockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -0,0 +1,102 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.util.concurrent.locks;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A container for locks. Used with lock striping.
+ *
+ * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
+ * @since 3.0
+ */
+@ThreadSafe
+public abstract class AbstractSharedLockContainer<E> implements LockContainer<E>
+{
+ private int lockSegmentMask;
+ private int lockSegmentShift;
+
+
+ protected int calculateNumberOfSegments(int concurrencyLevel)
+ {
+ int tempLockSegShift = 0;
+ int numLocks = 1;
+ while (numLocks < concurrencyLevel)
+ {
+ ++tempLockSegShift;
+ numLocks <<= 1;
+ }
+ lockSegmentShift = 32 - tempLockSegShift;
+ lockSegmentMask = numLocks - 1;
+ return numLocks;
+ }
+
+ final int hashToIndex(E object)
+ {
+ return (hash(object) >>> lockSegmentShift) & lockSegmentMask;
+ }
+
+ /**
+ * Returns a hash code for the non-null object passed in.
+ * Uses the same hash code spreader as most other java.util hash tables, applied to the value returned by the
+ * object's own hashCode() method.
+ *
+ * @param object the object serving as a key
+ * @return the hash code
+ */
+ final int hash(E object)
+ {
+ // Spread bits to regularize both segment and index locations,
+ // using variant of single-word Wang/Jenkins hash.
+ int h = object.hashCode();
+ h += (h << 15) ^ 0xffffcd7d;
+ h ^= (h >>> 10);
+ h += (h << 3);
+ h ^= (h >>> 6);
+ h += (h << 2) + (h << 14);
+ h = h ^ (h >>> 16);
+ return h;
+ }
+
+ protected abstract void initLocks(int numLocks);
+
+ public void acquireLock(E object)
+ {
+ Lock lock = getLock(object);
+ lock.lock();
+ }
+
+ public boolean acquireLock(E object, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ Lock lock = getLock(object);
+ return lock.tryLock(timeout, unit);
+ }
+
+ public void releaseLock(E object)
+ {
+ Lock lock = getLock(object);
+ lock.unlock();
+ }
+}
Property changes on: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/AbstractSharedLockContainer.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java 2009-03-18 09:45:27 UTC (rev 7908)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -1,116 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.cache.util.concurrent.locks;
-
-import net.jcip.annotations.ThreadSafe;
-
-import java.util.concurrent.locks.Lock;
-
-/**
- * A container for locks. Used with lock striping.
- *
- * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
- * @since 3.0
- */
-@ThreadSafe
-public abstract class LockContainer<E>
-{
- private int lockSegmentMask;
- private int lockSegmentShift;
-
-
- protected int calculateNumberOfSegments(int concurrencyLevel)
- {
- int tempLockSegShift = 0;
- int numLocks = 1;
- while (numLocks < concurrencyLevel)
- {
- ++tempLockSegShift;
- numLocks <<= 1;
- }
- lockSegmentShift = 32 - tempLockSegShift;
- lockSegmentMask = numLocks - 1;
- return numLocks;
- }
-
- public final int hashToIndex(E object)
- {
- return (hash(object) >>> lockSegmentShift) & lockSegmentMask;
- }
-
- /**
- * Returns a hash code for non-null Object x.
- * Uses the same hash code spreader as most other java.util hash tables, except that this uses the string representation
- * of the object passed in.
- *
- * @param object the object serving as a key
- * @return the hash code
- */
- final int hash(E object)
- {
- // Spread bits to regularize both segment and index locations,
- // using variant of single-word Wang/Jenkins hash.
- int h = object.hashCode();
- h += (h << 15) ^ 0xffffcd7d;
- h ^= (h >>> 10);
- h += (h << 3);
- h ^= (h >>> 6);
- h += (h << 2) + (h << 14);
- h = h ^ (h >>> 16);
- return h;
- }
-
- protected abstract void initLocks(int numLocks);
-
- /**
- * Tests if a give owner owns a lock on a specified object.
- *
- * @param object object to check
- * @param owner owner to test
- * @return true if owner owns lock, false otherwise
- */
- public abstract boolean ownsLock(E object, Object owner);
-
- /**
- * @param object object
- * @return true if an object is locked, false otherwise
- */
- public abstract boolean isLocked(E object);
-
- /**
- * @param object object
- * @return the lock for a specific object
- */
- public abstract Lock getLock(E object);
-
- /**
- * @return number of locks held
- */
- public abstract int getNumLocksHeld();
-
- /**
- * Clears all locks held and re-initialises stripes.
- */
- public abstract void reset();
-
- public abstract int size();
-}
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/LockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -0,0 +1,52 @@
+package org.jboss.cache.util.concurrent.locks;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * A container for locks
+ *
+ * @author Manik Surtani
+ * @since 3.1.0
+ */
+public interface LockContainer<E>
+{
+ /**
+ * Tests if a given owner owns a lock on a specified object.
+ *
+ * @param object object to check
+ * @param owner owner to test
+ * @return true if owner owns lock, false otherwise
+ */
+ boolean ownsLock(E object, Object owner);
+
+ /**
+ * @param object object
+ * @return true if an object is locked, false otherwise
+ */
+ boolean isLocked(E object);
+
+ /**
+ * @param object object
+ * @return the lock for a specific object
+ */
+ Lock getLock(E object);
+
+ /**
+ * @return number of locks held
+ */
+ int getNumLocksHeld();
+
+ /**
+ * Clears all locks held and re-initialises stripes.
+ */
+ void reset();
+
+ int size();
+
+ void acquireLock(E object);
+
+ boolean acquireLock(E object, long timeout, TimeUnit unit) throws InterruptedException;
+
+ void releaseLock(E object);
+}
Deleted: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java 2009-03-18 09:45:27 UTC (rev 7908)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -1,101 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.cache.util.concurrent.locks;
-
-import net.jcip.annotations.ThreadSafe;
-import org.jboss.cache.invocation.InvocationContextContainer;
-
-import java.util.Arrays;
-
-/**
- * A LockContainer that holds {@link org.jboss.cache.util.concurrent.locks.OwnableReentrantLock}s.
- *
- * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
- * @see org.jboss.cache.util.concurrent.locks.ReentrantLockContainer
- * @see org.jboss.cache.util.concurrent.locks.OwnableReentrantLock
- * @since 3.0
- */
-@ThreadSafe
-public class OwnableReentrantLockContainer<E> extends LockContainer<E>
-{
- OwnableReentrantLock[] sharedLocks;
- InvocationContextContainer icc;
-
- /**
- * Creates a new LockContainer which uses a certain number of shared locks across all elements that need to be locked.
- *
- * @param concurrencyLevel concurrency level for number of stripes to create. Stripes are created in powers of two, with a minimum of concurrencyLevel created.
- * @param icc invocation context container to use
- */
- public OwnableReentrantLockContainer(int concurrencyLevel, InvocationContextContainer icc)
- {
- this.icc = icc;
- initLocks(calculateNumberOfSegments(concurrencyLevel));
- }
-
- protected void initLocks(int numLocks)
- {
- sharedLocks = new OwnableReentrantLock[numLocks];
- for (int i = 0; i < numLocks; i++) sharedLocks[i] = new OwnableReentrantLock(icc);
- }
-
- public final OwnableReentrantLock getLock(E object)
- {
- return sharedLocks[hashToIndex(object)];
- }
-
- public final boolean ownsLock(E object, Object owner)
- {
- OwnableReentrantLock lock = getLock(object);
- return owner.equals(lock.getOwner());
- }
-
- public final boolean isLocked(E object)
- {
- OwnableReentrantLock lock = getLock(object);
- return lock.isLocked();
- }
-
- public final int getNumLocksHeld()
- {
- int i = 0;
- for (OwnableReentrantLock l : sharedLocks) if (l.isLocked()) i++;
- return i;
- }
-
- public String toString()
- {
- return "OwnableReentrantLockContainer{" +
- "sharedLocks=" + (sharedLocks == null ? null : Arrays.asList(sharedLocks)) +
- '}';
- }
-
- public void reset()
- {
- initLocks(sharedLocks.length);
- }
-
- public int size()
- {
- return sharedLocks.length;
- }
-}
Copied: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantSharedLockContainer.java (from rev 7906, core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantSharedLockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantSharedLockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -0,0 +1,101 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.util.concurrent.locks;
+
+import net.jcip.annotations.ThreadSafe;
+import org.jboss.cache.invocation.InvocationContextContainer;
+
+import java.util.Arrays;
+
+/**
+ * A LockContainer that holds {@link org.jboss.cache.util.concurrent.locks.OwnableReentrantLock}s.
+ *
+ * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
+ * @see ReentrantSharedLockContainer
+ * @see org.jboss.cache.util.concurrent.locks.OwnableReentrantLock
+ * @since 3.0
+ */
+@ThreadSafe
+public class OwnableReentrantSharedLockContainer<E> extends AbstractSharedLockContainer<E>
+{
+ OwnableReentrantLock[] sharedLocks;
+ InvocationContextContainer icc;
+
+ /**
+ * Creates a new LockContainer which uses a certain number of shared locks across all elements that need to be locked.
+ *
+ * @param concurrencyLevel concurrency level for number of stripes to create. Stripes are created in powers of two, with a minimum of concurrencyLevel created.
+ * @param icc invocation context container to use
+ */
+ public OwnableReentrantSharedLockContainer(int concurrencyLevel, InvocationContextContainer icc)
+ {
+ this.icc = icc;
+ initLocks(calculateNumberOfSegments(concurrencyLevel));
+ }
+
+ protected void initLocks(int numLocks)
+ {
+ sharedLocks = new OwnableReentrantLock[numLocks];
+ for (int i = 0; i < numLocks; i++) sharedLocks[i] = new OwnableReentrantLock(icc);
+ }
+
+ public final OwnableReentrantLock getLock(E object)
+ {
+ return sharedLocks[hashToIndex(object)];
+ }
+
+ public final boolean ownsLock(E object, Object owner)
+ {
+ OwnableReentrantLock lock = getLock(object);
+ return owner.equals(lock.getOwner());
+ }
+
+ public final boolean isLocked(E object)
+ {
+ OwnableReentrantLock lock = getLock(object);
+ return lock.isLocked();
+ }
+
+ public final int getNumLocksHeld()
+ {
+ int i = 0;
+ for (OwnableReentrantLock l : sharedLocks) if (l.isLocked()) i++;
+ return i;
+ }
+
+ public String toString()
+ {
+ return "OwnableReentrantSharedLockContainer{" +
+ "sharedLocks=" + (sharedLocks == null ? null : Arrays.asList(sharedLocks)) +
+ '}';
+ }
+
+ public void reset()
+ {
+ initLocks(sharedLocks.length);
+ }
+
+ public int size()
+ {
+ return sharedLocks.length;
+ }
+}
Property changes on: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantSharedLockContainer.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementLockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementLockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementLockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -0,0 +1,87 @@
+package org.jboss.cache.util.concurrent.locks;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * A lock container that maintains a new lock per element
+ *
+ * @author Manik Surtani
+ * @since 3.1.0
+ */
+public abstract class PerElementLockContainer<E> implements LockContainer<E>
+{
+ ConcurrentMap<E, Lock> locks = new ConcurrentHashMap<E, Lock>();
+
+ protected abstract Lock newLock();
+
+ public Lock getLock(E object)
+ {
+ Lock l = newLock();
+ Lock tmp = locks.putIfAbsent(object, l);
+ if (tmp != null) l = tmp;
+ return l;
+ }
+
+ public int getNumLocksHeld()
+ {
+ return locks.size();
+ }
+
+ public void reset()
+ {
+ for (Lock l: locks.values())
+ {
+ try
+ {
+ l.unlock();
+ }
+ catch (Exception e)
+ {
+ // no-op
+ }
+ }
+ locks.clear();
+ }
+
+ public int size()
+ {
+ return locks.size();
+ }
+
+ public void acquireLock(E object)
+ {
+ Lock l = getLock(object);
+ l.lock();
+ // now check that the lock is still valid...
+ if (l != locks.get(object))
+ {
+ // we acquired the wrong lock!
+ l.unlock();
+ acquireLock(object);
+ }
+ }
+
+ public boolean acquireLock(E object, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ Lock l = getLock(object);
+ boolean result = l.tryLock(timeout, unit);
+
+ // now check that the lock is still valid...
+ if (result && l != locks.get(object))
+ {
+ // we acquired the wrong lock!
+ l.unlock();
+ result = acquireLock(object, timeout, unit);
+ }
+ return result;
+ }
+
+ public void releaseLock(E object)
+ {
+ Lock l = locks.remove(object);
+ if (l != null) l.unlock();
+ }
+}
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementOwnableReentrantLockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementOwnableReentrantLockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementOwnableReentrantLockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -0,0 +1,43 @@
+package org.jboss.cache.util.concurrent.locks;
+
+import org.jboss.cache.invocation.InvocationContextContainer;
+
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Lock-per-element container whose locks are {@link org.jboss.cache.util.concurrent.locks.OwnableReentrantLock}s.
+ *
+ * @author Manik Surtani
+ * @since 3.1.0
+ */
+public class PerElementOwnableReentrantLockContainer<E> extends PerElementLockContainer<E>
+{
+   private InvocationContextContainer icc;
+
+   /**
+    * @param icc invocation context container handed to every lock this container creates
+    */
+   public PerElementOwnableReentrantLockContainer(InvocationContextContainer icc)
+   {
+      this.icc = icc;
+   }
+
+   /**
+    * @return true if a lock is registered for the element and its owner equals the given owner
+    */
+   public boolean ownsLock(E object, Object owner)
+   {
+      OwnableReentrantLock registered = (OwnableReentrantLock) locks.get(object);
+      if (registered == null) return false;
+      return owner.equals(registered.getOwner());
+   }
+
+   /**
+    * @return true if a lock is registered for the element and it is currently locked
+    */
+   public boolean isLocked(E object)
+   {
+      OwnableReentrantLock registered = (OwnableReentrantLock) locks.get(object);
+      if (registered == null) return false;
+      return registered.isLocked();
+   }
+
+   /**
+    * @return a new {@link OwnableReentrantLock} bound to this container's invocation context container
+    */
+   protected final Lock newLock()
+   {
+      return new OwnableReentrantLock(icc);
+   }
+}
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementReentrantLockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementReentrantLockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/PerElementReentrantLockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -0,0 +1,35 @@
+package org.jboss.cache.util.concurrent.locks;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Lock-per-element container whose locks are {@link java.util.concurrent.locks.ReentrantLock}s.
+ *
+ * @author Manik Surtani
+ * @since 3.1.0
+ */
+public class PerElementReentrantLockContainer<E> extends PerElementLockContainer<E>
+{
+   /**
+    * The <tt>owner</tt> argument is not consulted; ownership is judged by the calling thread.
+    *
+    * @return true if a lock is registered for the element and the current thread holds it
+    */
+   public boolean ownsLock(E object, Object owner)
+   {
+      ReentrantLock registered = (ReentrantLock) locks.get(object);
+      if (registered == null) return false;
+      return registered.isHeldByCurrentThread();
+   }
+
+   /**
+    * @return true if a lock is registered for the element and it is currently locked
+    */
+   public boolean isLocked(E object)
+   {
+      ReentrantLock registered = (ReentrantLock) locks.get(object);
+      if (registered == null) return false;
+      return registered.isLocked();
+   }
+
+   /**
+    * @return a new, unlocked {@link ReentrantLock}
+    */
+   protected final Lock newLock()
+   {
+      return new ReentrantLock();
+   }
+}
Deleted: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java 2009-03-18 09:45:27 UTC (rev 7908)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -1,97 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.cache.util.concurrent.locks;
-
-import net.jcip.annotations.ThreadSafe;
-
-import java.util.Arrays;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * A LockContainer that holds ReentrantLocks
- *
- * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
- * @see org.jboss.cache.util.concurrent.locks.OwnableReentrantLockContainer
- * @since 3.0
- */
-@ThreadSafe
-public class ReentrantLockContainer<E> extends LockContainer<E>
-{
- ReentrantLock[] sharedLocks;
-
- /**
- * Creates a new LockContainer which uses a certain number of shared locks across all elements that need to be locked.
- *
- * @param concurrencyLevel concurrency level for number of stripes to create. Stripes are created in powers of two, with a minimum of concurrencyLevel created.
- */
- public ReentrantLockContainer(int concurrencyLevel)
- {
- initLocks(calculateNumberOfSegments(concurrencyLevel));
- }
-
- protected void initLocks(int numLocks)
- {
- sharedLocks = new ReentrantLock[numLocks];
- for (int i = 0; i < numLocks; i++) sharedLocks[i] = new ReentrantLock();
- }
-
- public final ReentrantLock getLock(E object)
- {
- return sharedLocks[hashToIndex(object)];
- }
-
- public final int getNumLocksHeld()
- {
- int i = 0;
- for (ReentrantLock l : sharedLocks) if (l.isLocked()) i++;
- return i;
- }
-
- public final boolean ownsLock(E object, Object owner)
- {
- ReentrantLock lock = getLock(object);
- return lock.isHeldByCurrentThread();
- }
-
- public final boolean isLocked(E object)
- {
- ReentrantLock lock = getLock(object);
- return lock.isLocked();
- }
-
- public String toString()
- {
- return "ReentrantLockContainer{" +
- "sharedLocks=" + (sharedLocks == null ? null : Arrays.asList(sharedLocks)) +
- '}';
- }
-
- public void reset()
- {
- initLocks(sharedLocks.length);
- }
-
- public int size()
- {
- return sharedLocks.length;
- }
-}
Copied: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantSharedLockContainer.java (from rev 7906, core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantLockContainer.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantSharedLockContainer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantSharedLockContainer.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.util.concurrent.locks;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.Arrays;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A LockContainer that holds ReentrantLocks
+ *
+ * @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
+ * @see OwnableReentrantSharedLockContainer
+ * @since 3.0
+ */
+@ThreadSafe
+public class ReentrantSharedLockContainer<E> extends AbstractSharedLockContainer<E>
+{
+ ReentrantLock[] sharedLocks;
+
+ /**
+ * Creates a new LockContainer which uses a certain number of shared locks across all elements that need to be locked.
+ *
+ * @param concurrencyLevel concurrency level for number of stripes to create. Stripes are created in powers of two, with a minimum of concurrencyLevel created.
+ */
+ public ReentrantSharedLockContainer(int concurrencyLevel)
+ {
+ initLocks(calculateNumberOfSegments(concurrencyLevel));
+ }
+
+ protected void initLocks(int numLocks)
+ {
+ sharedLocks = new ReentrantLock[numLocks];
+ for (int i = 0; i < numLocks; i++) sharedLocks[i] = new ReentrantLock();
+ }
+
+ public final ReentrantLock getLock(E object)
+ {
+ return sharedLocks[hashToIndex(object)];
+ }
+
+ public final int getNumLocksHeld()
+ {
+ int i = 0;
+ for (ReentrantLock l : sharedLocks) if (l.isLocked()) i++;
+ return i;
+ }
+
+ public final boolean ownsLock(E object, Object owner)
+ {
+ ReentrantLock lock = getLock(object);
+ return lock.isHeldByCurrentThread();
+ }
+
+ public final boolean isLocked(E object)
+ {
+ ReentrantLock lock = getLock(object);
+ return lock.isLocked();
+ }
+
+ public String toString()
+ {
+ return "ReentrantSharedLockContainer{" +
+ "sharedLocks=" + (sharedLocks == null ? null : Arrays.asList(sharedLocks)) +
+ '}';
+ }
+
+ public void reset()
+ {
+ initLocks(sharedLocks.length);
+ }
+
+ public int size()
+ {
+ return sharedLocks.length;
+ }
+}
Property changes on: core/trunk/src/main/java/org/jboss/cache/util/concurrent/locks/ReentrantSharedLockContainer.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/trunk/src/main/resources/schema/jbosscache-config-3.1.xsd
===================================================================
--- core/trunk/src/main/resources/schema/jbosscache-config-3.1.xsd 2009-03-18 09:45:27 UTC (rev 7908)
+++ core/trunk/src/main/resources/schema/jbosscache-config-3.1.xsd 2009-03-18 11:36:28 UTC (rev 7909)
@@ -86,6 +86,7 @@
</xs:simpleType>
</xs:attribute>
<xs:attribute name="writeSkewCheck" type="tns:booleanType"/>
+ <xs:attribute name="useLockStriping" type="tns:booleanType"/>
<xs:attribute name="concurrencyLevel" type="xs:integer"/>
</xs:complexType>
Added: core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockPerFqnTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockPerFqnTest.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/api/mvcc/LockPerFqnTest.java 2009-03-18 11:36:28 UTC (rev 7909)
@@ -0,0 +1,105 @@
+package org.jboss.cache.api.mvcc;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.lock.LockManager;
+import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.concurrent.locks.PerElementLockContainer;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Exercises MVCC with lock striping disabled, i.e. with one lock per Fqn, and checks that no locks are left
+ * registered in the lock container once operations complete.
+ */
+@Test(groups = "functional", sequential = true, testName = "api.mvcc.LockPerFqnTest")
+public class LockPerFqnTest
+{
+   Cache cache;
+
+   @BeforeMethod
+   public void setUp()
+   {
+      Configuration cfg = new Configuration();
+      cfg.setUseLockStriping(false);
+      cfg.setNodeLockingScheme(Configuration.NodeLockingScheme.MVCC);
+      cache = new DefaultCacheFactory().createCache(cfg);
+   }
+
+   @AfterMethod
+   public void tearDown()
+   {
+      TestingUtil.killCaches(cache);
+   }
+
+   public void testLocksCleanedUp()
+   {
+      cache.put("/a/b/c", "k", "v");
+      cache.put("/a/b/d", "k", "v");
+      assertNoLocks();
+   }
+
+   public void testLocksConcurrency() throws Exception
+   {
+      final int NUM_THREADS = 10;
+      final CountDownLatch l = new CountDownLatch(1);
+      final int numLoops = 1000;
+      // all worker threads append to this list, so it has to be safe for concurrent modification
+      final List<Exception> exceptions = java.util.Collections.synchronizedList(new LinkedList<Exception>());
+
+      Thread[] t = new Thread[NUM_THREADS];
+      for (int i = 0; i < NUM_THREADS; i++) t[i] = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               // hold every worker back until all of them have been started
+               l.await();
+            }
+            catch (Exception e)
+            {
+               // ignore
+            }
+            for (int loop = 0; loop < numLoops; loop++)
+            {
+               try
+               {
+                  // alternate between creating and removing sibling nodes under /a
+                  switch (loop % 2)
+                  {
+                     case 0:
+                        cache.put("/a/fqn" + loop, "k", "v");
+                        break;
+                     case 1:
+                        cache.removeNode("/a/fqn" + loop);
+                        break;
+                  }
+               }
+               catch (Exception e)
+               {
+                  exceptions.add(e);
+               }
+            }
+         }
+      };
+
+      for (Thread th : t) th.start();
+      l.countDown();
+      for (Thread th : t) th.join();
+
+      // all workers have been joined, so reading the list here needs no further synchronisation
+      if (!exceptions.isEmpty()) throw exceptions.get(0);
+      assertNoLocks();
+   }
+
+   /**
+    * Asserts that the lock manager reports no locks and that the per-element lock container is empty.
+    */
+   private void assertNoLocks()
+   {
+      LockManager lm = TestingUtil.extractLockManager(cache);
+      LockAssert.assertNoLocks(
+            lm, TestingUtil.extractComponentRegistry(cache).getComponent(InvocationContextContainer.class)
+      );
+
+      PerElementLockContainer lc = (PerElementLockContainer) TestingUtil.extractField(lm, "lockContainer");
+      assert lc.size() == 0;
+   }
+}
15 years, 9 months
JBoss Cache SVN: r7908 - in core/trunk/src: main/java/org/jboss/cache/mvcc and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-18 05:45:27 -0400 (Wed, 18 Mar 2009)
New Revision: 7908
Added:
core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNodeForRemoval.java
Modified:
core/trunk/src/main/java/org/jboss/cache/AbstractNodeFactory.java
core/trunk/src/main/java/org/jboss/cache/NodeFactory.java
core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeFactory.java
core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNode.java
core/trunk/src/main/java/org/jboss/cache/mvcc/RepeatableReadNode.java
core/trunk/src/test/java/org/jboss/cache/api/CacheAPITest.java
core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/CacheAPIMVCCTest.java
Log:
JBCACHE-1493 - REPEATABLE_READ inconsistent with write skew disabled
Modified: core/trunk/src/main/java/org/jboss/cache/AbstractNodeFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/AbstractNodeFactory.java 2009-03-18 08:43:29 UTC (rev 7907)
+++ core/trunk/src/main/java/org/jboss/cache/AbstractNodeFactory.java 2009-03-18 09:45:27 UTC (rev 7908)
@@ -125,6 +125,11 @@
throw new UnsupportedOperationException("Unsupported in this implementation (" + getClass().getSimpleName() + ")!");
}
+ public ReadCommittedNode createWrappedNodeForRemoval(Fqn fqn, InternalNode<K, V> node, InternalNode<K, V> parent)
+ {
+ throw new UnsupportedOperationException("Unsupported in this implementation (" + getClass().getSimpleName() + ")!");
+ }
+
public NodeSPI<K, V> createRootNode()
{
return createNode(Fqn.ROOT, null);
Modified: core/trunk/src/main/java/org/jboss/cache/NodeFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/NodeFactory.java 2009-03-18 08:43:29 UTC (rev 7907)
+++ core/trunk/src/main/java/org/jboss/cache/NodeFactory.java 2009-03-18 09:45:27 UTC (rev 7908)
@@ -38,6 +38,8 @@
{
ReadCommittedNode createWrappedNode(InternalNode<K, V> node, InternalNode<K, V> parent);
+ ReadCommittedNode createWrappedNodeForRemoval(Fqn fqn, InternalNode<K, V> node, InternalNode<K, V> parent);
+
WorkspaceNode<K, V> createWrappedNode(NodeSPI<K, V> dataNode, TransactionWorkspace workspace);
/**
Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeFactory.java 2009-03-18 08:43:29 UTC (rev 7907)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeFactory.java 2009-03-18 09:45:27 UTC (rev 7908)
@@ -43,7 +43,6 @@
public class MVCCNodeFactory<K, V> extends AbstractNodeFactory<K, V>
{
private boolean useRepeatableRead;
- private static final NullMarkerNode NULL_MARKER = new NullMarkerNode();
private static final Log log = LogFactory.getLog(MVCCNodeFactory.class);
private static final boolean trace = log.isTraceEnabled();
private boolean lockChildForInsertRemove;
@@ -68,11 +67,47 @@
* @return a ReadCommittedNode
*/
@Override
- @SuppressWarnings("unchecked")
public ReadCommittedNode createWrappedNode(InternalNode<K, V> node, InternalNode<K, V> parent)
{
- if (node == null) return useRepeatableRead ? NULL_MARKER : null;
- ReadCommittedNode rcn = useRepeatableRead ? new RepeatableReadNode(node, parent) : new ReadCommittedNode(node, parent);
+ return createWrappedNode(null, node, parent, false);
+ }
+
+ @Override
+ public ReadCommittedNode createWrappedNodeForRemoval(Fqn fqn, InternalNode<K, V> node, InternalNode<K, V> parent)
+ {
+ return createWrappedNode(fqn, node, parent, true);
+ }
+
+ @SuppressWarnings("unchecked")
+ private ReadCommittedNode createWrappedNode(Fqn fqn, InternalNode<K, V> node, InternalNode<K, V> parent, boolean forRemoval)
+ {
+ ReadCommittedNode rcn;
+
+ if (node == null)
+ {
+ if (useRepeatableRead)
+ {
+ if (forRemoval)
+ {
+ // create but do not return this just yet as it needs to be initialized
+ rcn = new NullMarkerNodeForRemoval(parent, fqn);
+ }
+ else
+ {
+ return NullMarkerNode.getInstance();
+ }
+ }
+ else
+ {
+ // if we are using read-committed, just return a null
+ return null;
+ }
+ }
+ else
+ {
+ rcn = useRepeatableRead ? new RepeatableReadNode(node, parent) : new ReadCommittedNode(node, parent);
+ }
+
rcn.initialize(configuration, invocationContextContainer, componentRegistry, interceptorChain);
rcn.injectDependencies(cache);
return rcn;
Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java 2009-03-18 08:43:29 UTC (rev 7907)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/MVCCNodeHelper.java 2009-03-18 09:45:27 UTC (rev 7908)
@@ -85,10 +85,8 @@
* Attempts to provide the context with a set of wrapped nodes based on the Collection of fqns passed in. If the
* nodes already exist in the context then the node is not wrapped again.
* <p/>
- * {@link InternalNode}s are wrapped using {@link org.jboss.cache.NodeFactory#createWrappedNode(org.jboss.cache.InternalNode,
- * org.jboss.cache.InternalNode)} and as such, null internal nodes are treated according to isolation level used.
- * See {@link org.jboss.cache.NodeFactory#createWrappedNode(org.jboss.cache.InternalNode,
- * org.jboss.cache.InternalNode)} for details on this behaviour.
+ * {@link InternalNode}s are wrapped using {@link org.jboss.cache.NodeFactory#createWrappedNode(org.jboss.cache.InternalNode, org.jboss.cache.InternalNode)} and as such, null internal nodes are treated according to isolation level used.
+ * See {@link org.jboss.cache.NodeFactory#createWrappedNode(org.jboss.cache.InternalNode, org.jboss.cache.InternalNode)} for details on this behaviour.
* <p/>
* Note that if the context has the {@link org.jboss.cache.config.Option#isForceWriteLock()} option set, then write
* locks are acquired and the node is copied.
@@ -404,9 +402,9 @@
@SuppressWarnings("unchecked")
private ReadCommittedNode wrapAndPutInContext(InvocationContext ctx, Fqn fqn, boolean forUpdate) {
ReadCommittedNode node = (ReadCommittedNode) ctx.lookUpNode(fqn);
- if (node == null) {
+ if (node == null || node.isNullNode()) {
InternalNode[] nodes = dataContainer.peekInternalNodeAndDirectParent(fqn, false);
- node = nodeFactory.createWrappedNode(nodes[0], nodes[1]);
+ node = nodeFactory.createWrappedNodeForRemoval(fqn, nodes[0], nodes[1]);
ctx.putLookedUpNode(fqn, node);
}
Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNode.java 2009-03-18 08:43:29 UTC (rev 7907)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNode.java 2009-03-18 09:45:27 UTC (rev 7908)
@@ -30,41 +30,21 @@
* @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik AT jboss DOT org</a>)
* @since 3.0
*/
-public class NullMarkerNode extends RepeatableReadNode
+public class NullMarkerNode extends NullMarkerNodeForRemoval
{
- public NullMarkerNode()
+ private static final NullMarkerNode INSTANCE = new NullMarkerNode();
+
+ private NullMarkerNode()
{
super(null, null);
}
- /**
- * @return always returns true
- */
- @Override
- public boolean isNullNode()
+ public static NullMarkerNode getInstance()
{
- return true;
+ return INSTANCE;
}
/**
- * @return always returns true so that any get commands, upon getting this node, will ignore the node as though it were removed.
- */
- @Override
- public boolean isDeleted()
- {
- return true;
- }
-
- /**
- * @return always returns true so that any get commands, upon getting this node, will ignore the node as though it were invalid.
- */
- @Override
- public boolean isValid()
- {
- return false;
- }
-
- /**
* A no-op.
*/
@Override
Added: core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNodeForRemoval.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNodeForRemoval.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/NullMarkerNodeForRemoval.java 2009-03-18 09:45:27 UTC (rev 7908)
@@ -0,0 +1,90 @@
+package org.jboss.cache.mvcc;
+
+import org.jboss.cache.DataContainer;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InternalNode;
+import org.jboss.cache.InvocationContext;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A specific type of null marker node, used for removal of nodes that don't exist.
+ * <p/>
+ * It wraps no underlying node (null is passed to the superclass as the node) and reports itself as a null node
+ * that is deleted and not valid. Data and children accessors return empty collections.
+ *
+ * @author Manik Surtani
+ * @since 3.1.0
+ */
+public class NullMarkerNodeForRemoval extends RepeatableReadNode
+{
+ // the Fqn this marker stands for; returned as-is by getFqn()
+ private Fqn fqn;
+
+ /**
+ * @param parent parent node, passed through to the superclass
+ * @param fqn Fqn this marker represents
+ */
+ public NullMarkerNodeForRemoval(InternalNode parent, Fqn fqn)
+ {
+ super(null, parent);
+ this.fqn = fqn;
+ }
+
+ /**
+ * @return the Fqn supplied at construction time
+ */
+ @Override
+ public Fqn getFqn()
+ {
+ return fqn;
+ }
+
+ /**
+ * @return always returns true
+ */
+ @Override
+ public boolean isNullNode()
+ {
+ return true;
+ }
+
+ /**
+ * @return always returns false so that any get commands, upon getting this node, will ignore the node as though it were invalid.
+ */
+ @Override
+ public boolean isValid()
+ {
+ return false;
+ }
+
+ /**
+ * @return always returns true so that any get commands, upon getting this node, will ignore the node as though it were removed.
+ */
+ @Override
+ public boolean isDeleted()
+ {
+ return true;
+ }
+
+ /**
+ * A no-op.
+ */
+ @Override
+ protected void updateNode(Fqn fqn, InvocationContext ctx, DataContainer dataContainer)
+ {
+ // no-op since the only updates that are allowed to happen here are the removal of the node, which only affects the parent.
+ }
+
+ /**
+ * @return always an empty map
+ */
+ @Override
+ public Map getDataDirect()
+ {
+ return Collections.emptyMap();
+ }
+
+ /**
+ * @return always an empty set
+ */
+ @Override
+ public Set getChildrenNamesDirect()
+ {
+ return Collections.emptySet();
+ }
+
+ /**
+ * @return always an empty set
+ */
+ @Override
+ public Set getChildrenDirect()
+ {
+ return Collections.emptySet();
+ }
+
+ /**
+ * A no-op; the arguments are ignored.
+ */
+ @Override
+ public void setValid(boolean valid, boolean recursive)
+ {
+ // no-op
+ }
+}
Modified: core/trunk/src/main/java/org/jboss/cache/mvcc/RepeatableReadNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/mvcc/RepeatableReadNode.java 2009-03-18 08:43:29 UTC (rev 7907)
+++ core/trunk/src/main/java/org/jboss/cache/mvcc/RepeatableReadNode.java 2009-03-18 09:45:27 UTC (rev 7908)
@@ -71,8 +71,13 @@
// make a backup copy
backup = node;
- node = backup.copy();
+ node = copyNode(backup);
}
+
+ private InternalNode copyNode(InternalNode nodeToCopy)
+ {
+ return nodeToCopy == null ? null : nodeToCopy.copy();
+ }
@Override
@SuppressWarnings("unchecked")
Modified: core/trunk/src/test/java/org/jboss/cache/api/CacheAPITest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/CacheAPITest.java 2009-03-18 08:43:29 UTC (rev 7907)
+++ core/trunk/src/test/java/org/jboss/cache/api/CacheAPITest.java 2009-03-18 09:45:27 UTC (rev 7908)
@@ -35,7 +35,7 @@
@Test(groups = {"functional", "pessimistic"}, sequential = true, testName = "api.CacheAPITest")
public class CacheAPITest extends AbstractSingleCacheTest
{
- private CacheSPI<String, String> cache;
+ protected CacheSPI<String, String> cache;
private List<String> events;
public CacheSPI createCache()
Modified: core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/CacheAPIMVCCTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/CacheAPIMVCCTest.java 2009-03-18 08:43:29 UTC (rev 7907)
+++ core/trunk/src/test/java/org/jboss/cache/api/mvcc/repeatable_read/CacheAPIMVCCTest.java 2009-03-18 09:45:27 UTC (rev 7908)
@@ -6,7 +6,10 @@
import org.jboss.cache.lock.IsolationLevel;
import org.testng.annotations.Test;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
/**
* MVCC version of {@link org.jboss.cache.api.CacheAPITest}
*/
@@ -25,4 +28,18 @@
{
return NodeLockingScheme.MVCC;
}
+
+ public void testWriteSkewOnRemovalOfNullNode() throws Exception
+ {
+ TransactionManager tm = cache.getTransactionManager();
+ tm.begin();
+ cache.getNode("/a");
+ Transaction tx = tm.suspend();
+ cache.put("/a", "k", "v2");
+ assert cache.get("/a", "k").equals("v2");
+ tm.resume(tx);
+ cache.removeNode("/a");
+ tx.commit();
+ assert cache.getNode("/a") == null; // this fails
+ }
}
\ No newline at end of file
15 years, 9 months
JBoss Cache SVN: r7907 - in core/trunk/src: main/java/org/jboss/cache/invocation and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-18 04:43:29 -0400 (Wed, 18 Mar 2009)
New Revision: 7907
Modified:
core/trunk/src/main/java/org/jboss/cache/Cache.java
core/trunk/src/main/java/org/jboss/cache/Node.java
core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
core/trunk/src/test/java/org/jboss/cache/api/CacheAPITest.java
core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java
core/trunk/src/test/java/org/jboss/cache/mock/NodeSpiMock.java
Log:
JBCACHE-1492 - Add a hasChildren() (or equivalent) method to the Node interface
Modified: core/trunk/src/main/java/org/jboss/cache/Cache.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Cache.java 2009-03-16 20:38:11 UTC (rev 7906)
+++ core/trunk/src/main/java/org/jboss/cache/Cache.java 2009-03-18 08:43:29 UTC (rev 7907)
@@ -281,6 +281,20 @@
Set<String> getChildrenNames(String fqn);
/**
+ * Tests if a node is a leaf, i.e., doesn't have any children
+ * @param fqn fqn to test
+ * @return true if it is a leaf, false otherwise
+ */
+ boolean isLeaf(Fqn fqn);
+
+ /**
+ * Tests if a node is a leaf, i.e., doesn't have any children
+ * @param fqn fqn to test
+ * @return true if it is a leaf, false otherwise
+ */
+ boolean isLeaf(String fqn);
+
+ /**
* Convenience method that allows for direct access to the data in a {@link Node}.
*
* @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be accessed.
Modified: core/trunk/src/main/java/org/jboss/cache/Node.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Node.java 2009-03-16 20:38:11 UTC (rev 7906)
+++ core/trunk/src/main/java/org/jboss/cache/Node.java 2009-03-18 08:43:29 UTC (rev 7907)
@@ -341,4 +341,9 @@
* @param recursive if true, child nodes will have their object references released as well.
*/
void releaseObjectReferences(boolean recursive);
+
+ /**
+ * @return true if the current node is a leaf node (i.e., has no children), or false otherwise.
+ */
+ boolean isLeaf();
}
Modified: core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2009-03-16 20:38:11 UTC (rev 7906)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java 2009-03-18 08:43:29 UTC (rev 7907)
@@ -621,9 +621,33 @@
batchContainer.endBatch(successful);
}
- @SuppressWarnings("unchecked")
+ public boolean isLeaf(String fqn) throws NodeNotExistsException
+ {
+ return isLeaf(Fqn.fromString(fqn));
+ }
+
+ public boolean isLeaf(Fqn fqn) throws NodeNotExistsException
+ {
+ Set<Object> names = getChildrenNamesInternal(fqn);
+ if (names == null) throw new NodeNotExistsException("Node " + fqn + " does not exist!");
+ return names.isEmpty();
+ }
+
+
public Set<Object> getChildrenNames(Fqn fqn)
{
+ Set<Object> names = getChildrenNamesInternal(fqn);
+ return names == null ? Collections.emptySet() : names;
+ }
+
+ /**
+ * Returns null if the node doesn't exist.
+ * @param fqn to check
+ * @return set or null
+ */
+ @SuppressWarnings("unchecked")
+ private Set<Object> getChildrenNamesInternal(Fqn fqn)
+ {
InvocationContext ctx = invocationContextContainer.get();
cacheStatusCheck(ctx);
GetChildrenNamesCommand command = commandsFactory.buildGetChildrenNamesCommand(fqn);
@@ -643,11 +667,7 @@
{
retval = Immutables.immutableSetWrap(retval); // this is already copied in the command
}
- else
- {
- retval = Collections.emptySet();
- }
-
+
return retval;
}
Modified: core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java 2009-03-16 20:38:11 UTC (rev 7906)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java 2009-03-18 08:43:29 UTC (rev 7907)
@@ -293,6 +293,12 @@
return spi.getChildrenNames(getFqn());
}
+ public boolean isLeaf()
+ {
+ assertValid();
+ return getChildrenNames().isEmpty();
+ }
+
public Map<K, V> getData()
{
assertValid();
Modified: core/trunk/src/test/java/org/jboss/cache/api/CacheAPITest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/CacheAPITest.java 2009-03-16 20:38:11 UTC (rev 7906)
+++ core/trunk/src/test/java/org/jboss/cache/api/CacheAPITest.java 2009-03-18 08:43:29 UTC (rev 7907)
@@ -1,6 +1,14 @@
package org.jboss.cache.api;
-import org.jboss.cache.*;
+import org.jboss.cache.AbstractSingleCacheTest;
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.Region;
+import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.NodeNotExistsException;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.config.ConfigurationException;
@@ -8,11 +16,8 @@
import org.jboss.cache.notifications.annotation.NodeCreated;
import org.jboss.cache.notifications.event.Event;
import org.jboss.cache.transaction.GenericTransactionManagerLookup;
-import org.jboss.cache.util.CachePrinter;
-import org.jboss.cache.util.TestingUtil;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.transaction.TransactionManager;
@@ -355,6 +360,32 @@
assert cache.peek(Fqn.fromString("/a"), true, true) == null;
}
+ public void testIsLeaf()
+ {
+ cache.put("/a/b/c", "k", "v");
+ cache.put("/a/d", "k", "v");
+
+ assert !cache.isLeaf(Fqn.ROOT);
+ assert !cache.isLeaf("/a");
+ assert !cache.isLeaf("/a/b");
+ assert cache.isLeaf("/a/d");
+ assert cache.isLeaf("/a/b/c");
+
+ cache.removeNode("/a/b");
+ cache.removeNode("/a/d");
+
+ assert cache.isLeaf("/a");
+ try
+ {
+ assert cache.isLeaf("/a/b");
+ assert false;
+ }
+ catch (NodeNotExistsException expected)
+ {
+ assert true;
+ }
+ }
+
public void testRpcManagerElements()
{
assertEquals("CacheMode.LOCAL cache has no address", null, cache.getLocalAddress());
Modified: core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java 2009-03-16 20:38:11 UTC (rev 7906)
+++ core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java 2009-03-18 08:43:29 UTC (rev 7907)
@@ -461,4 +461,27 @@
assertNull(cache.get("/foo/1/2", "item"));
assertNull(cache.get("/foo/1", "item"));
}
+
+ public void testIsLeaf()
+ {
+ cache.put("/a/b/c", "k", "v");
+ cache.put("/a/d", "k", "v");
+
+ Node A = cache.getNode("/a");
+ Node B = cache.getNode("/a/b");
+ Node C = cache.getNode("/a/b/c");
+ Node D = cache.getNode("/a/d");
+ Node root = cache.getRoot();
+
+ assert !root.isLeaf();
+ assert !A.isLeaf();
+ assert !B.isLeaf();
+ assert C.isLeaf();
+ assert D.isLeaf();
+
+ cache.removeNode("/a/b");
+ cache.removeNode("/a/d");
+
+ assert A.isLeaf();
+ }
}
Modified: core/trunk/src/test/java/org/jboss/cache/mock/NodeSpiMock.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/mock/NodeSpiMock.java 2009-03-16 20:38:11 UTC (rev 7906)
+++ core/trunk/src/test/java/org/jboss/cache/mock/NodeSpiMock.java 2009-03-18 08:43:29 UTC (rev 7907)
@@ -485,4 +485,9 @@
public void releaseObjectReferences(boolean recursive)
{
}
+
+ public boolean isLeaf()
+ {
+ return false;
+ }
}
15 years, 9 months
JBoss Cache SVN: r7906 - in core/trunk/src: test/java/org/jboss/cache and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-16 16:38:11 -0400 (Mon, 16 Mar 2009)
New Revision: 7906
Modified:
core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java
Log:
Fixed data container test failure
Modified: core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java 2009-03-16 20:37:52 UTC (rev 7905)
+++ core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java 2009-03-16 20:38:11 UTC (rev 7906)
@@ -72,7 +72,7 @@
private BuddyFqnTransformer buddyFqnTransformer;
private Configuration config;
private boolean usingMvcc;
- private volatile boolean started = false;
+ volatile boolean started = false;
private static final InternalNode[] NULL_ARRAY = {null, null};
@Inject
Modified: core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java 2009-03-16 20:37:52 UTC (rev 7905)
+++ core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java 2009-03-16 20:38:11 UTC (rev 7906)
@@ -3,12 +3,12 @@
import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.mock.MockNodesFixture;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
-import org.testng.annotations.AfterMethod;
/**
* Tests functionality from DataContainer.
@@ -180,7 +180,10 @@
*/
public void testGetNumberOfNodes()
{
- assert container.getNumberOfNodes() == 8 : "eoght nodes expected";
+ int i;
+ assert (i=container.getNumberOfNodes()) == -1 : "-1 nodes expected, was " + i;
+ container.started = true;
+ assert (i=container.getNumberOfNodes()) == 8 : "8 nodes expected, was " + i;
}
/**
15 years, 9 months
JBoss Cache SVN: r7905 - in core/trunk/src/main/java/org/jboss/cache: factories and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-16 16:37:52 -0400 (Mon, 16 Mar 2009)
New Revision: 7905
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
Log:
Re-implemented NBST using RPC instead of a partial FLUSH
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-03-16 20:37:52 UTC (rev 7905)
@@ -63,7 +63,6 @@
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RspFilter;
import org.jgroups.protocols.TP;
-import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Rsp;
@@ -156,11 +155,12 @@
public abstract class FlushTracker
{
- private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+ // closed whenever a FLUSH is in progress. Open by default.
+ final ReclosableLatch flushBlockGate = new ReclosableLatch(true);
private final AtomicInteger flushCompletionCount = new AtomicInteger();
+ // closed whenever a FLUSH is NOT in progress. Closed by default.
+ final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
- private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
-
public void block()
{
flushBlockGate.close();
@@ -179,46 +179,27 @@
return flushCompletionCount.get();
}
- public abstract void lockProcessingLock();
+ public abstract void lockProcessingLock() throws InterruptedException;
+
public abstract void unlockProcessingLock();
- public abstract void lockSuspendProcessingLock();
+
+ public abstract void lockSuspendProcessingLock() throws InterruptedException;
+
public abstract void unlockSuspendProcessingLock();
- public void waitForFlushCompletion(long timeout)
+ public void waitForFlushCompletion(long timeout) throws InterruptedException
{
- for (; ;)
+ if (channel.flushSupported() && !flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
{
- try
- {
- if (channel.flushSupported() && !flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
- {
- throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
- }
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
}
}
- public void waitForFlushStart(long timeout)
+ public void waitForFlushStart(long timeout) throws InterruptedException
{
- for (; ;)
+ if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
{
- try
- {
- if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
- {
- throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
- }
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
}
}
}
@@ -247,21 +228,11 @@
{
private final ReentrantReadWriteLock coordinationLock = new ReentrantReadWriteLock();
- public void lockProcessingLock()
+ public void lockProcessingLock() throws InterruptedException
{
- for (;;)
+ if (!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
{
- try
- {
- if (!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
- throw new TimeoutException("Could not obtain processing lock");
-
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ throw new TimeoutException("Could not obtain processing lock");
}
}
@@ -270,30 +241,38 @@
coordinationLock.readLock().unlock();
}
- public void lockSuspendProcessingLock()
+ public void lockSuspendProcessingLock() throws InterruptedException
{
- for (;;)
+ if (!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
{
- try
- {
- if (!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
- throw new TimeoutException("Could not obtain processing lock");
-
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
+ throw new TimeoutException("Could not obtain processing lock");
}
}
public void unlockSuspendProcessingLock()
{
if (coordinationLock.isWriteLockedByCurrentThread())
+ {
coordinationLock.writeLock().unlock();
+ }
}
+ public void waitForFlushCompletion(long timeout) throws InterruptedException
+ {
+ if (!flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
+ {
+ throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
+ }
+ }
+
+ public void waitForFlushStart(long timeout) throws InterruptedException
+ {
+ if (!flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
+ {
+ throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
+ }
+ }
+
}
// ------------ START: Lifecycle methods ------------
@@ -332,7 +311,7 @@
// Allow commands to be ACKed during state transfer
if (nonBlocking)
{
- componentRegistry.setBlockInStarting(false);
+ componentRegistry.setStatusCheckNecessary(false);
}
channel.connect(configuration.getClusterName());
if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
@@ -376,17 +355,19 @@
}
}
- if (log.isInfoEnabled()) log.info("state was retrieved successfully (in " + CachePrinter.prettyPrint((System.currentTimeMillis() - start)) + ")");
+ if (log.isInfoEnabled())
+ {
+ log.info("state was retrieved successfully (in " + CachePrinter.prettyPrint((System.currentTimeMillis() - start)) + ")");
+ }
}
}
private void sanityCheckJGroupsStack(JChannel channel)
{
if (channel.getProtocolStack().findProtocol(STREAMING_STATE_TRANSFER.class) == null)
+ {
throw new ConfigurationException("JGroups channel does not use STREAMING_STATE_TRANSFER! This is a requirement for non-blocking state transfer. Either make sure your JGroups configuration uses STREAMING_STATE_TRANSFER or disable non-blocking state transfer.");
-
- if (channel.getProtocolStack().findProtocol(FLUSH.class) == null)
- throw new ConfigurationException("JGroups channel does not use FLUSH! This is a requirement for non-blocking state transfer. Either make sure your JGroups configuration uses FLUSH or disable non-blocking state transfer.");
+ }
}
private void sanityCheckConfiguration(boolean nonBlockingStateTransfer, boolean fetchStateOnStart)
@@ -394,12 +375,16 @@
if (isInLocalMode || !nonBlockingStateTransfer || !fetchStateOnStart) return; // don't care about these cases!
if (configuration.getNodeLockingScheme() != NodeLockingScheme.MVCC)
+ {
throw new ConfigurationException("Non-blocking state transfer is only supported with the MVCC node locking scheme. Please change your node locking scheme to MVCC or disable non-blocking state transfer.");
+ }
if (isUsingBuddyReplication)
+ {
throw new ConfigurationException("Non-blocking state transfer cannot be used with buddy replication at this time. Please disable either buddy replication or non-blocking state transfer.");
+ }
}
-
+
private void startNonBlockStateTransfer(List<Address> members)
{
if (members.size() < 2)
@@ -440,7 +425,10 @@
if (!success)
{
wait <<= 2;
- if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying after "+wait+" millis. Retries left: " + (numRetries -1 -i) );
+ if (log.isWarnEnabled())
+ {
+ log.warn("Could not find available peer for state, backing off and retrying after " + wait + " millis. Retries left: " + (numRetries - 1 - i));
+ }
try
{
@@ -460,7 +448,7 @@
throw new CacheException("Unable to fetch state on startup");
}
- componentRegistry.setBlockInStarting(true);
+ componentRegistry.setStatusCheckNecessary(true);
}
public void disconnect()
@@ -522,7 +510,10 @@
{
ReflectionUtil.setValue(configuration, "accessible", true);
configuration.setUsingMultiplexer(true);
- if (log.isDebugEnabled()) log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + " using stack " + configuration.getMultiplexerStack());
+ if (log.isDebugEnabled())
+ {
+ log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + " using stack " + configuration.getMultiplexerStack());
+ }
}
else
{
@@ -697,8 +688,14 @@
if (rpcDispatcher == null) return null;
int modeToUse = mode;
int preferredMode;
- if ((preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1) modeToUse = preferredMode;
- if (trace) log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage + " modeToUse: " + modeToUse);
+ if ((preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
+ {
+ modeToUse = preferredMode;
+ }
+ if (trace)
+ {
+ log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage + " modeToUse: " + modeToUse);
+ }
flushTracker.lockProcessingLock();
unlock = true;
@@ -707,7 +704,10 @@
useOutOfBandMessage = false;
RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
- if (trace) log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
+ if (trace)
+ {
+ log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
+ }
// short-circuit no-return-value calls.
if (rsps == null) return Collections.emptyList();
List<Object> retval = new ArrayList<Object>(rsps.size());
@@ -751,7 +751,9 @@
{
computeStats(success);
if (unlock)
+ {
flushTracker.unlockProcessingLock();
+ }
}
}
@@ -779,7 +781,10 @@
// should this really be throwing an exception? Are there valid use cases where partial state may not be available? - Manik
// Yes -- cache is configured LOCAL but app doesn't know it -- Brian
//throw new IllegalArgumentException("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
- if (log.isWarnEnabled()) log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
+ if (log.isWarnEnabled())
+ {
+ log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
+ }
return;
}
@@ -796,13 +801,19 @@
return;
}
- if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
+ }
boolean successfulTransfer = false;
for (Address target : targets)
{
try
{
- if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
+ }
messageListener.setStateSet(false);
successfulTransfer = getState(stateId, target);
if (successfulTransfer)
@@ -817,17 +828,26 @@
successfulTransfer = false;
}
}
- if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
+ if (log.isDebugEnabled())
+ {
+ log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
+ }
if (successfulTransfer) break;
}
catch (IllegalStateException ise)
{
// thrown by the JGroups channel if state retrieval fails.
- if (log.isInfoEnabled()) log.info("Channel problems fetching state. Continuing on to next provider. ", ise);
+ if (log.isInfoEnabled())
+ {
+ log.info("Channel problems fetching state. Continuing on to next provider. ", ise);
+ }
}
}
- if (!successfulTransfer && log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
+ if (!successfulTransfer && log.isDebugEnabled())
+ {
+ log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
+ }
}
private boolean getState(String stateId, Address target) throws ChannelNotConnectedException, ChannelClosedException
@@ -989,21 +1009,24 @@
*/
public void block()
{
- try
+ if (!configuration.isNonBlockingStateTransfer())
{
- if (log.isDebugEnabled()) log.debug("Block received at " + getLocalAddress());
+ try
+ {
+ if (log.isDebugEnabled()) log.debug("Block received at " + getLocalAddress());
- flushTracker.block();
- notifier.notifyCacheBlocked(true);
- notifier.notifyCacheBlocked(false);
+ flushTracker.block();
+ notifier.notifyCacheBlocked(true);
+ notifier.notifyCacheBlocked(false);
- if (log.isDebugEnabled()) log.debug("Block processed at " + getLocalAddress());
+ if (log.isDebugEnabled()) log.debug("Block processed at " + getLocalAddress());
+ }
+ catch (Throwable e)
+ {
+ //do not rethrow! jgroups might behave funny, resulting even in deadlock
+ log.error("Error found while processing block()", e);
+ }
}
- catch (Throwable e)
- {
- //do not rethrow! jgroups might behave funny, resulting even in deadlock
- log.error("Error found while processing block()", e);
- }
}
/**
@@ -1011,23 +1034,25 @@
*/
public void unblock()
{
- try
+ if (!configuration.isNonBlockingStateTransfer())
{
- if (log.isDebugEnabled()) log.debug("UnBlock received at " + getLocalAddress());
+ try
+ {
+ if (log.isDebugEnabled()) log.debug("UnBlock received at " + getLocalAddress());
- notifier.notifyCacheUnblocked(true);
- notifier.notifyCacheUnblocked(false);
- flushTracker.unblock();
+ notifier.notifyCacheUnblocked(true);
+ notifier.notifyCacheUnblocked(false);
+ flushTracker.unblock();
- if (log.isDebugEnabled()) log.debug("UnBlock processed at " + getLocalAddress());
+ if (log.isDebugEnabled()) log.debug("UnBlock processed at " + getLocalAddress());
+ }
+ catch (Throwable e)
+ {
+ //do not rethrow! jgroups might behave funny, resulting even in deadlock
+ log.error("Error found while processing unblock", e);
+ }
}
- catch (Throwable e)
- {
- //do not rethrow! jgroups might behave funny, resulting even in deadlock
- log.error("Error found while processing unblock", e);
- }
}
-
}
//jmx operations
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-03-16 20:37:52 UTC (rev 7905)
@@ -105,7 +105,7 @@
*/
private boolean invokedFromShutdownHook;
- private volatile boolean blockInStarting = true;
+ private volatile boolean statusCheckNecessary = true;
/**
* Creates an instance of the component registry. The configuration passed in is automatically registered.
@@ -886,23 +886,26 @@
if (trace) log.trace("Is remotely originating.");
// else if this is a remote call and the status is STARTING, wait until the cache starts.
- if (state == CacheStatus.STARTING && blockInStarting)
+ if (statusCheckNecessary)
{
- if (trace) log.trace("Cache is starting; block.");
- try
+ if (state == CacheStatus.STARTING)
{
- blockUntilCacheStarts();
- return true;
+ if (trace) log.trace("Cache is starting; block.");
+ try
+ {
+ blockUntilCacheStarts();
+ return true;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
- catch (InterruptedException e)
+ else
{
- Thread.currentThread().interrupt();
+ log.warn("Received a remote call but the cache is not in STARTED state - ignoring call.");
}
}
- else if (blockInStarting)
- {
- log.warn("Received a remote call but the cache is not in STARTED state - ignoring call.");
- }
return false;
}
@@ -1022,14 +1025,14 @@
/**
* Returns an immutable set contating all the components that exists in the reporsitory at this moment.
*/
- public Set<Component> getRegiteredComponents()
+ public Set<Component> getRegisteredComponents()
{
HashSet<Component> defensiveCopy = new HashSet<Component>(componentLookup.values());
return Collections.unmodifiableSet(defensiveCopy);
}
- public void setBlockInStarting(boolean blockInStarting)
+ public void setStatusCheckNecessary(boolean statusCheckNecessary)
{
- this.blockInStarting = blockInStarting;
+ this.statusCheckNecessary = statusCheckNecessary;
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java 2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java 2009-03-16 20:37:52 UTC (rev 7905)
@@ -196,7 +196,7 @@
private List<ResourceDMBean> getResourceDMBeans()
{
List<ResourceDMBean> resourceDMBeans = new ArrayList<ResourceDMBean>();
- for (ComponentRegistry.Component component : cacheSpi.getComponentRegistry().getRegiteredComponents())
+ for (ComponentRegistry.Component component : cacheSpi.getComponentRegistry().getRegisteredComponents())
{
ResourceDMBean resourceDMBean = new ResourceDMBean(component.getInstance());
if (resourceDMBean.isManagedResource())
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-03-16 20:37:52 UTC (rev 7905)
@@ -21,17 +21,6 @@
*/
package org.jboss.cache.marshall;
-import java.io.NotSerializableException;
-import java.util.Map;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.jboss.cache.InvocationContext;
import org.jboss.cache.RPCManager;
import org.jboss.cache.RPCManagerImpl.FlushTracker;
@@ -40,6 +29,7 @@
import org.jboss.cache.commands.remote.AnnounceBuddyPoolNameCommand;
import org.jboss.cache.commands.remote.AssignToBuddyGroupCommand;
import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.interceptors.InterceptorChain;
@@ -58,6 +48,17 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
+import java.io.NotSerializableException;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* A JGroups RPC dispatcher that knows how to deal with {@link org.jboss.cache.commands.ReplicableCommand}s.
*
@@ -267,7 +268,7 @@
boolean replayIgnored = false;
- if (configuration.isNonBlockingStateTransfer())
+ if (configuration.isNonBlockingStateTransfer() && !(cmd instanceof StateTransferControlCommand))
{
int flushCount = flushTracker.getFlushCompletionCount();
flushTracker.lockProcessingLock();
@@ -297,10 +298,7 @@
if (trace) log.trace("This is a non-visitable command - so performing directly and not via the invoker.");
// need to check cache status for all except buddy replication commands.
- if (!(cmd instanceof AnnounceBuddyPoolNameCommand ||
- cmd instanceof AssignToBuddyGroupCommand ||
- cmd instanceof RemoveFromBuddyGroupCommand)
- && !componentRegistry.invocationsAllowed(false))
+ if (requiresRunningCache(cmd) && !componentRegistry.invocationsAllowed(false))
{
return new RequestIgnoredResponse();
}
@@ -326,6 +324,14 @@
}
}
+ private boolean requiresRunningCache(ReplicableCommand cmd)
+ {
+ return !(cmd instanceof AnnounceBuddyPoolNameCommand ||
+ cmd instanceof AssignToBuddyGroupCommand ||
+ cmd instanceof RemoveFromBuddyGroupCommand ||
+ cmd instanceof StateTransferControlCommand);
+ }
+
@Override
public String toString()
{
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-03-16 20:37:52 UTC (rev 7905)
@@ -32,7 +32,9 @@
import org.jboss.cache.NodeSPI;
import org.jboss.cache.RPCManager;
import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.commands.CommandsFactory;
import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.ComponentRegistry;
@@ -49,17 +51,16 @@
import org.jboss.cache.transaction.TransactionLog;
import org.jboss.cache.transaction.TransactionLog.LogEntry;
import org.jgroups.Address;
-import org.jgroups.Channel;
import java.io.IOException;
import java.io.ObjectInputStream;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Vector;
public class DefaultStateTransferIntegrator implements StateTransferIntegrator
{
@@ -71,25 +72,29 @@
private Set<Fqn> internalFqns;
private Configuration cfg;
- private RPCManager manager;
+ private RPCManager rpcManager;
private TransactionLog txLog;
private boolean needToPersistState; // for JBCACHE-131
private boolean nonBlocking;
private InvocationContextContainer container;
private InterceptorChain chain;
private ComponentRegistry registry;
+ private CommandsFactory commandsFactory;
@Inject
- public void inject(CacheSPI<?, ?> cache, Configuration cfg, RPCManager rpcManager, TransactionLog txLog, InvocationContextContainer container, InterceptorChain chain, ComponentRegistry registry)
+ public void inject(CacheSPI<?, ?> cache, Configuration cfg, RPCManager rpcManager, TransactionLog txLog,
+ InvocationContextContainer container, InterceptorChain chain, ComponentRegistry registry,
+ CommandsFactory commandsFactory)
{
this.cache = cache;
this.cfg = cfg;
- this.manager = rpcManager;
+ this.rpcManager = rpcManager;
this.nonBlocking = cfg.isNonBlockingStateTransfer();
this.txLog = txLog;
this.container = container;
this.chain = chain;
this.registry = registry;
+ this.commandsFactory = commandsFactory;
}
@Start(priority = 14)
@@ -120,47 +125,21 @@
integrateTxLog(ois);
}
- private void doPartialFlush(Channel channel, List<Address> members)
+ /**
+ * Mimics a partial flush between the current instance and the address to flush, by opening and closing the necessary
+ * latches on both ends.
+ * @param addressToFlush address to flush in addition to the current address
+ * @param block if true, mimics setting a flush. Otherwise, mimics un-setting a flush.
+ * @throws Exception if there are issues
+ */
+ private void mimicPartialFlushViaRPC(Address addressToFlush, boolean block) throws Exception
{
- int retries = 5;
- int sleepBetweenRetries = 250;
- int sleepIncreaseFactor = 2;
- if (trace) log.trace("Attempting a partial flush on members " + members + " with up to " + retries + " retries.");
-
- boolean success = false;
- int i;
- for (i=1; i<=retries; i++)
- {
- if (trace) log.trace("Attempt number " + i);
- try
- {
- if (success = channel.startFlush(members, false)) break;
- if (trace) log.trace("Channel.startFlush() returned false!");
- }
- catch (Exception e)
- {
- if (trace) log.trace("Caught exception attempting a partial flush", e);
- }
- try
- {
- if (trace) log.trace("Partial state transfer failed. Backing off for " + sleepBetweenRetries + " millis and retrying");
- Thread.sleep(sleepBetweenRetries);
- sleepBetweenRetries *= sleepIncreaseFactor;
- }
- catch (InterruptedException ie)
- {
- Thread.currentThread().interrupt();
- }
- }
-
- if (success)
- {
- if (log.isDebugEnabled()) log.debug("Partial flush between " + members + " succeeded!");
- }
- else
- {
- throw new CacheException("Could initiate partial flush between " +members+ "! State-transfer failed!");
- }
+ StateTransferControlCommand cmd = commandsFactory.buildStateTransferControlCommand(block);
+ Vector<Address> recipient = new Vector<Address>();
+ recipient.add(addressToFlush);
+ if (!block) rpcManager.getFlushTracker().unblock();
+ rpcManager.callRemoteMethods(recipient, cmd, true, cfg.getStateRetrievalTimeout(), true);
+ if (block) rpcManager.getFlushTracker().block();
}
private void integrateTxLog(ObjectInputStream ois) throws Exception
@@ -170,11 +149,8 @@
processCommitLog(ois);
- Channel channel = manager.getChannel();
+ mimicPartialFlushViaRPC(rpcManager.getLastStateTransferSource(), true);
- List<Address> targets = Arrays.asList(channel.getLocalAddress(), manager.getLastStateTransferSource());
- doPartialFlush(channel, targets);
-
try
{
if (trace)
@@ -201,12 +177,13 @@
// Block all remote commands once transfer is complete,
// and before FLUSH completes
- registry.setBlockInStarting(true);
+ registry.setStatusCheckNecessary(true);
}
finally
{
if (trace) log.trace("Stopping partial flush");
- channel.stopFlush(targets);
+// channel.stopFlush(targets);
+ mimicPartialFlushViaRPC(rpcManager.getLastStateTransferSource(), false);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java 2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java 2009-03-16 20:37:52 UTC (rev 7905)
@@ -1,12 +1,14 @@
package org.jboss.cache.statetransfer;
+import org.jboss.cache.CacheException;
+
/**
* Thrown when a state provider is busy
*
* @author Manik Surtani
* @since 3.1
*/
-public class StateProviderBusyException extends Exception
+public class StateProviderBusyException extends CacheException
{
public StateProviderBusyException()
{
15 years, 9 months
JBoss Cache SVN: r7904 - in core/trunk/src/main/java/org/jboss/cache: commands/remote and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-16 16:37:26 -0400 (Mon, 16 Mar 2009)
New Revision: 7904
Added:
core/trunk/src/main/java/org/jboss/cache/commands/remote/StateTransferControlCommand.java
Modified:
core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactory.java
core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactoryImpl.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java
Log:
Basic refactorings
Modified: core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactory.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactory.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -37,6 +37,7 @@
import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
@@ -139,4 +140,6 @@
* @return a newly constructed cache command
*/
ReplicableCommand fromStream(int id, Object[] parameters);
+
+ StateTransferControlCommand buildStateTransferControlCommand(boolean b);
}
Modified: core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactoryImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactoryImpl.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactoryImpl.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -43,6 +43,7 @@
import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
@@ -102,6 +103,11 @@
this.buddyFqnTransformer = buddyFqnTransformer;
}
+ public StateTransferControlCommand buildStateTransferControlCommand(boolean enabled)
+ {
+ return new StateTransferControlCommand(enabled); // NOTE(review): init(rpcManager) is not called here, unlike the METHOD_ID branch added further down in this file - confirm this instance is only replicated, never performed locally
+ }
+
public PutDataMapCommand buildPutDataMapCommand(GlobalTransaction gtx, Fqn fqn, Map data)
{
PutDataMapCommand cmd = new PutDataMapCommand(gtx, fqn, data);
@@ -489,6 +495,13 @@
command = returnValue;
break;
}
+ case StateTransferControlCommand.METHOD_ID:
+ {
+ StateTransferControlCommand cmd = new StateTransferControlCommand();
+ cmd.init(rpcManager);
+ command = cmd;
+ break;
+ }
default:
throw new CacheException("Unknown command id " + id + "!");
}
Added: core/trunk/src/main/java/org/jboss/cache/commands/remote/StateTransferControlCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/remote/StateTransferControlCommand.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/commands/remote/StateTransferControlCommand.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -0,0 +1,63 @@
+package org.jboss.cache.commands.remote;
+
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.commands.ReplicableCommand;
+
+/**
+ * A control command for communication between peers for non-blocking state transfer. When performed, it blocks or unblocks the flush tracker obtained from the {@link RPCManager}, depending on the <tt>enabled</tt> flag.
+ *
+ * @author Manik Surtani
+ */
+public class StateTransferControlCommand implements ReplicableCommand
+{
+ public static final int METHOD_ID = 49; // value returned by getCommandId()
+ RPCManager rpcManager; // not set by either constructor; must be supplied via init() before perform() is called
+ boolean enabled; // true: perform() calls block(); false: perform() calls unblock()
+
+ public StateTransferControlCommand() // leaves enabled at its default (false) until setParameters() is called
+ {
+ }
+
+ public StateTransferControlCommand(boolean enabled)
+ {
+ this.enabled = enabled;
+ }
+
+ public void init(RPCManager rpcManager) // stores the RPCManager whose flush tracker perform() operates on
+ {
+ this.rpcManager = rpcManager;
+ }
+
+ public Object perform(InvocationContext ctx) throws Throwable // ctx is not used; always returns null
+ {
+ if (enabled)
+ rpcManager.getFlushTracker().block();
+ else
+ rpcManager.getFlushTracker().unblock();
+ return null;
+ }
+
+ public int getCommandId()
+ {
+ return METHOD_ID;
+ }
+
+ public Object[] getParameters()
+ {
+ return new Object[]{enabled}; // single element, autoboxed to Boolean; mirrors what setParameters() reads back
+ }
+
+ public void setParameters(int commandId, Object[] parameters) // commandId is ignored
+ {
+ enabled = (Boolean) parameters[0]; // expects parameters[0] to be a Boolean, as produced by getParameters()
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StateTransferControlCommand{" +
+ "enabled=" + enabled +
+ '}';
+ }
+}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -286,6 +286,11 @@
{
return command.acceptVisitor(ctx, firstInChain);
}
+ catch (InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ return null;
+ }
catch (CacheException e)
{
throw e;
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -40,6 +40,7 @@
import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.commands.tx.AbstractTransactionCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
@@ -189,6 +190,7 @@
case AnnounceBuddyPoolNameCommand.METHOD_ID:
case AssignToBuddyGroupCommand.METHOD_ID:
case RemoveFromBuddyGroupCommand.METHOD_ID:
+ case StateTransferControlCommand.METHOD_ID:
break;
// possible when we have a replication queue.
Modified: core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -86,4 +86,9 @@
{
return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy value that is not used.
}
+
+ public boolean isOpen() // true when the latch's current state equals OPEN_STATE
+ {
+ return getState() == OPEN_STATE; // presumably AbstractQueuedSynchronizer.getState(), i.e. a non-blocking read - confirm against the class header
+ }
}
15 years, 9 months
JBoss Cache SVN: r7903 - core/trunk/src/main/docbook/userguide/en/modules.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-12 05:38:02 -0400 (Thu, 12 Mar 2009)
New Revision: 7903
Modified:
core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
Log:
Incorrect default value
Modified: core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml
===================================================================
--- core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml 2009-03-11 17:37:39 UTC (rev 7902)
+++ core/trunk/src/main/docbook/userguide/en/modules/configuration_reference.xml 2009-03-12 09:38:02 UTC (rev 7903)
@@ -471,7 +471,7 @@
<entry><emphasis role="bold">writeSkewCheck</emphasis></entry>
<entry>writeSkewCheck</entry>
<entry>true, false</entry>
- <entry>true</entry>
+ <entry>false</entry>
<entry>Specifies whether to check for write skews. Only used if <literal>nodeLockingScheme</literal>
is <literal>mvcc</literal> and <literal>isolationLevel</literal> is <literal>REPEATABLE_READ</literal>.
15 years, 9 months
JBoss Cache SVN: r7902 - in core/trunk/src/main/java/org/jboss/cache: config and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-11 13:37:39 -0400 (Wed, 11 Mar 2009)
New Revision: 7902
Modified:
core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/config/EvictionRegionConfig.java
Log:
JBCACHE-1489 - Javadocs not clear on how to create basic local mode caches
Modified: core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2009-03-11 17:29:26 UTC (rev 7901)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2009-03-11 17:37:39 UTC (rev 7902)
@@ -695,6 +695,7 @@
for (EvictionRegionConfig erc : ercs)
{
Fqn fqn = erc.getRegionFqn();
+ if (fqn == null) throw new ConfigurationException("Regions cannot be configured with a null region fqn. If you configured this region programmatically, ensure that you set the region fqn in EvictionRegionConfig");
if (trace) log.trace("Creating eviction region " + fqn);
if (fqn.equals(DEFAULT_REGION) || fqn.isRoot())
Modified: core/trunk/src/main/java/org/jboss/cache/config/EvictionRegionConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/EvictionRegionConfig.java 2009-03-11 17:29:26 UTC (rev 7901)
+++ core/trunk/src/main/java/org/jboss/cache/config/EvictionRegionConfig.java 2009-03-11 17:37:39 UTC (rev 7902)
@@ -29,6 +29,9 @@
import java.lang.reflect.Method;
+/**
+ * It is imperative that a region Fqn is set, either via one of the constructors or using {@link #setRegionFqn(org.jboss.cache.Fqn)}.
+ */
public class EvictionRegionConfig extends ConfigurationComponent
{
/**
15 years, 9 months