[jboss-cvs] JBossAS SVN: r107069 - projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jul 23 16:11:09 EDT 2010


Author: smarlow at redhat.com
Date: 2010-07-23 16:11:08 -0400 (Fri, 23 Jul 2010)
New Revision: 107069

Modified:
   projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
Log:
JBCLUSTER-255

Modified: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java	2010-07-23 18:45:08 UTC (rev 107068)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/framework/server/DistributedStateImpl.java	2010-07-23 20:11:08 UTC (rev 107069)
@@ -24,33 +24,41 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.infinispan.Cache;
+import org.infinispan.atomic.AtomicMap;
+import org.infinispan.atomic.AtomicMapLookup;
 import org.infinispan.config.Configuration;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
 import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
 import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
 import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
-import org.infinispan.tree.TreeCache;
 import org.infinispan.CacheException;
-
 import org.infinispan.notifications.Listener;
-import org.infinispan.tree.Fqn;
-import org.infinispan.tree.Node;
-import org.infinispan.tree.TreeCacheFactory;
+import org.infinispan.util.SimpleImmutableEntry;
 import org.jboss.ha.framework.server.spi.ManagedDistributedState;
 import org.jboss.logging.Logger;
 
 /**
- *   This class manages distributed state across the cluster.
+ * This class manages distributed state across the cluster.
  *
+ * The Cache holds two kinds of entries.  Value entries use a composite key made of the
+ * category and the DS key, and map that key to the DS value.  They look like:
+ * Cache<SimpleImmutableEntry<String Category, Serializable DS_Key>, Serializable DS_Value>
+ *
+ * The other kind of entry records the DS keys of each category.  It looks like:
+ * Cache<String, AtomicMap<Serializable, null>>
+ *
+ * If it ever becomes important to eliminate the "keys per category" entries (e.g. to save
+ * space), getAllCategories() and getAllKeys() would have to scan the cache keys sequentially.
+ *
  * @author  <a href="mailto:sacha.labourey at cogito-info.ch">Sacha Labourey</a>.
  * @author  <a href="mailto:bill at burkecentral.com">Bill Burke</a>.
  * @author  Scott.Stark at jboss.org
@@ -61,22 +69,19 @@
 public class DistributedStateImpl 
    implements ManagedDistributedState
 {
-   public static final Serializable ROOT = "__DISTRIBUTED_STATE__";
-   public static final Fqn  ROOTFQN = Fqn.fromElements(ROOT);
    private static final Logger LOG = Logger.getLogger(DistributedStateImpl.class);
+   private final ConcurrentHashMap<String, List<Object>> keyListeners = new ConcurrentHashMap<String, List<Object>>();
 
-   private final Map<String, List<Object>> keyListeners = new HashMap<String, List<Object>>();
-
-   private volatile TreeCache<Serializable, Serializable> cache;
+   private volatile Cache<Serializable, Serializable> cache;
    private volatile boolean replAsync;
-   private volatile HAPartitionCacheHandlerImpl cacheHandler;
+   private volatile InfinispanHAPartitionCacheHandler<Serializable, Serializable> cacheHandler;
 
-   public HAPartitionCacheHandlerImpl getCacheHandler()
+   public InfinispanHAPartitionCacheHandler getCacheHandler()
    {
       return cacheHandler;
    }
 
-   public void setCacheHandler(HAPartitionCacheHandlerImpl cacheHandler)
+   public void setCacheHandler(InfinispanHAPartitionCacheHandler cacheHandler)
    {
       this.cacheHandler = cacheHandler;
    }
@@ -90,18 +95,20 @@
    public void startService() throws Exception
    {
       this.internalSetClusteredCache(cacheHandler.getCache());
-      this.cache.getCache().addListener(this);
+      this.cache.addListener(this);
    }
 
    public void stopService() throws Exception
    {      
-      this.cache.getCache().removeListener(this);
+      this.cache.removeListener(this);
    }
 
    public void destroyService() throws Exception
    {
    }
 
+/*
+TODO:  JMX support -- either delete this method or bring the MBean interface back.
    public String listContent() throws Exception
    {
       StringBuilder result = new StringBuilder();
@@ -165,6 +172,7 @@
 
       return result.toString();
    }
+*/
 
    // DistributedState implementation ----------------------------------------------
    /*
@@ -201,10 +209,22 @@
       //      this.cache.put(this.buildFqn(category), key, value, Flag.FORCE_SYNCHRONOUS);
       //   }
       //}
-
-      this.cache.put(this.buildFqn(category), key, value);
+      cache.startBatch();
+      boolean commit = false;
+      try
+      {
+         cache.put(buildValueCollectionKey(category, key), value);
+         AtomicMap m = getKeysPerCategoryCollection(category);
+         m.put(key, null);
+         // cache.put(category, m);  // TODO:  need this?
+         commit = true;
+      }
+      finally {
+         cache.endBatch(commit);
+      }
    }
 
+
    /*
      * (non-Javadoc)
      *
@@ -241,7 +261,20 @@
       //         this.cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
       //      }
       //   }
-         this.cache.remove(this.buildFqn(category), key);
+         cache.startBatch();
+         boolean commit = false;
+         try
+         {
+            cache.remove(buildValueCollectionKey(category, key));
+            AtomicMap m = getKeysPerCategoryCollection(category);
+            m.remove(key);
+            // cache.put(category, m);  // TODO:  need this?
+            commit = true;
+         }
+         finally {
+            cache.endBatch(commit);
+         }
+
       }
       return retVal;
    }
@@ -257,7 +290,7 @@
    {
       try
       {
-         return this.cache.get(this.buildFqn(category), key);
+         return this.cache.get(buildValueCollectionKey(category, key));
       }
       catch (CacheException ce)
       {
@@ -270,13 +303,21 @@
    {
       try
       {
-         Node base = this.cache.getRoot().getChild(ROOTFQN);
-         Collection<String> keys = (base == null ? null : base.getChildrenNames());
+         Set keys = cache.keySet();
          if (keys != null && keys.size() > 0)
          {
-            keys = Collections.unmodifiableCollection(keys);
+            Collection<String> categories = new LinkedList<String>();
+            for(Iterator iter = keys.iterator(); iter.hasNext();)
+            {
+               Object next = iter.next();
+               if (next instanceof String)  // only String keys are categories
+               {
+                  categories.add((String)next);
+               }
+            }
+            return categories;
          }
-         return keys;
+         return null;
       }
       catch (CacheException ce)
       {
@@ -295,9 +336,8 @@
    {
       try
       {
-         Node<Serializable, Serializable> node = this.getNode(category);
-         if (node == null) return null;
-         return node.getKeys();
+         AtomicMap m = getKeysPerCategoryCollection(category);
+         return m.keySet();  // the key set holds the DS keys
       }
       catch (CacheException ce)
       {
@@ -316,14 +356,18 @@
    {
       try
       {
-         Node<Serializable, Serializable> categoryNode = this.getNode(category);
-         if (categoryNode == null) return null;
-         Set<Serializable> childNodes = categoryNode.getKeys();
-         if (childNodes == null) return null;
-         Map<Serializable, Serializable> entries = categoryNode.getData();
-         if (entries == null) return null;
-         Collection<Serializable> retVal = new HashSet<Serializable>(entries.values());
-         return Collections.unmodifiableCollection(retVal);
+         Collection<Serializable> keys = getAllKeys(category);
+         if(keys != null)
+         {
+            List<Serializable> values = new ArrayList();
+            for (Serializable key: keys)
+            {
+               values.add(cache.get(buildValueCollectionKey(category, key)));
+            }
+            return values;
+         }
+         else
+            return null;
       }
       catch (CacheException ce)
       {
@@ -373,25 +417,24 @@
 
    protected void registerListener(String category, Object subscriber)
    {
-      synchronized (this.keyListeners)
+      List<Object> listeners = this.keyListeners.get(category);
+      if (listeners == null)
       {
-         List<Object> listeners = this.keyListeners.get(category);
-         if (listeners == null)
+         listeners = new CopyOnWriteArrayList<Object>();
+         List<Object> other = this.keyListeners.putIfAbsent(category, listeners);
+         if (other != null )
          {
-            listeners = new ArrayList<Object>();
-            this.keyListeners.put(category, listeners);
+            listeners = other;   // use the listeners added by the other thread
          }
-         listeners.add(subscriber);
       }
+      listeners.add(subscriber);
    }
 
    protected void unregisterListener(String category, Object subscriber)
    {
-      synchronized (this.keyListeners)
+      List<Object> listeners = this.keyListeners.get(category);
+      if (listeners != null)
       {
-         List<Object> listeners = this.keyListeners.get(category);
-         if (listeners == null) return;
-
          listeners.remove(subscriber);
          if (listeners.size() == 0)
          {
@@ -402,50 +445,44 @@
 
    protected void notifyKeyListeners(String category, Serializable key, Serializable value, boolean locallyModified)
    {
-      synchronized (this.keyListeners)
+      List<Object> listeners = this.keyListeners.get(category);
+      if (listeners == null) return;
+      String strKey = key.toString();
+
+      for (Object listener: listeners)
       {
-         List<Object> listeners = this.keyListeners.get(category);
-         if (listeners == null) return;
-         String strKey = key.toString();
-
-         for (Object listener: listeners)
+         if (listener instanceof DSListener)
          {
-            if (listener instanceof DSListener)
-            {
-               DSListener dslistener = (DSListener) listener;
-               dslistener.valueHasChanged(category, strKey, value, locallyModified);
-            }
-            else
-            {
-               DSListenerEx dslistener = (DSListenerEx) listener;
-               dslistener.valueHasChanged(category, key, value, locallyModified);
-            }
+            DSListener dslistener = (DSListener) listener;
+            dslistener.valueHasChanged(category, strKey, value, locallyModified);
          }
+         else
+         {
+            DSListenerEx dslistener = (DSListenerEx) listener;
+            dslistener.valueHasChanged(category, key, value, locallyModified);
+         }
       }
    }
 
    protected void notifyKeyListenersOfRemove(String category, Serializable key, Serializable oldContent,
          boolean locallyModified)
    {
-      synchronized (this.keyListeners)
+      List<Object> listeners = this.keyListeners.get(category);
+      if (listeners == null) return;
+      String strKey = key.toString();
+
+      for (Object listener: listeners)
       {
-         List<Object> listeners = this.keyListeners.get(category);
-         if (listeners == null) return;
-         String strKey = key.toString();
-
-         for (Object listener: listeners)
+         if (listener instanceof DSListener)
          {
-            if (listener instanceof DSListener)
-            {
-               DSListener dslistener = (DSListener) listener;
-               dslistener.keyHasBeenRemoved(category, strKey, oldContent, locallyModified);
-            }
-            else
-            {
-               DSListenerEx dslistener = (DSListenerEx) listener;
-               dslistener.keyHasBeenRemoved(category, key, oldContent, locallyModified);
-            }
+            DSListener dslistener = (DSListener) listener;
+            dslistener.keyHasBeenRemoved(category, strKey, oldContent, locallyModified);
          }
+         else
+         {
+            DSListenerEx dslistener = (DSListenerEx) listener;
+            dslistener.keyHasBeenRemoved(category, key, oldContent, locallyModified);
+         }
       }
    }
 
@@ -457,28 +494,8 @@
    /** ExtendedTreeCacheListener methods */
 
    // Private -------------------------------------------------------
