Author: bstansberry(a)jboss.com
Date: 2007-12-21 14:49:29 -0500 (Fri, 21 Dec 2007)
New Revision: 14254
Added:
core/trunk/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java
core/trunk/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java
Log:
Put prototype of a next-gen TimestampsRegion impl under src control
Added:
core/trunk/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java
===================================================================
---
core/trunk/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java
(rev 0)
+++
core/trunk/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java 2007-12-21
19:49:29 UTC (rev 14254)
@@ -0,0 +1,363 @@
+/*
+ * Copyright (c) 2007, Red Hat Middleware, LLC. All rights reserved.
+ *
+ * This copyrighted material is made available to anyone wishing to use, modify,
+ * copy, or redistribute it subject to the terms and conditions of the GNU
+ * Lesser General Public License, v. 2.1. This program is distributed in the
+ * hope that it will be useful, but WITHOUT A WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details. You should have received a
+ * copy of the GNU Lesser General Public License, v.2.1 along with this
+ * distribution; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * Red Hat Author(s): Brian Stansberry
+ */
+
+package org.hibernate.cache.jbc2.timestamp;
+
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
+import javax.transaction.Transaction;
+
+import org.hibernate.cache.CacheException;
+import org.hibernate.cache.TimestampsRegion;
+import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter;
+import org.hibernate.cache.jbc2.util.CacheHelper;
+import org.jboss.cache.Cache;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.config.Option;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeModified;
+import org.jboss.cache.notifications.annotation.NodeRemoved;
+import org.jboss.cache.notifications.event.NodeModifiedEvent;
+import org.jboss.cache.notifications.event.NodeRemovedEvent;
+
+/**
+ * Prototype of a clustered timestamps cache region impl usable if the
+ * TimestampsRegion API is changed.
+ * <p>
+ * Maintains a local (authoritative) cache of timestamps along with the
+ * distributed cache held in JBoss Cache. Listens for changes in the distributed
+ * cache and updates the local cache accordingly. Ensures that any changes in
+ * the local cache represent either 1) an increase in the timestamp or
+ * 2) a stepback in the timestamp by the caller that initially increased
+ * it as part of a pre-invalidate call. This approach allows
+ * timestamp changes to be replicated asynchronously by JBoss Cache while still
+ * preventing invalid backward changes in timestamps.
+ * </p>
+ *
+ * NOTE: This is just a prototype!!! Only useful if we change the
+ * TimestampsRegion API.
+ *
+ * @author Brian Stansberry
+ * @version $Revision: 14106 $
+ */
+@CacheListener
+public class ClusteredConcurrentTimestampsRegionImpl extends
TransactionalDataRegionAdapter implements TimestampsRegion {
+
+ public static final String TYPE = "TS";
+
+ private final ConcurrentHashMap localCache = new ConcurrentHashMap();
+
+ /**
+ * Create a new ClusteredConccurentTimestampsRegionImpl.
+ *
+ * @param jbcCache
+ * @param regionName
+ * @param regionPrefix
+ * TODO
+ * @param metadata
+ */
+ public ClusteredConcurrentTimestampsRegionImpl(Cache jbcCache, String regionName,
String regionPrefix, Properties properties) {
+ super(jbcCache, regionName, regionPrefix, null);
+
+ jbcCache.addCacheListener(this);
+
+ populateLocalCache();
+ }
+
+ @Override
+ protected Fqn<String> createRegionFqn(String regionName, String regionPrefix)
{
+ return getTypeFirstRegionFqn(regionName, regionPrefix, TYPE);
+ }
+
+ public void evict(Object key) throws CacheException {
+ Option opt = getNonLockingDataVersionOption(true);
+ CacheHelper.removeNode(getCacheInstance(), getRegionFqn(), key, opt);
+ }
+
+ public void evictAll() throws CacheException {
+ Option opt = getNonLockingDataVersionOption(true);
+ CacheHelper.removeAll(getCacheInstance(), getRegionFqn(), opt);
+ // Restore the region root node
+ CacheHelper.addNode(getCacheInstance(), getRegionFqn(), false, true, null);
+ }
+
+ public Object get(Object key) throws CacheException {
+ Entry entry = getLocalEntry(key);
+ Object timestamp = entry.getCurrent();
+ if (timestamp == null) {
+ // Double check the distributed cache
+ Object[] vals = (Object[]) suspendAndGet(key, null, false);
+ if (vals != null) {
+ storeDataFromJBC(key, vals);
+ timestamp = entry.getCurrent();
+ }
+ }
+ return timestamp;
+ }
+
+ public void put(Object key, Object value) throws CacheException {
+
+ throw new UnsupportedOperationException("Prototype only; Hibernate core must
change the API before really using");
+ }
+
+ public void preInvalidate(Object key, Object value) throws CacheException {
+
+ Entry entry = getLocalEntry(key);
+ if (entry.preInvalidate(value)) {
+ putInJBossCache(key, entry);
+ }
+ }
+
+ public void invalidate(Object key, Object value, Object preInvalidateValue) throws
CacheException {
+
+ Entry entry = getLocalEntry(key);
+ if (entry.invalidate(value, preInvalidateValue)) {
+ putInJBossCache(key, entry);
+ }
+ }
+
+ private void putInJBossCache(Object key, Entry entry) {
+
+ // Get an exclusive right to update JBC for this key from this node.
+ boolean locked = false;
+ try {
+ entry.acquireJBCWriteMutex();
+ locked = true;
+ // We have the JBCWriteMutex, so no other *local* thread will
+ // be trying to write this key.
+ // It's possible here some remote thread has come in and
+ // changed the values again, but since we are reading the
+ // values to write to JBC right now, we know we are writing
+ // the latest values; i.e. we don't assume that what we cached
+ // in entry.update() above is what we should write to JBC *now*.
+ // Our write could be redundant, i.e. we are writing what
+ // some remote thread just came in an wrote. There is a chance
+ // that yet another remote thread will update us, and we'll then
+ // overwrite that later data in JBC. But, all remote nodes will
+ // ignore that change in their localCache; the only place it
+ // will live will be in JBC, where it can only effect the
+ // initial state transfer values on newly joined nodes
+ // (i.e. populateLocalCache()).
+
+ // Don't hold the JBC node lock throughout the tx, as that
+ // prevents reads and other updates
+ Transaction tx = suspend();
+ try {
+ Option opt = getNonLockingDataVersionOption(false);
+ // We ensure ASYNC semantics (JBCACHE-1175)
+ opt.setForceAsynchronous(true);
+ CacheHelper.put(getCacheInstance(), getRegionFqn(), key,
entry.getJBCUpdateValues(), opt);
+ }
+ finally {
+ resume(tx);
+ }
+ }
+ catch (InterruptedException e) {
+ throw new CacheException("Interrupted while acquiring right to update
" + key, e);
+ }
+ finally {
+ if (locked) {
+ entry.releaseJBCWriteMutex();
+ }
+ }
+ }
+
+ @Override
+ public void destroy() throws CacheException {
+
+ getCacheInstance().removeCacheListener(this);
+ super.destroy();
+ localCache.clear();
+ }
+
+ /**
+ * Monitors cache events and updates the local cache
+ *
+ * @param event
+ */
+ @NodeModified
+ public void nodeModified(NodeModifiedEvent event) {
+ if (event.isOriginLocal() || event.isPre())
+ return;
+
+ Fqn fqn = event.getFqn();
+ Fqn regFqn = getRegionFqn();
+ if (fqn.size() == regFqn.size() + 1 && fqn.isChildOf(regFqn)) {
+ Object key = fqn.get(regFqn.size());
+ Object[] vals = (Object[]) event.getData().get(ITEM);
+ storeDataFromJBC(key, vals);
+ // TODO consider this hack instead of the simple entry.update above:
+// if (!entry.update(vals[0], vals[1])) {
+// // Hack! Use the fact that the Object[] stored in JBC is
+// // mutable to correct our local JBC state in this callback
+// Object[] correct = entry.getJBCUpdateValues();
+// vals[0] = correct[0];
+// vals[1] = correct[1];
+// }
+ }
+ }
+
+ private void storeDataFromJBC(Object key, Object[] vals) {
+ Entry entry = getLocalEntry(key);
+ if (vals[0].equals(vals[1])) {
+ entry.preInvalidate(vals[0]);
+ }
+ else {
+ entry.invalidate(vals[0], vals[1]);
+ }
+ }
+
+ /**
+ * Monitors cache events and updates the local cache
+ *
+ * @param event
+ */
+ @NodeRemoved
+ public void nodeRemoved(NodeRemovedEvent event) {
+ if (event.isOriginLocal() || event.isPre())
+ return;
+
+ Fqn fqn = event.getFqn();
+ Fqn regFqn = getRegionFqn();
+ if (fqn.isChildOrEquals(regFqn)) {
+ if (fqn.size() == regFqn.size()) {
+ localCache.clear();
+ }
+ else {
+ Object key = fqn.get(regFqn.size());
+ localCache.remove(key);
+ }
+ }
+ }
+
+ /**
+ * Brings all data from the distributed cache into our local cache.
+ */
+ private void populateLocalCache() {
+ Set children = CacheHelper.getChildrenNames(getCacheInstance(), getRegionFqn());
+ for (Object key : children) {
+ Object[] vals = (Object[]) suspendAndGet(key, null, false);
+ if (vals != null) {
+ storeDataFromJBC(key, vals);
+ }
+ }
+ }
+
+ private Entry getLocalEntry(Object key) {
+
+ Entry entry = new Entry();
+ Entry oldEntry = (Entry) localCache.putIfAbsent(key, entry);
+ return (oldEntry == null ? entry : oldEntry);
+ }
+
+ private class Entry {
+
+ private Semaphore writeMutex = new Semaphore(1);
+ private boolean preInvalidated = false;
+ private Object preInval = null;
+ private Object current = null;
+
+ void acquireJBCWriteMutex() throws InterruptedException {
+ writeMutex.acquire();
+ }
+
+ void releaseJBCWriteMutex() {
+ writeMutex.release();
+ }
+
+ synchronized boolean preInvalidate(Object newVal) {
+
+ boolean result = false;
+ if (newVal instanceof Comparable) {
+ if (current == null || ((Comparable) newVal).compareTo(current) > 0)
{
+ preInval = current = newVal;
+ preInvalidated = true;
+ result = true;
+ }
+ }
+ else {
+ preInval = current = newVal;
+ result = true;
+ }
+
+ return result;
+ }
+
+ synchronized boolean invalidate(Object newVal, Object preInvalidateValue) {
+
+ boolean result = false;
+
+ if (current == null) {
+ // Initial load from JBC
+ current = newVal;
+ preInval = preInvalidateValue;
+ preInvalidated = false;
+ result = true;
+ }
+ else if (preInvalidated) {
+ if (newVal instanceof Comparable) {
+ if (safeEquals(preInvalidateValue, this.preInval)
+ || ((Comparable) newVal).compareTo(preInval) > 0) {
+ current = newVal;
+ preInval = preInvalidateValue;
+ preInvalidated = false;
+ result = true;
+ }
+ }
+ else {
+ current = newVal;
+ preInval = preInvalidateValue;
+ result = true;
+ }
+ }
+ else if (newVal instanceof Comparable) {
+ // See if we had a 2nd invalidation from the same initial
+ // preinvalidation timestamp. If so, only increment
+ // if the new current value is an increase
+ if (safeEquals(preInvalidateValue, this.preInval)
+ && ((Comparable) newVal).compareTo(current) > 0) {
+ current = newVal;
+ preInval = preInvalidateValue;
+ result = true;
+ }
+ }
+
+ return result;
+ }
+
+ synchronized Object getCurrent() {
+ return current;
+ }
+
+ synchronized Object getPreInval() {
+ return preInval;
+ }
+
+ synchronized Object[] getJBCUpdateValues() {
+ return new Object[] {current, preInval};
+ }
+
+ private boolean safeEquals(Object a, Object b) {
+ return (a == b || (a != null && a.equals(b)));
+ }
+ }
+
+
+
+}
Added:
core/trunk/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java
===================================================================
---
core/trunk/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java
(rev 0)
+++
core/trunk/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java 2007-12-21
19:49:29 UTC (rev 14254)
@@ -0,0 +1,281 @@
+/*
+ * Copyright (c) 2007, Red Hat Middleware, LLC. All rights reserved.
+ *
+ * This copyrighted material is made available to anyone wishing to use, modify,
+ * copy, or redistribute it subject to the terms and conditions of the GNU
+ * Lesser General Public License, v. 2.1. This program is distributed in the
+ * hope that it will be useful, but WITHOUT A WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details. You should have received a
+ * copy of the GNU Lesser General Public License, v.2.1 along with this
+ * distribution; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * Red Hat Author(s): Brian Stansberry
+ */
+
+package org.hibernate.test.cache.jbc2.timestamp;
+
+import java.util.Properties;
+import java.util.Random;
+
+import junit.framework.AssertionFailedError;
+
+import org.hibernate.cache.UpdateTimestampsCache;
+import org.hibernate.cache.jbc2.CacheInstanceManager;
+import org.hibernate.cache.jbc2.JBossCacheRegionFactory;
+import org.hibernate.cache.jbc2.MultiplexedJBossCacheRegionFactory;
+import org.hibernate.cache.jbc2.timestamp.ClusteredConcurrentTimestampsRegionImpl;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.test.cache.jbc2.AbstractJBossCacheTestCase;
+import org.hibernate.test.util.CacheTestUtil;
+import org.jboss.cache.Cache;
+
+/**
+ * A ClusteredConcurrentTimestampCacheTestCase.
+ *
+ * @author <a href="brian.stansberry(a)jboss.com">Brian
Stansberry</a>
+ * @version $Revision: 1 $
+ */
+public class ClusteredConcurrentTimestampRegionTestCase extends
AbstractJBossCacheTestCase {
+
+ private static final String KEY1 = "com.foo.test.Entity1";
+ private static final String KEY2 = "com.foo.test.Entity2";
+
+ private static final Long ONE = new Long(1);
+ private static final Long TWO = new Long(2);
+ private static final Long THREE = new Long(3);
+ private static final Long TEN = new Long(10);
+ private static final Long ELEVEN = new Long(11);
+
+ private static Cache cache;
+ private static Properties properties;
+ private ClusteredConcurrentTimestampsRegionImpl region;
+
+ /**
+ * Create a new ClusteredConcurrentTimestampCacheTestCase.
+ *
+ * @param name
+ */
+ public ClusteredConcurrentTimestampRegionTestCase(String name) {
+ super(name);
+ }
+
+
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ if (cache == null) {
+ Configuration cfg = CacheTestUtil.buildConfiguration("test",
MultiplexedJBossCacheRegionFactory.class, false, true);
+ properties = cfg.getProperties();
+ cache = createCache();
+
+ // Sleep a bit to avoid concurrent FLUSH problem
+ avoidConcurrentFlush();
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+
+ if (region != null) {
+ region.destroy();
+ }
+ }
+
+ private Cache createCache() throws Exception {
+ Configuration cfg = CacheTestUtil.buildConfiguration("test",
MultiplexedJBossCacheRegionFactory.class, false, true);
+ JBossCacheRegionFactory regionFactory = CacheTestUtil.startRegionFactory(cfg);
+ CacheInstanceManager mgr = regionFactory.getCacheInstanceManager();
+ return mgr.getTimestampsCacheInstance();
+ }
+
+ protected ClusteredConcurrentTimestampsRegionImpl getTimestampRegion(Cache cache)
throws Exception {
+
+ return new ClusteredConcurrentTimestampsRegionImpl(cache, "test/" +
UpdateTimestampsCache.class.getName(), "test", properties);
+ }
+
+ public void testSimplePreinvalidate() throws Exception {
+
+ region = getTimestampRegion(cache);
+
+ assertEquals(null, region.get(KEY1));
+ region.preInvalidate(KEY1, TWO);
+ assertEquals(TWO, region.get(KEY1));
+ region.preInvalidate(KEY1, ONE);
+ assertEquals(TWO, region.get(KEY1));
+ region.preInvalidate(KEY1, TWO);
+ assertEquals(TWO, region.get(KEY1));
+ region.preInvalidate(KEY1, THREE);
+ assertEquals(THREE, region.get(KEY1));
+ }
+
+ public void testInitialState() throws Exception {
+
+ region = getTimestampRegion(cache);
+ region.preInvalidate(KEY1, TEN);
+ region.preInvalidate(KEY2, ELEVEN);
+ region.invalidate(KEY1, ONE, TEN);
+
+ Cache cache2 = createCache();
+ registerCache(cache2);
+
+ // Sleep a bit to avoid concurrent FLUSH problem
+ avoidConcurrentFlush();
+
+ ClusteredConcurrentTimestampsRegionImpl region2 = getTimestampRegion(cache2);
+ assertEquals(ONE, region2.get(KEY1));
+ assertEquals(ELEVEN, region2.get(KEY2));
+ }
+
+ public void testSimpleInvalidate() throws Exception {
+
+ region = getTimestampRegion(cache);
+
+ assertEquals(null, region.get(KEY1));
+ region.preInvalidate(KEY1, TWO);
+ assertEquals(TWO, region.get(KEY1));
+ region.invalidate(KEY1, ONE, TWO);
+ assertEquals(ONE, region.get(KEY1));
+ region.preInvalidate(KEY1, TEN);
+ region.preInvalidate(KEY1, ELEVEN);
+ assertEquals(ELEVEN, region.get(KEY1));
+ region.invalidate(KEY1, TWO, TEN);
+ assertEquals(ELEVEN, region.get(KEY1));
+ region.invalidate(KEY1, TWO, ELEVEN);
+ assertEquals(TWO, region.get(KEY1));
+ region.preInvalidate(KEY1, TEN);
+ assertEquals(TEN, region.get(KEY1));
+ region.invalidate(KEY1, THREE, TEN);
+ assertEquals(THREE, region.get(KEY1));
+ }
+
+ public void testConcurrentActivityClustered() throws Exception {
+ concurrentActivityTest(true);
+ }
+
+ public void testConcurrentActivityNonClustered() throws Exception {
+ concurrentActivityTest(false);
+ }
+
+ private void concurrentActivityTest(boolean clustered) throws Exception {
+
+ region = getTimestampRegion(cache);
+ ClusteredConcurrentTimestampsRegionImpl region2 = region;
+
+ if (clustered) {
+ Cache cache2 = createCache();
+ registerCache(cache2);
+
+ // Sleep a bit to avoid concurrent FLUSH problem
+ avoidConcurrentFlush();
+
+ region2 = getTimestampRegion(cache2);
+ }
+
+ Tester[] testers = new Tester[20];
+ for (int i = 0; i < testers.length; i++) {
+ testers[i] = new Tester((i % 2 == 0) ? region : region2);
+ testers[i].start();
+ }
+
+ for (int j = 0; j < 10; j++) {
+ sleep(2000);
+
+ log.info("Running for " + ((j + 1) * 2) + " seconds");
+
+ for (int i = 0; i < testers.length; i++) {
+ if (testers[i].assertionFailure != null)
+ throw testers[i].assertionFailure;
+ }
+
+ for (int i = 0; i < testers.length; i++) {
+ if (testers[i].exception != null)
+ throw testers[i].exception;
+ }
+ }
+
+ for (int i = 0; i < testers.length; i++) {
+ testers[i].stop();
+ }
+
+ for (int i = 0; i < testers.length; i++) {
+ if (testers[i].assertionFailure != null)
+ throw testers[i].assertionFailure;
+ }
+
+ for (int i = 0; i < testers.length; i++) {
+ if (testers[i].exception != null)
+ throw testers[i].exception;
+ }
+ }
+
+
+
+ private class Tester implements Runnable {
+
+ ClusteredConcurrentTimestampsRegionImpl region;
+ Exception exception;
+ AssertionFailedError assertionFailure;
+ boolean stopped = true;
+ Thread thread;
+ Random random = new Random();
+
+ Tester(ClusteredConcurrentTimestampsRegionImpl region) {
+ this.region = region;
+ }
+
+ public void run() {
+ stopped = false;
+
+ while (!stopped) {
+ try {
+ Long pre = new Long(region.nextTimestamp() + region.getTimeout());
+ region.preInvalidate(KEY1, pre);
+ sleep(random.nextInt(1));
+ Long post = new Long(region.nextTimestamp());
+ region.invalidate(KEY1, post, pre);
+ Long ts = (Long) region.get(KEY1);
+ assertTrue(ts + " >= " + post, ts.longValue() >=
post.longValue());
+ sleep(random.nextInt(1));
+ }
+ catch (AssertionFailedError e) {
+ assertionFailure = e;
+ }
+ catch (Exception e) {
+ if (!stopped)
+ exception = e;
+ }
+ finally {
+ stopped = true;
+ }
+ }
+ }
+
+ void start() {
+ if (stopped) {
+ if (thread == null) {
+ thread = new Thread(this);
+ thread.setDaemon(true);
+ }
+ thread.start();
+ }
+ }
+
+ void stop() {
+ if (!stopped) {
+ stopped = true;
+ try {
+ thread.join(100);
+ }
+ catch (InterruptedException ignored) {}
+
+ if (thread.isAlive())
+ thread.interrupt();
+ }
+ }
+ }
+}