[hibernate-commits] Hibernate SVN: r14254 - in core/trunk/cache-jbosscache2/src: test/java/org/hibernate/test/cache/jbc2/timestamp and 1 other directory.

hibernate-commits at lists.jboss.org hibernate-commits at lists.jboss.org
Fri Dec 21 14:49:29 EST 2007


Author: bstansberry at jboss.com
Date: 2007-12-21 14:49:29 -0500 (Fri, 21 Dec 2007)
New Revision: 14254

Added:
   core/trunk/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java
   core/trunk/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java
Log:
Put prototype of a next-gen TimestampsRegion impl under src control

Added: core/trunk/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java
===================================================================
--- core/trunk/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java	                        (rev 0)
+++ core/trunk/cache-jbosscache2/src/main/java/org/hibernate/cache/jbc2/timestamp/ClusteredConcurrentTimestampsRegionImpl.java	2007-12-21 19:49:29 UTC (rev 14254)
@@ -0,0 +1,363 @@
+/*
+ * Copyright (c) 2007, Red Hat Middleware, LLC. All rights reserved.
+ *
+ * This copyrighted material is made available to anyone wishing to use, modify,
+ * copy, or redistribute it subject to the terms and conditions of the GNU
+ * Lesser General Public License, v. 2.1. This program is distributed in the
+ * hope that it will be useful, but WITHOUT A WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details. You should have received a
+ * copy of the GNU Lesser General Public License, v.2.1 along with this
+ * distribution; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * Red Hat Author(s): Brian Stansberry
+ */
+
+package org.hibernate.cache.jbc2.timestamp;
+
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
+import javax.transaction.Transaction;
+
+import org.hibernate.cache.CacheException;
+import org.hibernate.cache.TimestampsRegion;
+import org.hibernate.cache.jbc2.TransactionalDataRegionAdapter;
+import org.hibernate.cache.jbc2.util.CacheHelper;
+import org.jboss.cache.Cache;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.config.Option;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeModified;
+import org.jboss.cache.notifications.annotation.NodeRemoved;
+import org.jboss.cache.notifications.event.NodeModifiedEvent;
+import org.jboss.cache.notifications.event.NodeRemovedEvent;
+
+/**
+ * Prototype of a clustered timestamps cache region impl usable if the
+ * TimestampsRegion API is changed.
+ * <p>
+ * Maintains a local (authoritative) cache of timestamps along with the
+ * distributed cache held in JBoss Cache. Listens for changes in the distributed
+ * cache and updates the local cache accordingly. Ensures that any changes in
+ * the local cache represent either 1) an increase in the timestamp or 
+ * 2) a stepback in the timestamp by the caller that initially increased
+ * it as part of a pre-invalidate call. This approach allows
+ * timestamp changes to be replicated asynchronously by JBoss Cache while still
+ * preventing invalid backward changes in timestamps.
+ * </p>
+ * 
+ * NOTE: This is just a prototype!!! Only useful if we change the 
+ * TimestampsRegion API.
+ * 
+ * @author Brian Stansberry
+ * @version $Revision: 14106 $
+ */
+@CacheListener
+public class ClusteredConcurrentTimestampsRegionImpl extends TransactionalDataRegionAdapter implements TimestampsRegion {
+
+    public static final String TYPE = "TS";
+
+    /** Local view of the timestamps, keyed by the same keys used in JBoss Cache. */
+    private final ConcurrentHashMap<Object, Entry> localCache = new ConcurrentHashMap<Object, Entry>();
+
+    /**
+     * Create a new ClusteredConcurrentTimestampsRegionImpl, register it as a
+     * listener on the given cache and copy the timestamps already stored in
+     * the cache into the local cache.
+     * 
+     * @param jbcCache     the JBoss Cache instance holding the distributed timestamps
+     * @param regionName   name of the region
+     * @param regionPrefix prefix used together with <code>regionName</code>
+     *                     when the region Fqn is built
+     * @param properties   configuration properties; not used by this
+     *                     prototype (<code>null</code> is passed as the fourth
+     *                     superclass constructor argument)
+     */
+    public ClusteredConcurrentTimestampsRegionImpl(Cache jbcCache, String regionName, String regionPrefix, Properties properties) {
+        super(jbcCache, regionName, regionPrefix, null);
+
+        jbcCache.addCacheListener(this);
+
+        populateLocalCache();
+    }
+
+    /**
+     * Builds the region Fqn with {@link #TYPE} as the type discriminator.
+     */
+    @Override
+    protected Fqn<String> createRegionFqn(String regionName, String regionPrefix) {
+        return getTypeFirstRegionFqn(regionName, regionPrefix, TYPE);
+    }
+
+    /**
+     * Removes the timestamp for <code>key</code> from JBoss Cache and from
+     * the local cache.
+     * 
+     * @param key the key whose timestamp should be discarded
+     * @throws CacheException if the underlying cache operation fails
+     */
+    public void evict(Object key) throws CacheException {
+        Option opt = getNonLockingDataVersionOption(true);
+        CacheHelper.removeNode(getCacheInstance(), getRegionFqn(), key, opt);
+        // nodeRemoved() ignores events that originate locally, so the local
+        // cache has to be cleaned up here; otherwise get() on this node would
+        // keep returning the evicted timestamp while remote nodes drop it.
+        localCache.remove(key);
+    }
+
+    /**
+     * Removes all timestamps of this region from JBoss Cache and from the
+     * local cache, then re-creates the (empty) region root node.
+     * 
+     * @throws CacheException if the underlying cache operation fails
+     */
+    public void evictAll() throws CacheException {
+        Option opt = getNonLockingDataVersionOption(true);
+        CacheHelper.removeAll(getCacheInstance(), getRegionFqn(), opt);
+        // Restore the region root node
+        CacheHelper.addNode(getCacheInstance(), getRegionFqn(), false, true, null);
+        // Local-origin removals are not seen by nodeRemoved(); clear explicitly.
+        localCache.clear();
+    }
+
+    /**
+     * Returns the current timestamp for <code>key</code>. The local cache is
+     * consulted first; only if it holds no value is JBoss Cache checked, and
+     * anything found there is merged into the local cache.
+     * 
+     * @param key the key to look up
+     * @return the current timestamp, or <code>null</code> if none is known
+     * @throws CacheException if reading from JBoss Cache fails
+     */
+    public Object get(Object key) throws CacheException {
+        Entry entry = getLocalEntry(key);
+        Object timestamp = entry.getCurrent();
+        if (timestamp == null) {
+            // Double check the distributed cache
+            Object[] vals = (Object[]) suspendAndGet(key, null, false);
+            if (vals != null) {
+                storeDataFromJBC(key, vals);
+                timestamp = entry.getCurrent();
+            }
+        }
+        return timestamp;
+    }
+
+    /**
+     * Not supported by this prototype; callers must use
+     * {@link #preInvalidate(Object, Object)} and
+     * {@link #invalidate(Object, Object, Object)} instead.
+     * 
+     * @throws UnsupportedOperationException always
+     */
+    public void put(Object key, Object value) throws CacheException {
+
+        throw new UnsupportedOperationException("Prototype only; Hibernate core must change the API before really using");
+    }
+
+    /**
+     * Records a pre-invalidation timestamp for <code>key</code>. The value is
+     * written to JBoss Cache only if the local entry accepted it.
+     * 
+     * @param key   the key being pre-invalidated
+     * @param value the pre-invalidation timestamp
+     * @throws CacheException if the write to JBoss Cache fails
+     */
+    public void preInvalidate(Object key, Object value) throws CacheException {
+
+        Entry entry = getLocalEntry(key);
+        if (entry.preInvalidate(value)) {
+            putInJBossCache(key, entry);
+        }
+    }
+
+    /**
+     * Records the final invalidation timestamp for <code>key</code>. The value
+     * is written to JBoss Cache only if the local entry accepted it.
+     * 
+     * @param key                the key being invalidated
+     * @param value              the new timestamp
+     * @param preInvalidateValue the timestamp the same caller passed earlier
+     *                           to {@link #preInvalidate(Object, Object)}
+     * @throws CacheException if the write to JBoss Cache fails
+     */
+    public void invalidate(Object key, Object value, Object preInvalidateValue) throws CacheException {
+
+        Entry entry = getLocalEntry(key);
+        if (entry.invalidate(value, preInvalidateValue)) {
+            putInJBossCache(key, entry);
+        }
+    }
+
+    /**
+     * Writes the entry's current values to JBoss Cache, asynchronously and
+     * outside of any ongoing transaction.
+     * 
+     * @param key   the key to write
+     * @param entry the local entry whose values are written
+     * @throws CacheException if the thread is interrupted while waiting for
+     *                        the entry's write mutex
+     */
+    private void putInJBossCache(Object key, Entry entry) {
+
+        // Get an exclusive right to update JBC for this key from this node.
+        boolean locked = false;
+        try {
+            entry.acquireJBCWriteMutex();
+            locked = true;
+            // We have the JBCWriteMutex, so no other *local* thread will 
+            // be trying to write this key. 
+            // It's possible here some remote thread has come in and
+            // changed the values again, but since we are reading the
+            // values to write to JBC right now, we know we are writing
+            // the latest values; i.e. we don't assume that what the caller
+            // stored in the entry is what we should write to JBC *now*.
+            // Our write could be redundant, i.e. we are writing what
+            // some remote thread just came in and wrote.  There is a chance 
+            // that yet another remote thread will update us, and we'll then
+            // overwrite that later data in JBC.  But, all remote nodes will
+            // ignore that change in their localCache; the only place it 
+            // will live will be in JBC, where it can only affect the 
+            // initial state transfer values on newly joined nodes 
+            // (i.e. populateLocalCache()).
+
+            // Don't hold the JBC node lock throughout the tx, as that
+            // prevents reads and other updates
+            Transaction tx = suspend();
+            try {
+                Option opt = getNonLockingDataVersionOption(false);
+                // We ensure ASYNC semantics (JBCACHE-1175)
+                opt.setForceAsynchronous(true);
+                CacheHelper.put(getCacheInstance(), getRegionFqn(), key, entry.getJBCUpdateValues(), opt);
+            }
+            finally {
+                resume(tx);
+            }
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can
+            // still observe the interruption after we translate the exception.
+            Thread.currentThread().interrupt();
+            throw new CacheException("Interrupted while acquiring right to update " + key, e);
+        }
+        finally {
+            if (locked) {
+                entry.releaseJBCWriteMutex();
+            }
+        }
+    }
+
+    /**
+     * Unregisters this region as a cache listener, destroys the region and
+     * clears the local cache.
+     */
+    @Override
+    public void destroy() throws CacheException {
+
+        getCacheInstance().removeCacheListener(this);
+        super.destroy();
+        localCache.clear();
+    }
+
+    /**
+     * Monitors cache events and updates the local cache. Only post-events
+     * that originated remotely and concern a direct child of the region
+     * node are processed.
+     * 
+     * @param event the modification event
+     */
+    @NodeModified
+    public void nodeModified(NodeModifiedEvent event) {
+        if (event.isOriginLocal() || event.isPre())
+            return;
+
+        Fqn fqn = event.getFqn();
+        Fqn regFqn = getRegionFqn();
+        if (fqn.size() == regFqn.size() + 1 && fqn.isChildOf(regFqn)) {
+            Object key = fqn.get(regFqn.size());
+            Object[] vals = (Object[]) event.getData().get(ITEM);
+            // The modification may not have touched the ITEM attribute;
+            // get() and populateLocalCache() apply the same guard.
+            if (vals != null) {
+                storeDataFromJBC(key, vals);
+            }
+            // TODO consider this hack instead of the simple storeDataFromJBC above:
+//            if (!entry.update(vals[0], vals[1])) {
+//                // Hack! Use the fact that the Object[] stored in JBC is
+//                // mutable to correct our local JBC state in this callback
+//                Object[] correct = entry.getJBCUpdateValues();
+//                vals[0] = correct[0];
+//                vals[1] = correct[1];
+//            }
+        }
+    }
+
+    /**
+     * Merges a value pair read from JBoss Cache into the local entry for
+     * <code>key</code>.
+     * 
+     * @param key  the key the values belong to
+     * @param vals two-element array as produced by
+     *             {@link Entry#getJBCUpdateValues()}: current value, then
+     *             pre-invalidation value
+     */
+    private void storeDataFromJBC(Object key, Object[] vals) {
+        Entry entry = getLocalEntry(key);
+        // preInvalidate() stores the same value in both slots, so equal
+        // values denote a pre-invalidation; anything else is an invalidation.
+        if (vals[0].equals(vals[1])) {
+            entry.preInvalidate(vals[0]);
+        }
+        else {
+            entry.invalidate(vals[0], vals[1]);
+        }
+    }
+
+    /**
+     * Monitors cache events and updates the local cache. A remote removal of
+     * the region node itself clears the whole local cache; a removal below
+     * it drops the corresponding key.
+     * 
+     * @param event the removal event
+     */
+    @NodeRemoved
+    public void nodeRemoved(NodeRemovedEvent event) {
+        if (event.isOriginLocal() || event.isPre())
+            return;
+
+        Fqn fqn = event.getFqn();
+        Fqn regFqn = getRegionFqn();
+        if (fqn.isChildOrEquals(regFqn)) {
+            if (fqn.size() == regFqn.size()) {
+                localCache.clear();
+            }
+            else {
+                Object key = fqn.get(regFqn.size());
+                localCache.remove(key);
+            }
+        }
+    }
+
+    /**
+     * Brings all data from the distributed cache into our local cache.
+     */
+    private void populateLocalCache() {
+        Set children = CacheHelper.getChildrenNames(getCacheInstance(), getRegionFqn());
+        for (Object key : children) {
+            Object[] vals = (Object[]) suspendAndGet(key, null, false);
+            if (vals != null) {
+                storeDataFromJBC(key, vals);
+            }
+        }
+    }
+
+    /**
+     * Returns the local entry for <code>key</code>, creating an empty one if
+     * none exists yet.
+     * 
+     * @param key the key to look up
+     * @return the entry registered in the local cache; never <code>null</code>
+     */
+    private Entry getLocalEntry(Object key) {
+
+        Entry entry = localCache.get(key);
+        if (entry == null) {
+            // Only allocate when the key is missing; putIfAbsent resolves the
+            // race if another thread registers an entry at the same time.
+            Entry created = new Entry();
+            Entry raced = localCache.putIfAbsent(key, created);
+            entry = (raced == null ? created : raced);
+        }
+        return entry;
+    }
+
+    /**
+     * Timestamp state for a single key. The state-changing and state-reading
+     * methods are synchronized on the entry; the separate write mutex is
+     * used by the enclosing region to serialize its writes of this key to
+     * JBoss Cache.
+     */
+    private static class Entry {
+
+        private final Semaphore writeMutex = new Semaphore(1);
+        private boolean preInvalidated = false;
+        private Object preInval = null;
+        private Object current = null;
+
+        /** Blocks until this thread holds the write mutex for this entry. */
+        void acquireJBCWriteMutex() throws InterruptedException {
+            writeMutex.acquire();
+        }
+
+        /** Releases the write mutex for this entry. */
+        void releaseJBCWriteMutex() {
+            writeMutex.release();
+        }
+
+        /**
+         * Applies a pre-invalidation. A Comparable value is accepted only if
+         * no value is held yet or it is greater than the current one; any
+         * other value is always accepted.
+         * 
+         * @param newVal the pre-invalidation timestamp
+         * @return <code>true</code> if the entry was changed
+         */
+        synchronized boolean preInvalidate(Object newVal) {
+
+            boolean result = false;
+            if (newVal instanceof Comparable) {
+                if (current == null || compare(newVal, current) > 0) {
+                    preInval = current = newVal;
+                    preInvalidated = true;
+                    result = true;
+                }
+            }
+            else {
+                preInval = current = newVal;
+                // Mark the entry as pre-invalidated here as well; without the
+                // flag a following invalidate() with a non-Comparable value
+                // falls through every branch and is silently rejected.
+                preInvalidated = true;
+                result = true;
+            }
+
+            return result;
+        }
+
+        /**
+         * Applies an invalidation.
+         * 
+         * @param newVal             the new timestamp
+         * @param preInvalidateValue the pre-invalidation timestamp this
+         *                           invalidation belongs to
+         * @return <code>true</code> if the entry was changed
+         */
+        synchronized boolean invalidate(Object newVal, Object preInvalidateValue) {
+
+            boolean result = false;
+
+            if (current == null) {
+                // Initial load from JBC
+                current = newVal;
+                preInval = preInvalidateValue;
+                preInvalidated = false;
+                result = true;
+            }
+            else if (preInvalidated) {
+                if (newVal instanceof Comparable) {
+                    // Accept the step back from the caller that made the
+                    // pending pre-invalidation, or any value beyond it.
+                    if (safeEquals(preInvalidateValue, this.preInval)
+                            || compare(newVal, preInval) > 0) {
+                        current = newVal;
+                        preInval = preInvalidateValue;
+                        preInvalidated = false;
+                        result = true;
+                    }
+                }
+                else {
+                    current = newVal;
+                    preInval = preInvalidateValue;
+                    result = true;
+                }
+            }
+            else if (newVal instanceof Comparable) {
+                // See if we had a 2nd invalidation from the same initial
+                // preinvalidation timestamp. If so, only increment
+                // if the new current value is an increase
+                if (safeEquals(preInvalidateValue, this.preInval)
+                        && compare(newVal, current) > 0) {
+                    current = newVal;
+                    preInval = preInvalidateValue;
+                    result = true;
+                }
+            }
+
+            return result;
+        }
+
+        synchronized Object getCurrent() {
+            return current;
+        }
+
+        synchronized Object getPreInval() {
+            return preInval;
+        }
+
+        /**
+         * @return a new two-element array: the current value followed by the
+         *         pre-invalidation value
+         */
+        synchronized Object[] getJBCUpdateValues() {
+            return new Object[] {current, preInval};
+        }
+
+        /** Null-safe equality check. */
+        private static boolean safeEquals(Object a, Object b) {
+            return (a == b || (a != null && a.equals(b)));
+        }
+
+        /**
+         * Compares <code>a</code> with <code>b</code> using the natural
+         * ordering of <code>a</code>. Callers must have checked that
+         * <code>a</code> is Comparable.
+         */
+        @SuppressWarnings("unchecked")
+        private static int compare(Object a, Object b) {
+            return ((Comparable<Object>) a).compareTo(b);
+        }
+    }
+
+}