-   protected Fqn buildFqn(String category)
-   {
-      return Fqn.fromRelativeElements(ROOTFQN, category);
-   }
 
-   protected Fqn buildFqn(String category, Serializable key)
-   {
-      return Fqn.fromElements(ROOT, category, key);
-   }
-
-   protected Fqn buildFqn(String category, Serializable key, Serializable value)
-   {
-      return Fqn.fromElements(ROOT, category, key, value);
-   }
-
-   protected Node<Serializable, Serializable> getNode(String category) throws CacheException
-   {
-      return this.cache.getRoot().getChild(this.buildFqn(category));
-   }
-
    // @CacheListener  -------------------------------------------------
-   // The following is at the (raw) Cache level and not reflective of tree operation.
 
    @CacheEntryModified
    public void nodeModified(CacheEntryModifiedEvent event)
@@ -487,8 +504,13 @@
 
       Object key = event.getKey();
       Object value = event.getValue();
-      String category = null;
-      notifyKeyListeners(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+      if(key instanceof SimpleImmutableEntry)
+      {
+         SimpleImmutableEntry e = (SimpleImmutableEntry)key;
+         key = e.getValue();
+         String category = (String)e.getKey();
+         notifyKeyListeners(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+      }
    }
 
    @CacheEntryRemoved
@@ -498,8 +520,13 @@
 
       Object key = event.getKey();
       Object value = event.getValue();
-      String category = null;
-      notifyKeyListenersOfRemove(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+      if(key instanceof SimpleImmutableEntry)
+      {
+         SimpleImmutableEntry e = (SimpleImmutableEntry)key;
+         key = e.getValue();
+         String category = (String)e.getKey();
+         notifyKeyListenersOfRemove(category, (Serializable)key, (Serializable)value, event.isOriginLocal());
+      }
    }
 
    
@@ -507,8 +534,7 @@
    
    private void internalSetClusteredCache(Cache<Serializable, Serializable> cache)
    {
-      TreeCacheFactory tcf = new TreeCacheFactory();
-      this.cache = tcf.createTreeCache(cache);
+      this.cache = cache;
 
       if (this.cache != null)
       {        
@@ -528,4 +554,15 @@
       }
    }
 
+   private SimpleImmutableEntry<String, Serializable>
+   buildValueCollectionKey(String category, Serializable dskey)
+   {
+      return new SimpleImmutableEntry(category, dskey);
+   }
+
+   private AtomicMap getKeysPerCategoryCollection(String category)
+   {
+      return AtomicMapLookup.getAtomicMap(cache, category, true);
+
+   }
 }



More information about the jboss-cvs-commits mailing list