[jboss-cvs] JBossAS SVN: r77616 - trunk/cluster/src/main/org/jboss/ha/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 28 14:29:29 EDT 2008


Author: pferraro
Date: 2008-08-28 14:29:28 -0400 (Thu, 28 Aug 2008)
New Revision: 77616

Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java
Log:
[JBAS-5435] Ensure DistributedState can handle concurrent JGroups requests
Dropped obsolete HAPartition property.
Code cleanup.
Cache listener methods are already thread-safe.

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java	2008-08-28 17:21:08 UTC (rev 77615)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java	2008-08-28 18:29:28 UTC (rev 77616)
@@ -21,7 +21,6 @@
   */
 package org.jboss.ha.framework.server;
 
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,6 +28,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -41,7 +41,6 @@
 import org.jboss.cache.notifications.annotation.NodeModified;
 import org.jboss.cache.notifications.event.NodeModifiedEvent;
 import org.jboss.cache.notifications.event.NodeModifiedEvent.ModificationType;
-import org.jboss.ha.framework.interfaces.HAPartition;
 import org.jboss.logging.Logger;
 
 /**
@@ -53,132 +52,116 @@
  * @version $Revision$
  */
 @CacheListener
-public class DistributedStateImpl
-   implements DistributedStateImplMBean
+public class DistributedStateImpl implements DistributedStateImplMBean
 {
    // Constants -----------------------------------------------------
 
-   protected final static String SERVICE_NAME = "DistributedState";
+   public static final String ROOT = "__DISTRIBUTED_STATE__";
+   public static final Fqn ROOTFQN = Fqn.fromElements(ROOT);
+   public static final int ROOTFQNSIZE = ROOTFQN.size();
+   
+   protected static final String SERVICE_NAME = "DistributedState";
 
-   protected final static Class[] set_types=new Class[]{String.class, Serializable.class, Serializable.class};
-   protected final static Class[] remove_types=new Class[]{String.class, Serializable.class};
-
    // Attributes ----------------------------------------------------
 
-   protected HashMap keyListeners = new HashMap ();
-   protected HAPartition partition;
-   protected Logger log;
+   protected Map<String, List<Object>> keyListeners = new HashMap<String, List<Object>>();
+   protected Logger log = Logger.getLogger(this.getClass());
    protected String name = null;
    protected Cache cache;
    protected boolean replAsync;
 
-   public static final String ROOT = "__DISTRIBUTED_STATE__";
-
-   public static final Fqn ROOTFQN = new Fqn(new Object[] { ROOT });
-
-   public static final int ROOTFQNSIZE = ROOTFQN.size();
-
-   // Static --------------------------------------------------------c
-
-   // Constructors --------------------------------------------------
-
-   public DistributedStateImpl () 
-   {
-      super();
-      this.log = Logger.getLogger (this.getClass ());
-   }
-
    // Public --------------------------------------------------------
 
-   protected void createService() throws Exception
+   protected void createService()
    {
    }
 
-   public void startService() throws Exception
+   public void startService()
    {
-      if (cache == null)
+      if (this.cache == null)
+      {
          throw new IllegalStateException("No clustered cache available");
+      }
 
       this.cache.addCacheListener(this);
    }
 
-   public void stopService() throws Exception
+   public void stopService()
    {
-      cache.removeCacheListener(this); 
+      this.cache.removeCacheListener(this);
    }
-   
+
    public void destroyService() throws Exception
-   {     
+   {
    }
 
-   public String listContent () throws Exception
+   public String listContent() throws Exception
    {
-      StringBuffer result = new StringBuffer ();
-      Collection cats = this.getAllCategories ();
-      if (cats == null)
-         return result.toString();
-      
-      Iterator catsIter = cats.iterator ();
-      while (catsIter.hasNext ())
+      StringBuilder result = new StringBuilder();
+      Collection cats = this.getAllCategories();
+      if (cats == null) return result.toString();
+
+      Iterator catsIter = cats.iterator();
+      while (catsIter.hasNext())
       {
-         String category = (String)catsIter.next ();
-         Iterator keysIter = this.getAllKeys(category).iterator ();
+         String category = (String) catsIter.next();
+         Iterator keysIter = this.getAllKeys(category).iterator();
 
-         result.append ("-----------------------------------------------\n");
-         result.append ("Logger : ").append (category).append ("\n\n");
-         result.append ("KEY\t:\tVALUE\n");
+         result.append("-----------------------------------------------\n");
+         result.append("Logger : ").append(category).append("\n\n");
+         result.append("KEY\t:\tVALUE\n");
 
-         while (keysIter.hasNext ())
+         while (keysIter.hasNext())
          {
-            Serializable key = (Serializable) keysIter.next ();
-            String value = this.get (category, key).toString ();
-            result.append ("'").append(key);
-            result.append ("'\t:\t'");
-            result.append (value);
+            Serializable key = (Serializable) keysIter.next();
+            String value = this.get(category, key).toString();
+            result.append("'").append(key);
+            result.append("'\t:\t'");
+            result.append(value);
             result.append("'\n");
          }
-         result.append ("\n");
+         result.append("\n");
       }
-      return result.toString ();
+      return result.toString();
    }
 
-   public String listXmlContent () throws Exception
+   public String listXmlContent() throws Exception
    {
-      StringBuffer result = new StringBuffer ();
-      result.append ("<DistributedState>\n");      
-      Collection cats = this.getAllCategories ();
+      StringBuilder result = new StringBuilder();
+      result.append("<DistributedState>\n");
+      Collection cats = this.getAllCategories();
       if (cats != null)
       {
-         Iterator catsIter = cats.iterator ();
-         while (catsIter.hasNext ())
+         Iterator catsIter = cats.iterator();
+         while (catsIter.hasNext())
          {
-            String category = (String)catsIter.next ();
-            Iterator keysIter = this.getAllKeys(category).iterator ();
+            String category = (String) catsIter.next();
+            Iterator keysIter = this.getAllKeys(category).iterator();
 
-            result.append ("\t<Logger>\n");
-            result.append ("\t\t<LoggerName>").append (category).append ("</LoggerName>\n");
+            result.append("\t<Logger>\n");
+            result.append("\t\t<LoggerName>").append(category).append("</LoggerName>\n");
 
-            while (keysIter.hasNext ())
+            while (keysIter.hasNext())
             {
-               Serializable key = (Serializable) keysIter.next ();
-               String value = this.get (category, key).toString ();
-               result.append ("\t\t<Entry>\n");
-               result.append ("\t\t\t<Key>").append (key).append ("</Key>\n");
-               result.append ("\t\t\t<Value>").append (value).append ("</Value>\n");
-               result.append ("\t\t</Entry>\n");
+               Serializable key = (Serializable) keysIter.next();
+               String value = this.get(category, key).toString();
+               result.append("\t\t<Entry>\n");
+               result.append("\t\t\t<Key>").append(key).append("</Key>\n");
+               result.append("\t\t\t<Value>").append(value).append("</Value>\n");
+               result.append("\t\t</Entry>\n");
             }
-            result.append ("\t</Logger>\n");
+            result.append("\t</Logger>\n");
          }
       }
 
-      result.append ("</DistributedState>\n");
+      result.append("</DistributedState>\n");
 
-      return result.toString ();
+      return result.toString();
    }
 
    public Cache getClusteredCache()
-   {     
-      return cache;
+   {
+      return this.cache;
    }
 
    /**
@@ -188,33 +171,38 @@
     * 
     * @throws IllegalStateException if the cache isn't configured for replication
     */
-   public void setClusteredCache(Cache cache)
+   public void setClusteredCache(Cache<Serializable, Serializable> cache)
    {
       this.cache = cache;
-      if (this.cache != null) 
+      if (this.cache != null)
       {
       	CacheMode cm = cache.getConfiguration().getCacheMode();
       	if (CacheMode.REPL_ASYNC == cm)
+      	{
       	   this.replAsync = true;
+      	}
       	else if (CacheMode.REPL_SYNC == cm)
-           this.replAsync = false;
+      	{
+      	   this.replAsync = false;
+      	}
       	else
+      	{
       	   throw new IllegalStateException("Cache must be configured for replication, not " + cm);
+      	}
       }
    }
 
    // DistributedState implementation ----------------------------------------------
 
    /*
-	* (non-Javadoc)
-	*
-	* @see org.jboss.ha.framework.interfaces.DistributedState#set(java.lang.String,
-	*      java.io.Serializable, java.io.Serializable)
-	*/
-   public void set(String category, Serializable key, Serializable value)
-         throws Exception
+   * (non-Javadoc)
+   *
+   * @see org.jboss.ha.framework.interfaces.DistributedState#set(java.lang.String,
+   *      java.io.Serializable, java.io.Serializable)
+   */
+   public void set(String category, Serializable key, Serializable value) throws Exception
    {
-      set(category, key, value, true);
+      this.set(category, key, value, true);
    }
 
    /*
@@ -225,17 +213,20 @@
     *      asynchronousCall is not supported yet. TreeCache cannot switch this
     *      on the fly. Will take value from TreeCache-config instead.
     */
-   public void set(String category, Serializable key, Serializable value,
-         boolean asynchronousCall) throws Exception
+   public void set(String category, Serializable key, Serializable value, boolean asynchronousCall) throws Exception
    {
-      if (replAsync != asynchronousCall)
+      if (this.replAsync != asynchronousCall)
       {
          if (asynchronousCall)
-            cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+         {
+            this.cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+         }
          else
-            cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
+         {
+            this.cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
+         }
       }
-      cache.put(buildFqn(category), key, value);      
+      this.cache.put(this.buildFqn(category), key, value);
    }
 
    /*
@@ -245,10 +236,9 @@
      *      java.io.Serializable) @return - returns null in case of
      *      CacheException
      */
-   public Serializable remove(String category, Serializable key)
-         throws Exception
+   public Serializable remove(String category, Serializable key) throws Exception
    {
-      return remove(category, key, true);
+      return this.remove(category, key, true);
    }
 
    /*
@@ -257,23 +247,26 @@
     * @see org.jboss.ha.framework.interfaces.DistributedState#remove(java.lang.String,
     *      java.io.Serializable, boolean)
     */
-   public Serializable remove(String category, Serializable key,
-         boolean asynchronousCall) throws Exception
+   public Serializable remove(String category, Serializable key, boolean asynchronousCall) throws Exception
    {
-      Serializable retVal = get(category, key);
-      if(retVal != null)
+      Serializable retVal = this.get(category, key);
+      if (retVal != null)
       {
-         if (replAsync != asynchronousCall)
+         if (this.replAsync != asynchronousCall)
          {
             if (asynchronousCall)
-               cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+            {
+               this.cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+            }
             else
-               cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
+            {
+               this.cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
+            }
          }
-         cache.remove(buildFqn(category), key);
+         this.cache.remove(this.buildFqn(category), key);
       }
-	  return retVal;
-	}
+      return retVal;
+   }
 
    /*
      * (non-Javadoc)
@@ -285,26 +278,27 @@
    {
       try
       {
-         return (Serializable) cache.get(buildFqn(category), key);
-      } catch (CacheException ce)
+         return (Serializable) this.cache.get(this.buildFqn(category), key);
+      }
+      catch (CacheException ce)
       {
          return null;
-	  }
+      }
    }
 
-   public Collection getAllCategories ()
+   public Collection getAllCategories()
    {
       try
       {
-         Node base = cache.getRoot().getChild(ROOTFQN);
+         Node base = this.cache.getRoot().getChild(ROOTFQN);
          Collection keys = (base == null ? null : base.getChildrenNames());
-         if(keys != null && keys.size() > 0)
+         if (keys != null && keys.size() > 0)
          {
             keys = Collections.unmodifiableCollection(keys);
          }
          return keys;
       }
-      catch(CacheException ce)
+      catch (CacheException ce)
       {
          return null;
       }
@@ -320,11 +314,11 @@
    {
       try
       {
-         Node node = getNode(category);
-         if (node==null)
-            return null;
+         Node<Serializable, Serializable> node = this.getNode(category);
+         if (node == null) return null;
          return node.getKeys();
-      } catch (CacheException ce)
+      }
+      catch (CacheException ce)
       {
          return null;
       }
@@ -340,193 +334,185 @@
    {
       try
       {
-         Node categoryNode = getNode(category);
-         if (categoryNode == null) {
-            return null;
-         }
+         Node categoryNode = this.getNode(category);
+         if (categoryNode == null) return null;
          Set childNodes = categoryNode.getKeys();
-         if (childNodes == null)
-         {
-            return null;
-         }
+         if (childNodes == null) return null;
          Map entries = categoryNode.getData();
-         if (entries == null)
-            return null;
+         if (entries == null) return null;
          Collection retVal = new HashSet(entries.values());
          return Collections.unmodifiableCollection(retVal);
-      } catch (CacheException ce)
+      }
+      catch (CacheException ce)
       {
          return null;
       }
    }
 
-
-   public void registerDSListenerEx (String category, DSListenerEx subscriber)
+   public void registerDSListenerEx(String category, DSListenerEx subscriber)
    {
-      registerListener(category, subscriber);
+      this.registerListener(category, subscriber);
    }
-   public void unregisterDSListenerEx (String category, DSListenerEx subscriber)
+
+   public void unregisterDSListenerEx(String category, DSListenerEx subscriber)
    {
-      unregisterListener(category, subscriber);
+      this.unregisterListener(category, subscriber);
    }
-   public void registerDSListener (String category, DSListener subscriber)
+
+   public void registerDSListener(String category, DSListener subscriber)
    {
-      registerListener(category, subscriber);
+      this.registerListener(category, subscriber);
    }
-   public void unregisterDSListener (String category, DSListener subscriber)
+
+   public void unregisterDSListener(String category, DSListener subscriber)
    {
-      unregisterListener(category, subscriber);
+      this.unregisterListener(category, subscriber);
    }
 
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
-   protected void registerListener (String category, Object subscriber)
+   protected void registerListener(String category, Object subscriber)
    {
-      synchronized(this.keyListeners)
+      synchronized (this.keyListeners)
       {
-         ArrayList listeners = (ArrayList)keyListeners.get (category);
+         List<Object> listeners = this.keyListeners.get(category);
          if (listeners == null)
          {
-            listeners = new ArrayList ();
-            keyListeners.put (category, listeners);
+            listeners = new ArrayList<Object>();
+            this.keyListeners.put(category, listeners);
          }
-         listeners.add (subscriber);
+         listeners.add(subscriber);
       }
    }
 
-   protected void unregisterListener (String category, Object subscriber)
+   protected void unregisterListener(String category, Object subscriber)
    {
-      synchronized(this.keyListeners)
+      synchronized (this.keyListeners)
       {
-         ArrayList listeners = (ArrayList)keyListeners.get (category);
+         List<Object> listeners = this.keyListeners.get(category);
          if (listeners == null) return;
 
-         listeners.remove (subscriber);
-         if (listeners.size () == 0)
+         listeners.remove(subscriber);
+         if (listeners.size() == 0)
          {
-            keyListeners.remove (category);
+            this.keyListeners.remove(category);
          }
       }
    }
 
-   protected void notifyKeyListeners (String category, Serializable key,
-                                      Serializable value, boolean locallyModified)
+   protected void notifyKeyListeners(String category, Serializable key, Serializable value, boolean locallyModified)
    {
-      synchronized(this.keyListeners)
+      synchronized (this.keyListeners)
       {
-         ArrayList listeners = (ArrayList)keyListeners.get (category);
-         if (listeners == null)
-            return;
+         List<Object> listeners = this.keyListeners.get(category);
+         if (listeners == null) return;
          String strKey = key.toString();
 
-         for (int i = 0; i < listeners.size (); i++)
+         for (Object listener: listeners)
          {
-            Object listener = listeners.get (i);
-            if( listener instanceof DSListener )
+            if (listener instanceof DSListener)
             {
                DSListener dslistener = (DSListener) listener;
-               dslistener.valueHasChanged (category, strKey, value, locallyModified);
+               dslistener.valueHasChanged(category, strKey, value, locallyModified);
             }
             else
             {
                DSListenerEx dslistener = (DSListenerEx) listener;
-               dslistener.valueHasChanged (category, key, value, locallyModified);
+               dslistener.valueHasChanged(category, key, value, locallyModified);
             }
          }
       }
    }
 
-   protected void notifyKeyListenersOfRemove (String category, Serializable key,
-                                              Serializable oldContent, boolean locallyModified)
+   protected void notifyKeyListenersOfRemove(String category, Serializable key, Serializable oldContent,
+         boolean locallyModified)
    {
-      synchronized(this.keyListeners)
+      synchronized (this.keyListeners)
       {
-         ArrayList listeners = (ArrayList)keyListeners.get (category);
-         if (listeners == null)
-            return;
+         List<Object> listeners = this.keyListeners.get(category);
+         if (listeners == null) return;
          String strKey = key.toString();
 
-         for (int i = 0; i < listeners.size (); i++)
+         for (Object listener: listeners)
          {
-            Object listener = listeners.get (i);
-            if( listener instanceof DSListener )
+            if (listener instanceof DSListener)
             {
                DSListener dslistener = (DSListener) listener;
-               dslistener.keyHasBeenRemoved (category, strKey, oldContent, locallyModified);
+               dslistener.keyHasBeenRemoved(category, strKey, oldContent, locallyModified);
             }
             else
             {
                DSListenerEx dslistener = (DSListenerEx) listener;
-               dslistener.keyHasBeenRemoved (category, key, oldContent, locallyModified);
+               dslistener.keyHasBeenRemoved(category, key, oldContent, locallyModified);
             }
          }
       }
    }
 
-   protected void cleanupKeyListeners ()
+   protected void cleanupKeyListeners()
    {
       // NOT IMPLEMENTED YET
    }
 
    /** ExtendedTreeCacheListener methods */
 
-
    // Private -------------------------------------------------------
    protected Fqn buildFqn(String category)
    {
-      return new Fqn(ROOTFQN, category);
+      return Fqn.fromRelativeElements(ROOTFQN, category);
    }
 
    protected Fqn buildFqn(String category, Serializable key)
    {
-      return new Fqn(new Object[] { ROOT, category, key });
+      return Fqn.fromElements(ROOT, category, key);
    }
 
    protected Fqn buildFqn(String category, Serializable key, Serializable value)
    {
-      return new Fqn(new Object[] { ROOT, category, key, value });
+      return Fqn.fromElements(ROOT, category, key, value);
    }
 
    protected Node getNode(String category) throws CacheException
    {
-      return cache.getRoot().getChild(buildFqn(category));
+      return this.cache.getRoot().getChild(this.buildFqn(category));
    }
 
    // @CacheListener  -------------------------------------------------
-   
+
    @NodeModified
    public void nodeModified(NodeModifiedEvent event)
    {
-      if (event.isPre())
-         return;
-      
+      if (event.isPre()) return;
+
       // we're only interested in put and remove data operations
       ModificationType modType = event.getModificationType();
-      if (!modType.equals(ModificationType.PUT_DATA) && !modType.equals(ModificationType.REMOVE_DATA))
-         return;
-      
+      if (!modType.equals(ModificationType.PUT_DATA) && !modType.equals(ModificationType.REMOVE_DATA)) return;
+
       // ignore changes for other roots in a shared cache
       Fqn fqn = event.getFqn();
-      if (!fqn.isChildOf(ROOTFQN))
-         return;
-      
+      if (!fqn.isChildOf(ROOTFQN)) return;
+
       Serializable key = null;
       Serializable value = null;
-      
+
       // there should be exactly one key/value pair in the map
       Map data = event.getData();
       if (data != null && !data.isEmpty())
       {
-         key = (Serializable)data.keySet().iterator().next();
-         value = (Serializable)data.get(key);
-      }        
-      
+         key = (Serializable) data.keySet().iterator().next();
+         value = (Serializable) data.get(key);
+      }
+
       if (modType.equals(ModificationType.PUT_DATA))
-            DistributedStateImpl.this.notifyKeyListeners((String)fqn.get(ROOTFQNSIZE), key, value, event.isOriginLocal());
+      {
+         DistributedStateImpl.this.notifyKeyListeners((String) fqn.get(ROOTFQNSIZE), key, value, event.isOriginLocal());
+      }
       else
-         DistributedStateImpl.this.notifyKeyListenersOfRemove((String)fqn.get(ROOTFQNSIZE), key, value, event.isOriginLocal());
+      {
+         DistributedStateImpl.this.notifyKeyListenersOfRemove((String) fqn.get(ROOTFQNSIZE), key, value, event.isOriginLocal());
+      }
    }
 
 }




More information about the jboss-cvs-commits mailing list