[jboss-cvs] JBossAS SVN: r107069 - projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jul 23 16:11:09 EDT 2010
Author: smarlow at redhat.com
Date: 2010-07-23 16:11:08 -0400 (Fri, 23 Jul 2010)
New Revision: 107069
Modified:
projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
Log:
JBCLUSTER-255
Modified: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java 2010-07-23 18:45:08 UTC (rev 107068)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java 2010-07-23 20:11:08 UTC (rev 107069)
@@ -24,33 +24,41 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.infinispan.Cache;
+import org.infinispan.atomic.AtomicMap;
+import org.infinispan.atomic.AtomicMapLookup;
import org.infinispan.config.Configuration;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
-import org.infinispan.tree.TreeCache;
import org.infinispan.CacheException;
-
import org.infinispan.notifications.Listener;
-import org.infinispan.tree.Fqn;
-import org.infinispan.tree.Node;
-import org.infinispan.tree.TreeCacheFactory;
+import org.infinispan.util.SimpleImmutableEntry;
import org.jboss.ha.framework.server.spi.ManagedDistributedState;
import org.jboss.logging.Logger;
/**
- * This class manages distributed state across the cluster.
+ * This class manages distributed state across the cluster.
*
+ * The Cache layout will have two formats. The value collection will combine the category
+ * + DS key for the composite key. The DS value will be the value. It will look like:
+ * Cache<SimpleImmutableEntry<String Category, Serializable DS_Key>, Serializable DS_Value>
+ *
+ * The other Cache format will be the DS keys per category. It will look like:
+ * Cache<String, AtomicMap<Serializable, null>>
+ *
+ * If it ever becomes important to eliminate the "keys per category" (e.g. to save space),
+ * we would have to implement the getAllCategories() + getAllKeys() sequentially.
+ *
* @author <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
* @author <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
* @author Scott.Stark at jboss.org
@@ -61,22 +69,19 @@
public class DistributedStateImpl
implements ManagedDistributedState
{
- public static final Serializable ROOT = "__DISTRIBUTED_STATE__";
- public static final Fqn ROOTFQN = Fqn.fromElements(ROOT);
private static final Logger LOG = Logger.getLogger(DistributedStateImpl.class);
+ private final ConcurrentHashMap<String, List<Object>> keyListeners = new ConcurrentHashMap<String, List<Object>>();
- private final Map<String, List<Object>> keyListeners = new HashMap<String, List<Object>>();
-
- private volatile TreeCache<Serializable, Serializable> cache;
+ private volatile Cache<Serializable, Serializable> cache;
private volatile boolean replAsync;
- private volatile HAPartitionCacheHandlerImpl cacheHandler;
+ private volatile InfinispanHAPartitionCacheHandler<Serializable, Serializable> cacheHandler;
- public HAPartitionCacheHandlerImpl getCacheHandler()
+ public InfinispanHAPartitionCacheHandler getCacheHandler()
{
return cacheHandler;
}
- public void setCacheHandler(HAPartitionCacheHandlerImpl cacheHandler)
+ public void setCacheHandler(InfinispanHAPartitionCacheHandler cacheHandler)
{
this.cacheHandler = cacheHandler;
}
@@ -90,18 +95,20 @@
public void startService() throws Exception
{
this.internalSetClusteredCache(cacheHandler.getCache());
- this.cache.getCache().addListener(this);
+ this.cache.addListener(this);
}
public void stopService() throws Exception
{
- this.cache.getCache().removeListener(this);
+ this.cache.removeListener(this);
}
public void destroyService() throws Exception
{
}
+/*
+TODO: jmx support, delete or bring mbean interface back.
public String listContent() throws Exception
{
StringBuilder result = new StringBuilder();
@@ -165,6 +172,7 @@
return result.toString();
}
+*/
// DistributedState implementation ----------------------------------------------
/*
@@ -201,10 +209,22 @@
// this.cache.put(this.buildFqn(category), key, value, Flag.FORCE_SYNCHRONOUS);
// }
//}
-
- this.cache.put(this.buildFqn(category), key, value);
+ cache.startBatch();
+ boolean commit = false;
+ try
+ {
+ cache.put(buildValueCollectionKey(category, key), value);
+ AtomicMap m = getKeysPerCategoryCollection(category);
+ m.put(key, null);
+ // cache.put(category, m); // TODO: need this?
+ commit = true;
+ }
+ finally {
+ cache.endBatch(commit);
+ }
}
+
/*
* (non-Javadoc)
*
@@ -241,7 +261,20 @@
// this.cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
// }
// }
- this.cache.remove(this.buildFqn(category), key);
+ cache.startBatch();
+ boolean commit = false;
+ try
+ {
+ cache.remove(buildValueCollectionKey(category, key));
+ AtomicMap m = getKeysPerCategoryCollection(category);
+ m.remove(key);
+ // cache.put(category, m); // TODO: need this?
+ commit = true;
+ }
+ finally {
+ cache.endBatch(commit);
+ }
+
}
return retVal;
}
@@ -257,7 +290,7 @@
{
try
{
- return this.cache.get(this.buildFqn(category), key);
+ return this.cache.get(buildValueCollectionKey(category, key));
}
catch (CacheException ce)
{
@@ -270,13 +303,21 @@
{
try
{
- Node base = this.cache.getRoot().getChild(ROOTFQN);
- Collection<String> keys = (base == null ? null : base.getChildrenNames());
+ Set keys = cache.keySet();
if (keys != null && keys.size() > 0)
{
- keys = Collections.unmodifiableCollection(keys);
+ Collection<String> categories = new LinkedList<String>();
+ for(Iterator iter = keys.iterator(); iter.hasNext();)
+ {
+ Object next = iter.next();
+ if (next instanceof String) // only the keys that are Strings, are categories
+ {
+ categories.add((String)next);
+ }
+ }
+ return categories;
}
- return keys;
+ return null;
}
catch (CacheException ce)
{
@@ -295,9 +336,8 @@
{
try
{
- Node<Serializable, Serializable> node = this.getNode(category);
- if (node == null) return null;
- return node.getKeys();
+ AtomicMap m = getKeysPerCategoryCollection(category);
+ return m.keySet(); // the keyset are the DS keys
}
catch (CacheException ce)
{
@@ -316,14 +356,18 @@
{
try
{
- Node<Serializable, Serializable> categoryNode = this.getNode(category);
- if (categoryNode == null) return null;
- Set<Serializable> childNodes = categoryNode.getKeys();
- if (childNodes == null) return null;
- Map<Serializable, Serializable> entries = categoryNode.getData();
- if (entries == null) return null;
- Collection<Serializable> retVal = new HashSet<Serializable>(entries.values());
- return Collections.unmodifiableCollection(retVal);
+ Collection<Serializable> keys = getAllKeys(category);
+ if(keys != null)
+ {
+ List<Serializable> values = new ArrayList();
+ for (Serializable key: keys)
+ {
+ values.add(cache.get(buildValueCollectionKey(category, key)));
+ }
+ return values;
+ }
+ else
+ return null;
}
catch (CacheException ce)
{
@@ -373,25 +417,24 @@
protected void registerListener(String category, Object subscriber)
{
- synchronized (this.keyListeners)
+ List<Object> listeners = this.keyListeners.get(category);
+ if (listeners == null)
{
- List<Object> listeners = this.keyListeners.get(category);
- if (listeners == null)
+ listeners = new CopyOnWriteArrayList<Object>();
+ List<Object> other = this.keyListeners.putIfAbsent(category, listeners);
+ if (other != null )
{
- listeners = new ArrayList<Object>();
- this.keyListeners.put(category, listeners);
+ listeners = other; // use the listeners added by other thread
}
- listeners.add(subscriber);
}
+ listeners.add(subscriber);
}
protected void unregisterListener(String category, Object subscriber)
{
- synchronized (this.keyListeners)
+ List<Object> listeners = this.keyListeners.get(category);
+ if (listeners != null)
{
- List<Object> listeners = this.keyListeners.get(category);
- if (listeners == null) return;
-
listeners.remove(subscriber);
if (listeners.size() == 0)
{
@@ -402,50 +445,44 @@
protected void notifyKeyListeners(String category, Serializable key, Serializable value, boolean locallyModified)
{
- synchronized (this.keyListeners)
+ List<Object> listeners = this.keyListeners.get(category);
+ if (listeners == null) return;
+ String strKey = key.toString();
+
+ for (Object listener: listeners)
{
- List<Object> listeners = this.keyListeners.get(category);
- if (listeners == null) return;
- String strKey = key.toString();
-
- for (Object listener: listeners)
+ if (listener instanceof DSListener)
{
- if (listener instanceof DSListener)
- {
- DSListener dslistener = (DSListener) listener;
- dslistener.valueHasChanged(category, strKey, value, locallyModified);
- }
- else
- {
- DSListenerEx dslistener = (DSListenerEx) listener;
- dslistener.valueHasChanged(category, key, value, locallyModified);
- }
+ DSListener dslistener = (DSListener) listener;
+ dslistener.valueHasChanged(category, strKey, value, locallyModified);
}
+ else
+ {
+ DSListenerEx dslistener = (DSListenerEx) listener;
+ dslistener.valueHasChanged(category, key, value, locallyModified);
+ }
}
}
protected void notifyKeyListenersOfRemove(String category, Serializable key, Serializable oldContent,
boolean locallyModified)
{
- synchronized (this.keyListeners)
+ List<Object> listeners = this.keyListeners.get(category);
+ if (listeners == null) return;
+ String strKey = key.toString();
+
+ for (Object listener: listeners)
{
- List<Object> listeners = this.keyListeners.get(category);
- if (listeners == null) return;
- String strKey = key.toString();
-
- for (Object listener: listeners)
+ if (listener instanceof DSListener)
{
- if (listener instanceof DSListener)
- {
- DSListener dslistener = (DSListener) listener;
- dslistener.keyHasBeenRemoved(category, strKey, oldContent, locallyModified);
- }
- else
- {
- DSListenerEx dslistener = (DSListenerEx) listener;
- dslistener.keyHasBeenRemoved(category, key, oldContent, locallyModified);
- }
+ DSListener dslistener = (DSListener) listener;
+ dslistener.keyHasBeenRemoved(category, strKey, oldContent, locallyModified);
}
+ else
+ {
+ DSListenerEx dslistener = (DSListenerEx) listener;
+ dslistener.keyHasBeenRemoved(category, key, oldContent, locallyModified);
+ }
}
}
@@ -457,28 +494,8 @@
/** ExtendedTreeCacheListener methods */
// Private -------------------------------------------------------
- protected Fqn buildFqn(String category)
- {
- return Fqn.fromRelativeElements(ROOTFQN, category);
- }
- protected Fqn buildFqn(String category, Serializable key)
- {
- return Fqn.fromElements(ROOT, category, key);
- }
-
- protected Fqn buildFqn(String category, Serializable key, Serializable value)
- {
- return Fqn.fromElements(ROOT, category, key, value);
- }
-
- protected Node<Serializable, Serializable> getNode(String category) throws CacheException
- {
- return this.cache.getRoot().getChild(this.buildFqn(category));
- }
-
// @CacheListener -------------------------------------------------
- // The following is at the (raw) Cache level and not reflective of tree operation.
@CacheEntryModified
public void nodeModified(CacheEntryModifiedEvent event)
@@ -487,8 +504,13 @@
Object key = event.getKey();
Object value = event.getValue();
- String category = null;
- notifyKeyListeners(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+ if(key instanceof SimpleImmutableEntry)
+ {
+ SimpleImmutableEntry e = (SimpleImmutableEntry)key;
+ key = e.getValue();
+ String category = (String)e.getKey();
+ notifyKeyListeners(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+ }
}
@CacheEntryRemoved
@@ -498,8 +520,13 @@
Object key = event.getKey();
Object value = event.getValue();
- String category = null;
- notifyKeyListenersOfRemove(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+ if(key instanceof SimpleImmutableEntry)
+ {
+ SimpleImmutableEntry e = (SimpleImmutableEntry)key;
+ key = e.getValue();
+ String category = (String)e.getKey();
+ notifyKeyListenersOfRemove(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+ }
}
@@ -507,8 +534,7 @@
private void internalSetClusteredCache(Cache<Serializable, Serializable> cache)
{
- TreeCacheFactory tcf = new TreeCacheFactory();
- this.cache = tcf.createTreeCache(cache);
+ this.cache = cache;
if (this.cache != null)
{
@@ -528,4 +554,15 @@
}
}
+ private SimpleImmutableEntry<String, Serializable>
+ buildValueCollectionKey(String category, Serializable dskey)
+ {
+ return new SimpleImmutableEntry(category, dskey);
+ }
+
+ private AtomicMap getKeysPerCategoryCollection(String category)
+ {
+ return AtomicMapLookup.getAtomicMap(cache, category, true);
+
+ }
}
More information about the jboss-cvs-commits
mailing list