[jboss-cvs] JBossAS SVN: r77616 - trunk/cluster/src/main/org/jboss/ha/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 28 14:29:29 EDT 2008
Author: pferraro
Date: 2008-08-28 14:29:28 -0400 (Thu, 28 Aug 2008)
New Revision: 77616
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java
Log:
[JBAS-5435] Ensure DistributedState can handle concurrent JGroups requests
Dropped obsolete HAPartition property.
Code cleanup.
Cache listener methods are already thread-safe.
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java 2008-08-28 17:21:08 UTC (rev 77615)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/DistributedStateImpl.java 2008-08-28 18:29:28 UTC (rev 77616)
@@ -21,7 +21,6 @@
*/
package org.jboss.ha.framework.server;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -29,6 +28,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -41,7 +41,6 @@
import org.jboss.cache.notifications.annotation.NodeModified;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
import org.jboss.cache.notifications.event.NodeModifiedEvent.ModificationType;
-import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.logging.Logger;
/**
@@ -53,132 +52,116 @@
* @version $Revision$
*/
@CacheListener
-public class DistributedStateImpl
- implements DistributedStateImplMBean
+public class DistributedStateImpl implements DistributedStateImplMBean
{
// Constants -----------------------------------------------------
- protected final static String SERVICE_NAME = "DistributedState";
+ public static final String ROOT = "__DISTRIBUTED_STATE__";
+ public static final Fqn ROOTFQN = Fqn.fromElements(ROOT);
+ public static final int ROOTFQNSIZE = ROOTFQN.size();
+
+ protected static final String SERVICE_NAME = "DistributedState";
- protected final static Class[] set_types=new Class[]{String.class, Serializable.class, Serializable.class};
- protected final static Class[] remove_types=new Class[]{String.class, Serializable.class};
-
// Attributes ----------------------------------------------------
- protected HashMap keyListeners = new HashMap ();
- protected HAPartition partition;
- protected Logger log;
+ protected Map<String, List<Object>> keyListeners = new HashMap<String, List<Object>>();
+ protected Logger log = Logger.getLogger(this.getClass());
protected String name = null;
protected Cache cache;
protected boolean replAsync;
- public static final String ROOT = "__DISTRIBUTED_STATE__";
-
- public static final Fqn ROOTFQN = new Fqn(new Object[] { ROOT });
-
- public static final int ROOTFQNSIZE = ROOTFQN.size();
-
- // Static --------------------------------------------------------c
-
- // Constructors --------------------------------------------------
-
- public DistributedStateImpl ()
- {
- super();
- this.log = Logger.getLogger (this.getClass ());
- }
-
// Public --------------------------------------------------------
- protected void createService() throws Exception
+ protected void createService()
{
}
- public void startService() throws Exception
+ public void startService()
{
- if (cache == null)
+ if (this.cache == null)
+ {
throw new IllegalStateException("No clustered cache available");
+ }
this.cache.addCacheListener(this);
}
- public void stopService() throws Exception
+ public void stopService()
{
- cache.removeCacheListener(this);
+ this.cache.removeCacheListener(this);
}
-
+
public void destroyService() throws Exception
- {
+ {
}
- public String listContent () throws Exception
+ public String listContent() throws Exception
{
- StringBuffer result = new StringBuffer ();
- Collection cats = this.getAllCategories ();
- if (cats == null)
- return result.toString();
-
- Iterator catsIter = cats.iterator ();
- while (catsIter.hasNext ())
+ StringBuilder result = new StringBuilder();
+ Collection cats = this.getAllCategories();
+ if (cats == null) return result.toString();
+
+ Iterator catsIter = cats.iterator();
+ while (catsIter.hasNext())
{
- String category = (String)catsIter.next ();
- Iterator keysIter = this.getAllKeys(category).iterator ();
+ String category = (String) catsIter.next();
+ Iterator keysIter = this.getAllKeys(category).iterator();
- result.append ("-----------------------------------------------\n");
- result.append ("Logger : ").append (category).append ("\n\n");
- result.append ("KEY\t:\tVALUE\n");
+ result.append("-----------------------------------------------\n");
+ result.append("Logger : ").append(category).append("\n\n");
+ result.append("KEY\t:\tVALUE\n");
- while (keysIter.hasNext ())
+ while (keysIter.hasNext())
{
- Serializable key = (Serializable) keysIter.next ();
- String value = this.get (category, key).toString ();
- result.append ("'").append(key);
- result.append ("'\t:\t'");
- result.append (value);
+ Serializable key = (Serializable) keysIter.next();
+ String value = this.get(category, key).toString();
+ result.append("'").append(key);
+ result.append("'\t:\t'");
+ result.append(value);
result.append("'\n");
}
- result.append ("\n");
+ result.append("\n");
}
- return result.toString ();
+ return result.toString();
}
- public String listXmlContent () throws Exception
+ public String listXmlContent() throws Exception
{
- StringBuffer result = new StringBuffer ();
- result.append ("<DistributedState>\n");
- Collection cats = this.getAllCategories ();
+ StringBuilder result = new StringBuilder();
+ result.append("<DistributedState>\n");
+ Collection cats = this.getAllCategories();
if (cats != null)
{
- Iterator catsIter = cats.iterator ();
- while (catsIter.hasNext ())
+ Iterator catsIter = cats.iterator();
+ while (catsIter.hasNext())
{
- String category = (String)catsIter.next ();
- Iterator keysIter = this.getAllKeys(category).iterator ();
+ String category = (String) catsIter.next();
+ Iterator keysIter = this.getAllKeys(category).iterator();
- result.append ("\t<Logger>\n");
- result.append ("\t\t<LoggerName>").append (category).append ("</LoggerName>\n");
+ result.append("\t<Logger>\n");
+ result.append("\t\t<LoggerName>").append(category).append("</LoggerName>\n");
- while (keysIter.hasNext ())
+ while (keysIter.hasNext())
{
- Serializable key = (Serializable) keysIter.next ();
- String value = this.get (category, key).toString ();
- result.append ("\t\t<Entry>\n");
- result.append ("\t\t\t<Key>").append (key).append ("</Key>\n");
- result.append ("\t\t\t<Value>").append (value).append ("</Value>\n");
- result.append ("\t\t</Entry>\n");
+ Serializable key = (Serializable) keysIter.next();
+ String value = this.get(category, key).toString();
+ result.append("\t\t<Entry>\n");
+ result.append("\t\t\t<Key>").append(key).append("</Key>\n");
+ result.append("\t\t\t<Value>").append(value).append("</Value>\n");
+ result.append("\t\t</Entry>\n");
}
- result.append ("\t</Logger>\n");
+ result.append("\t</Logger>\n");
}
}
- result.append ("</DistributedState>\n");
+ result.append("</DistributedState>\n");
- return result.toString ();
+ return result.toString();
}
public Cache getClusteredCache()
- {
- return cache;
+ {
+ return this.cache;
}
/**
@@ -188,33 +171,38 @@
*
* @throws IllegalStateException if the cache isn't configured for replication
*/
- public void setClusteredCache(Cache cache)
+ public void setClusteredCache(Cache<Serializable, Serializable> cache)
{
this.cache = cache;
- if (this.cache != null)
+ if (this.cache != null)
{
CacheMode cm = cache.getConfiguration().getCacheMode();
if (CacheMode.REPL_ASYNC == cm)
+ {
this.replAsync = true;
+ }
else if (CacheMode.REPL_SYNC == cm)
- this.replAsync = false;
+ {
+ this.replAsync = false;
+ }
else
+ {
throw new IllegalStateException("Cache must be configured for replication, not " + cm);
+ }
}
}
// DistributedState implementation ----------------------------------------------
/*
- * (non-Javadoc)
- *
- * @see org.jboss.ha.framework.interfaces.DistributedState#set(java.lang.String,
- * java.io.Serializable, java.io.Serializable)
- */
- public void set(String category, Serializable key, Serializable value)
- throws Exception
+ * (non-Javadoc)
+ *
+ * @see org.jboss.ha.framework.interfaces.DistributedState#set(java.lang.String,
+ * java.io.Serializable, java.io.Serializable)
+ */
+ public void set(String category, Serializable key, Serializable value) throws Exception
{
- set(category, key, value, true);
+ this.set(category, key, value, true);
}
/*
@@ -225,17 +213,20 @@
* asynchronousCall is not supported yet. TreeCache cannot switch this
* on the fly. Will take value from TreeCache-config instead.
*/
- public void set(String category, Serializable key, Serializable value,
- boolean asynchronousCall) throws Exception
+ public void set(String category, Serializable key, Serializable value, boolean asynchronousCall) throws Exception
{
- if (replAsync != asynchronousCall)
+ if (this.replAsync != asynchronousCall)
{
if (asynchronousCall)
- cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+ {
+ this.cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+ }
else
- cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
+ {
+ this.cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
+ }
}
- cache.put(buildFqn(category), key, value);
+ this.cache.put(this.buildFqn(category), key, value);
}
/*
@@ -245,10 +236,9 @@
* java.io.Serializable) @return - returns null in case of
* CacheException
*/
- public Serializable remove(String category, Serializable key)
- throws Exception
+ public Serializable remove(String category, Serializable key) throws Exception
{
- return remove(category, key, true);
+ return this.remove(category, key, true);
}
/*
@@ -257,23 +247,26 @@
* @see org.jboss.ha.framework.interfaces.DistributedState#remove(java.lang.String,
* java.io.Serializable, boolean)
*/
- public Serializable remove(String category, Serializable key,
- boolean asynchronousCall) throws Exception
+ public Serializable remove(String category, Serializable key, boolean asynchronousCall) throws Exception
{
- Serializable retVal = get(category, key);
- if(retVal != null)
+ Serializable retVal = this.get(category, key);
+ if (retVal != null)
{
- if (replAsync != asynchronousCall)
+ if (this.replAsync != asynchronousCall)
{
if (asynchronousCall)
- cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+ {
+ this.cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+ }
else
- cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
+ {
+ this.cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
+ }
}
- cache.remove(buildFqn(category), key);
+ this.cache.remove(this.buildFqn(category), key);
}
- return retVal;
- }
+ return retVal;
+ }
/*
* (non-Javadoc)
@@ -285,26 +278,27 @@
{
try
{
- return (Serializable) cache.get(buildFqn(category), key);
- } catch (CacheException ce)
+ return (Serializable) this.cache.get(this.buildFqn(category), key);
+ }
+ catch (CacheException ce)
{
return null;
- }
+ }
}
- public Collection getAllCategories ()
+ public Collection getAllCategories()
{
try
{
- Node base = cache.getRoot().getChild(ROOTFQN);
+ Node base = this.cache.getRoot().getChild(ROOTFQN);
Collection keys = (base == null ? null : base.getChildrenNames());
- if(keys != null && keys.size() > 0)
+ if (keys != null && keys.size() > 0)
{
keys = Collections.unmodifiableCollection(keys);
}
return keys;
}
- catch(CacheException ce)
+ catch (CacheException ce)
{
return null;
}
@@ -320,11 +314,11 @@
{
try
{
- Node node = getNode(category);
- if (node==null)
- return null;
+ Node<Serializable, Serializable> node = this.getNode(category);
+ if (node == null) return null;
return node.getKeys();
- } catch (CacheException ce)
+ }
+ catch (CacheException ce)
{
return null;
}
@@ -340,193 +334,185 @@
{
try
{
- Node categoryNode = getNode(category);
- if (categoryNode == null) {
- return null;
- }
+ Node categoryNode = this.getNode(category);
+ if (categoryNode == null) return null;
Set childNodes = categoryNode.getKeys();
- if (childNodes == null)
- {
- return null;
- }
+ if (childNodes == null) return null;
Map entries = categoryNode.getData();
- if (entries == null)
- return null;
+ if (entries == null) return null;
Collection retVal = new HashSet(entries.values());
return Collections.unmodifiableCollection(retVal);
- } catch (CacheException ce)
+ }
+ catch (CacheException ce)
{
return null;
}
}
-
- public void registerDSListenerEx (String category, DSListenerEx subscriber)
+ public void registerDSListenerEx(String category, DSListenerEx subscriber)
{
- registerListener(category, subscriber);
+ this.registerListener(category, subscriber);
}
- public void unregisterDSListenerEx (String category, DSListenerEx subscriber)
+
+ public void unregisterDSListenerEx(String category, DSListenerEx subscriber)
{
- unregisterListener(category, subscriber);
+ this.unregisterListener(category, subscriber);
}
- public void registerDSListener (String category, DSListener subscriber)
+
+ public void registerDSListener(String category, DSListener subscriber)
{
- registerListener(category, subscriber);
+ this.registerListener(category, subscriber);
}
- public void unregisterDSListener (String category, DSListener subscriber)
+
+ public void unregisterDSListener(String category, DSListener subscriber)
{
- unregisterListener(category, subscriber);
+ this.unregisterListener(category, subscriber);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
- protected void registerListener (String category, Object subscriber)
+ protected void registerListener(String category, Object subscriber)
{
- synchronized(this.keyListeners)
+ synchronized (this.keyListeners)
{
- ArrayList listeners = (ArrayList)keyListeners.get (category);
+ List<Object> listeners = this.keyListeners.get(category);
if (listeners == null)
{
- listeners = new ArrayList ();
- keyListeners.put (category, listeners);
+ listeners = new ArrayList<Object>();
+ this.keyListeners.put(category, listeners);
}
- listeners.add (subscriber);
+ listeners.add(subscriber);
}
}
- protected void unregisterListener (String category, Object subscriber)
+ protected void unregisterListener(String category, Object subscriber)
{
- synchronized(this.keyListeners)
+ synchronized (this.keyListeners)
{
- ArrayList listeners = (ArrayList)keyListeners.get (category);
+ List<Object> listeners = this.keyListeners.get(category);
if (listeners == null) return;
- listeners.remove (subscriber);
- if (listeners.size () == 0)
+ listeners.remove(subscriber);
+ if (listeners.size() == 0)
{
- keyListeners.remove (category);
+ this.keyListeners.remove(category);
}
}
}
- protected void notifyKeyListeners (String category, Serializable key,
- Serializable value, boolean locallyModified)
+ protected void notifyKeyListeners(String category, Serializable key, Serializable value, boolean locallyModified)
{
- synchronized(this.keyListeners)
+ synchronized (this.keyListeners)
{
- ArrayList listeners = (ArrayList)keyListeners.get (category);
- if (listeners == null)
- return;
+ List<Object> listeners = this.keyListeners.get(category);
+ if (listeners == null) return;
String strKey = key.toString();
- for (int i = 0; i < listeners.size (); i++)
+ for (Object listener: listeners)
{
- Object listener = listeners.get (i);
- if( listener instanceof DSListener )
+ if (listener instanceof DSListener)
{
DSListener dslistener = (DSListener) listener;
- dslistener.valueHasChanged (category, strKey, value, locallyModified);
+ dslistener.valueHasChanged(category, strKey, value, locallyModified);
}
else
{
DSListenerEx dslistener = (DSListenerEx) listener;
- dslistener.valueHasChanged (category, key, value, locallyModified);
+ dslistener.valueHasChanged(category, key, value, locallyModified);
}
}
}
}
- protected void notifyKeyListenersOfRemove (String category, Serializable key,
- Serializable oldContent, boolean locallyModified)
+ protected void notifyKeyListenersOfRemove(String category, Serializable key, Serializable oldContent,
+ boolean locallyModified)
{
- synchronized(this.keyListeners)
+ synchronized (this.keyListeners)
{
- ArrayList listeners = (ArrayList)keyListeners.get (category);
- if (listeners == null)
- return;
+ List<Object> listeners = this.keyListeners.get(category);
+ if (listeners == null) return;
String strKey = key.toString();
- for (int i = 0; i < listeners.size (); i++)
+ for (Object listener: listeners)
{
- Object listener = listeners.get (i);
- if( listener instanceof DSListener )
+ if (listener instanceof DSListener)
{
DSListener dslistener = (DSListener) listener;
- dslistener.keyHasBeenRemoved (category, strKey, oldContent, locallyModified);
+ dslistener.keyHasBeenRemoved(category, strKey, oldContent, locallyModified);
}
else
{
DSListenerEx dslistener = (DSListenerEx) listener;
- dslistener.keyHasBeenRemoved (category, key, oldContent, locallyModified);
+ dslistener.keyHasBeenRemoved(category, key, oldContent, locallyModified);
}
}
}
}
- protected void cleanupKeyListeners ()
+ protected void cleanupKeyListeners()
{
// NOT IMPLEMENTED YET
}
/** ExtendedTreeCacheListener methods */
-
// Private -------------------------------------------------------
protected Fqn buildFqn(String category)
{
- return new Fqn(ROOTFQN, category);
+ return Fqn.fromRelativeElements(ROOTFQN, category);
}
protected Fqn buildFqn(String category, Serializable key)
{
- return new Fqn(new Object[] { ROOT, category, key });
+ return Fqn.fromElements(ROOT, category, key);
}
protected Fqn buildFqn(String category, Serializable key, Serializable value)
{
- return new Fqn(new Object[] { ROOT, category, key, value });
+ return Fqn.fromElements(ROOT, category, key, value);
}
protected Node getNode(String category) throws CacheException
{
- return cache.getRoot().getChild(buildFqn(category));
+ return this.cache.getRoot().getChild(this.buildFqn(category));
}
// @CacheListener -------------------------------------------------
-
+
@NodeModified
public void nodeModified(NodeModifiedEvent event)
{
- if (event.isPre())
- return;
-
+ if (event.isPre()) return;
+
// we're only interested in put and remove data operations
ModificationType modType = event.getModificationType();
- if (!modType.equals(ModificationType.PUT_DATA) && !modType.equals(ModificationType.REMOVE_DATA))
- return;
-
+ if (!modType.equals(ModificationType.PUT_DATA) && !modType.equals(ModificationType.REMOVE_DATA)) return;
+
// ignore changes for other roots in a shared cache
Fqn fqn = event.getFqn();
- if (!fqn.isChildOf(ROOTFQN))
- return;
-
+ if (!fqn.isChildOf(ROOTFQN)) return;
+
Serializable key = null;
Serializable value = null;
-
+
// there should be exactly one key/value pair in the map
Map data = event.getData();
if (data != null && !data.isEmpty())
{
- key = (Serializable)data.keySet().iterator().next();
- value = (Serializable)data.get(key);
- }
-
+ key = (Serializable) data.keySet().iterator().next();
+ value = (Serializable) data.get(key);
+ }
+
if (modType.equals(ModificationType.PUT_DATA))
- DistributedStateImpl.this.notifyKeyListeners((String)fqn.get(ROOTFQNSIZE), key, value, event.isOriginLocal());
+ {
+ DistributedStateImpl.this.notifyKeyListeners((String) fqn.get(ROOTFQNSIZE), key, value, event.isOriginLocal());
+ }
else
- DistributedStateImpl.this.notifyKeyListenersOfRemove((String)fqn.get(ROOTFQNSIZE), key, value, event.isOriginLocal());
+ {
+ DistributedStateImpl.this.notifyKeyListenersOfRemove((String) fqn.get(ROOTFQNSIZE), key, value, event.isOriginLocal());
+ }
}
}
More information about the jboss-cvs-commits mailing list