Added: core/trunk/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java
===================================================================
--- core/trunk/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java	                        (rev 0)
+++ core/trunk/cache-jbosscache2/src/test/java/org/hibernate/test/cache/jbc2/timestamp/ClusteredConcurrentTimestampRegionTestCase.java	2007-12-21 19:49:29 UTC (rev 14254)
@@ -0,0 +1,281 @@
+/*
+ * Copyright (c) 2007, Red Hat Middleware, LLC. All rights reserved.
+ *
+ * This copyrighted material is made available to anyone wishing to use, modify,
+ * copy, or redistribute it subject to the terms and conditions of the GNU
+ * Lesser General Public License, v. 2.1. This program is distributed in the
+ * hope that it will be useful, but WITHOUT A WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details. You should have received a
+ * copy of the GNU Lesser General Public License, v.2.1 along with this
+ * distribution; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ * Red Hat Author(s): Brian Stansberry
+ */
+
+package org.hibernate.test.cache.jbc2.timestamp;
+
+import java.util.Properties;
+import java.util.Random;
+
+import junit.framework.AssertionFailedError;
+
+import org.hibernate.cache.UpdateTimestampsCache;
+import org.hibernate.cache.jbc2.CacheInstanceManager;
+import org.hibernate.cache.jbc2.JBossCacheRegionFactory;
+import org.hibernate.cache.jbc2.MultiplexedJBossCacheRegionFactory;
+import org.hibernate.cache.jbc2.timestamp.ClusteredConcurrentTimestampsRegionImpl;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.test.cache.jbc2.AbstractJBossCacheTestCase;
+import org.hibernate.test.util.CacheTestUtil;
+import org.jboss.cache.Cache;
+
+/**
+ * Tests of {@link ClusteredConcurrentTimestampsRegionImpl}.
+ * 
+ * @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
+ * @version $Revision: 1 $
+ */
+public class ClusteredConcurrentTimestampRegionTestCase extends AbstractJBossCacheTestCase {
+
+    private static final String KEY1 = "com.foo.test.Entity1";
+    private static final String KEY2 = "com.foo.test.Entity2";
+
+    private static final Long ONE = Long.valueOf(1);
+    private static final Long TWO = Long.valueOf(2);
+    private static final Long THREE = Long.valueOf(3);
+    private static final Long TEN = Long.valueOf(10);
+    private static final Long ELEVEN = Long.valueOf(11);
+
+    /** Cache shared by all tests in this class; created by the first setUp(). */
+    private static Cache cache;
+    private static Properties properties;
+    /** Region under test; destroyed in tearDown() if a test created it. */
+    private ClusteredConcurrentTimestampsRegionImpl region;
+
+    /**
+     * Create a new ClusteredConcurrentTimestampRegionTestCase.
+     * 
+     * @param name name of the test
+     */
+    public ClusteredConcurrentTimestampRegionTestCase(String name) {
+        super(name);
+    }
+
+    /**
+     * Lazily creates the shared cache and configuration properties.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        if (cache == null) {
+            Configuration cfg = CacheTestUtil.buildConfiguration("test", MultiplexedJBossCacheRegionFactory.class, false, true);
+            properties = cfg.getProperties();
+            cache = createCache();
+
+            // Sleep a bit to avoid concurrent FLUSH problem
+            avoidConcurrentFlush();
+        }
+    }
+
+    /**
+     * Destroys the region created by the test, even if the superclass
+     * tear-down fails.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        }
+        finally {
+            if (region != null) {
+                region.destroy();
+                region = null;
+            }
+        }
+    }
+
+    /**
+     * Starts a new region factory and returns its timestamps cache.
+     */
+    private Cache createCache() throws Exception {
+        Configuration cfg = CacheTestUtil.buildConfiguration("test", MultiplexedJBossCacheRegionFactory.class, false, true);
+        JBossCacheRegionFactory regionFactory = CacheTestUtil.startRegionFactory(cfg);
+        CacheInstanceManager mgr = regionFactory.getCacheInstanceManager();
+        return mgr.getTimestampsCacheInstance();
+    }
+
+    /**
+     * Creates a timestamps region backed by the given cache.
+     * 
+     * @param cache the cache the region should use
+     */
+    protected ClusteredConcurrentTimestampsRegionImpl getTimestampRegion(Cache cache)  throws Exception {
+
+        return new ClusteredConcurrentTimestampsRegionImpl(cache, "test/" + UpdateTimestampsCache.class.getName(), "test", properties);
+    }
+
+    /**
+     * Pre-invalidation may only move the timestamp forward.
+     */
+    public void testSimplePreinvalidate() throws Exception {
+
+        region = getTimestampRegion(cache);
+
+        assertEquals(null, region.get(KEY1));
+        region.preInvalidate(KEY1, TWO);
+        assertEquals(TWO, region.get(KEY1));
+        region.preInvalidate(KEY1, ONE);
+        assertEquals(TWO, region.get(KEY1));
+        region.preInvalidate(KEY1, TWO);
+        assertEquals(TWO, region.get(KEY1));
+        region.preInvalidate(KEY1, THREE);
+        assertEquals(THREE, region.get(KEY1));
+    }
+
+    /**
+     * A region created on a second cache must see the timestamps that
+     * already exist in the cluster.
+     */
+    public void testInitialState() throws Exception {
+
+        region = getTimestampRegion(cache);
+        region.preInvalidate(KEY1, TEN);
+        region.preInvalidate(KEY2, ELEVEN);
+        region.invalidate(KEY1, ONE, TEN);
+
+        Cache cache2 = createCache();
+        registerCache(cache2);
+
+        // Sleep a bit to avoid concurrent FLUSH problem
+        avoidConcurrentFlush();
+
+        ClusteredConcurrentTimestampsRegionImpl region2 = getTimestampRegion(cache2);
+        try {
+            assertEquals(ONE, region2.get(KEY1));
+            assertEquals(ELEVEN, region2.get(KEY2));
+        }
+        finally {
+            // tearDown() only knows about 'region'; clean up the second one here
+            region2.destroy();
+        }
+    }
+
+    /**
+     * An invalidation may step the timestamp back only when it matches the
+     * pending pre-invalidation.
+     */
+    public void testSimpleInvalidate() throws Exception {
+
+        region = getTimestampRegion(cache);
+
+        assertEquals(null, region.get(KEY1));
+        region.preInvalidate(KEY1, TWO);
+        assertEquals(TWO, region.get(KEY1));
+        region.invalidate(KEY1, ONE, TWO);
+        assertEquals(ONE, region.get(KEY1));
+        region.preInvalidate(KEY1, TEN);
+        region.preInvalidate(KEY1, ELEVEN);
+        assertEquals(ELEVEN, region.get(KEY1));
+        region.invalidate(KEY1, TWO, TEN);
+        assertEquals(ELEVEN, region.get(KEY1));
+        region.invalidate(KEY1, TWO, ELEVEN);
+        assertEquals(TWO, region.get(KEY1));
+        region.preInvalidate(KEY1, TEN);
+        assertEquals(TEN, region.get(KEY1));
+        region.invalidate(KEY1, THREE, TEN);
+        assertEquals(THREE, region.get(KEY1));
+    }
+
+    public void testConcurrentActivityClustered() throws Exception {
+        concurrentActivityTest(true);
+    }
+
+    public void testConcurrentActivityNonClustered() throws Exception {
+        concurrentActivityTest(false);
+    }
+
+    /**
+     * Runs 20 tester threads for about 20 seconds and fails as soon as any of
+     * them reports a problem.
+     * 
+     * @param clustered if <code>true</code>, half of the testers work against
+     *                  a region on a second cache; otherwise all share one region
+     */
+    private void concurrentActivityTest(boolean clustered) throws Exception {
+
+        region = getTimestampRegion(cache);
+        ClusteredConcurrentTimestampsRegionImpl region2 = region;
+
+        if (clustered) {
+            Cache cache2 = createCache();
+            registerCache(cache2);
+
+            // Sleep a bit to avoid concurrent FLUSH problem
+            avoidConcurrentFlush();
+
+            region2 = getTimestampRegion(cache2);
+        }
+
+        Tester[] testers = new Tester[20];
+        try {
+            for (int i = 0; i < testers.length; i++) {
+                testers[i] = new Tester((i % 2 == 0) ? region : region2);
+                testers[i].start();
+            }
+
+            for (int j = 0; j < 10; j++) {
+                sleep(2000);
+
+                log.info("Running for " + ((j + 1) * 2) + " seconds");
+
+                rethrowFailures(testers);
+            }
+        }
+        finally {
+            // Always stop the workers, also when a failure is being
+            // propagated, so they do not keep using the regions.
+            for (int i = 0; i < testers.length; i++) {
+                if (testers[i] != null) {
+                    testers[i].stop();
+                }
+            }
+            // tearDown() only destroys 'region'
+            if (region2 != region) {
+                region2.destroy();
+            }
+        }
+
+        // Pick up anything that was recorded while shutting down
+        rethrowFailures(testers);
+    }
+
+    /**
+     * Rethrows the first assertion failure recorded by any tester, or,
+     * if there is none, the first recorded exception.
+     */
+    private void rethrowFailures(Tester[] testers) throws Exception {
+        for (int i = 0; i < testers.length; i++) {
+            if (testers[i].assertionFailure != null)
+                throw testers[i].assertionFailure;
+        }
+
+        for (int i = 0; i < testers.length; i++) {
+            if (testers[i].exception != null)
+                throw testers[i].exception;
+        }
+    }
+
+    /**
+     * Worker that repeatedly pre-invalidates and invalidates KEY1 on its
+     * region and checks that the timestamp read back is not older than the
+     * one it just wrote. The first failure is recorded and ends the worker.
+     */
+    private class Tester implements Runnable {
+
+        final ClusteredConcurrentTimestampsRegionImpl region;
+        // Written by the worker thread and read by the test thread
+        volatile Exception exception;
+        volatile AssertionFailedError assertionFailure;
+        volatile boolean stopped = true;
+        Thread thread;
+        final Random random = new Random();
+
+        Tester(ClusteredConcurrentTimestampsRegionImpl region) {
+            this.region = region;
+        }
+
+        public void run() {
+            // The loop ends when stop() is called or after the first failure.
+            while (!stopped) {
+                try {
+                    Long pre = Long.valueOf(region.nextTimestamp() + region.getTimeout());
+                    region.preInvalidate(KEY1, pre);
+                    // NOTE(review): nextInt(1) always returns 0, so this never
+                    // actually sleeps; confirm the intended upper bound.
+                    sleep(random.nextInt(1));
+                    Long post = Long.valueOf(region.nextTimestamp());
+                    region.invalidate(KEY1, post, pre);
+                    Long ts = (Long) region.get(KEY1);
+                    assertTrue(ts + " >= " + post, ts.longValue() >= post.longValue());
+                    sleep(random.nextInt(1));
+                }
+                catch (AssertionFailedError e) {
+                    assertionFailure = e;
+                    stopped = true;
+                }
+                catch (Exception e) {
+                    // Exceptions caused by stop() interrupting us are expected
+                    if (!stopped)
+                        exception = e;
+                    stopped = true;
+                }
+            }
+        }
+
+        void start() {
+            if (stopped) {
+                if (thread == null) {
+                    thread = new Thread(this);
+                    thread.setDaemon(true);
+                }
+                // Clear the flag before the thread runs so that a stop()
+                // issued right after start() cannot be lost.
+                stopped = false;
+                thread.start();
+            }
+        }
+
+        void stop() {
+            if (!stopped) {
+                stopped = true;
+                try {
+                    thread.join(100);
+
+                    if (thread.isAlive()) {
+                        thread.interrupt();
+                        // Give the worker a moment to wind down before the
+                        // caller tears the regions down.
+                        thread.join(100);
+                    }
+                }
+                catch (InterruptedException e) {
+                    // Keep the interrupt visible to the caller
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+}




More information about the hibernate-commits mailing list