[jboss-cvs] JBossAS SVN: r77618 - trunk/cluster/src/main/org/jboss/cache/invalidation/bridges.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 28 15:28:10 EDT 2008


Author: pferraro
Date: 2008-08-28 15:28:10 -0400 (Thu, 28 Aug 2008)
New Revision: 77618

Modified:
   trunk/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java
Log:
[JBAS-5439] Ensure JGCacheInvalidationBridge can handle concurrent JGroups requests
Code cleanup.  No significant changes needed.

Modified: trunk/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java
===================================================================
--- trunk/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java	2008-08-28 18:48:12 UTC (rev 77617)
+++ trunk/cluster/src/main/org/jboss/cache/invalidation/bridges/JGCacheInvalidationBridge.java	2008-08-28 19:28:10 UTC (rev 77618)
@@ -51,79 +51,66 @@
  * </ul>
  */
 
-public class JGCacheInvalidationBridge 
-   extends org.jboss.system.ServiceMBeanSupport 
-   implements JGCacheInvalidationBridgeMBean, 
-      DistributedState.DSListenerEx,
-      InvalidationBridgeListener, 
-      DistributedReplicantManager.ReplicantListener
+public class JGCacheInvalidationBridge
+   extends org.jboss.system.ServiceMBeanSupport
+   implements JGCacheInvalidationBridgeMBean,
+     DistributedState.DSListenerEx,
+     InvalidationBridgeListener,
+     DistributedReplicantManager.ReplicantListener
 {
-   
    // Constants -----------------------------------------------------
    
+   protected final static Class[] rpc_invalidate_types = new Class[] { String.class, Serializable.class };
+   protected final static Class[] rpc_invalidates_types = new Class[] { String.class, Serializable[].class };
+   protected final static Class[] rpc_invalidate_all_types = new Class[] { String.class };
+   protected final static Class[] rpc_batch_invalidate_types = new Class[] { BatchInvalidation[].class };
+
    // Attributes ----------------------------------------------------      
-   
+
    /**
     * The ClusterPartition with which we are associated.
     */
-   protected HAPartition partition;
-   protected String bridgeName = "DefaultJGCacheIB";
-   
-   protected DistributedState ds = null;
-   protected DistributedReplicantManager drm = null;
+   protected volatile HAPartition partition;
+   protected volatile String bridgeName = "DefaultJGCacheIB";
+   protected volatile InvalidationManagerMBean invalMgr = null;
+
    protected String RPC_HANDLER_NAME = null;
-   protected String nodeName = null;
-   
-   protected InvalidationManagerMBean invalMgr = null;
+
    protected BridgeInvalidationSubscription invalidationSubscription = null;
    protected Collection localGroups = null;
-   protected Vector bridgedGroups = new Vector ();   
+   protected Vector bridgedGroups = new Vector();
 
-   protected final Class[] rpc_invalidate_types=new Class[]{String.class, Serializable.class};
-   protected final Class[] rpc_invalidates_types=new Class[]{String.class, Serializable[].class};
-   protected final Class[] rpc_invalidate_all_types=new Class[]{String.class};
-   protected final Class[] rpc_batch_invalidate_types=new Class[]{BatchInvalidation[].class};
+   // Public --------------------------------------------------------
 
-
-   // Static --------------------------------------------------------
-   
-   // Constructors --------------------------------------------------
-
-   public JGCacheInvalidationBridge ()
-   {
-   }
-      
-   // Public --------------------------------------------------------
-   
    // JGCacheInvalidationBridgeMBean implementation ----------------------------------------------
 
    public HAPartition getHAPartition()
    {
-      return partition;
+      return this.partition;
    }
 
    public void setHAPartition(HAPartition clusterPartition)
    {
       this.partition = clusterPartition;
    }
-   
-   public String getPartitionName ()
+
+   public String getPartitionName()
    {
       return this.partition.getPartitionName();
    }
-   
-   public String getBridgeName ()
+
+   public String getBridgeName()
    {
       return this.bridgeName;
    }
-   
-   public void setBridgeName (String name)
+
+   public void setBridgeName(String name)
    {
       this.bridgeName = name;
    }
-   
+
    // DistributedReplicantManager.ReplicantListener implementation ---------------------------
-   
+
    /**
     * @todo examine thread safety. synchronized keyword was added to method 
     * signature when internal behavior of DistributedReplicantManagerImpl was 
@@ -131,433 +118,481 @@
     * notifications. Need to examine in detail how this method interacts with
     * DistributedState to see if we can remove/narrow the synchronization. 
     */
-   public synchronized void replicantsChanged (String key, java.util.List newReplicants, int newReplicantsViewId, boolean merge)
+   public synchronized void replicantsChanged(String key, java.util.List newReplicants, int newReplicantsViewId,
+         boolean merge)
    {
-      if (key.equals (this.RPC_HANDLER_NAME) && this.drm.isMasterReplica (this.RPC_HANDLER_NAME))
+      DistributedReplicantManager drm = this.partition.getDistributedReplicantManager();
+      
+      if (key.equals(this.RPC_HANDLER_NAME) && drm.isMasterReplica(this.RPC_HANDLER_NAME))
       {
-         log.debug ("The list of replicants for the JG bridge has changed, computing and updating local info...");
+         this.log.debug("The list of replicants for the JG bridge has changed, computing and updating local info...");
+
+         DistributedState ds = this.partition.getDistributedStateService();
          
          // we remove any entry from the DS whose node is dead
          //
-         java.util.Collection coll = this.ds.getAllKeys (this.RPC_HANDLER_NAME);                  
-         if (coll == null) 
+         java.util.Collection coll = ds.getAllKeys(this.RPC_HANDLER_NAME);
+         if (coll == null)
          {
-            log.debug ("... No bridge info was associated with this node");
+            this.log.debug("... No bridge info was associated with this node");
             return;
          }
-         
+
          // to avoid ConcurrentModificationException, we copy the list of keys in a new structure
          //
-         ArrayList collCopy = new java.util.ArrayList (coll);
-         java.util.List newReplicantsNodeNames = this.drm.lookupReplicantsNodeNames (this.RPC_HANDLER_NAME);
-         
+         ArrayList collCopy = new java.util.ArrayList(coll);
+         java.util.List newReplicantsNodeNames = drm.lookupReplicantsNodeNames(this.RPC_HANDLER_NAME);
 
          for (int i = 0; i < collCopy.size(); i++)
          {
-            String nodeEntry = (String)collCopy.get(i);
-            if (!newReplicantsNodeNames.contains (nodeEntry))
+            String nodeEntry = (String) collCopy.get(i);
+            if (!newReplicantsNodeNames.contains(nodeEntry))
             {
                // the list of bridged topic contains a dead member: we remove it
                //
                try
                {
-                  log.debug ("removing bridge information associated with this node from the DS");
-                  this.ds.remove (this.RPC_HANDLER_NAME, nodeEntry, true);
+                  this.log.debug("removing bridge information associated with this node from the DS");
+                  ds.remove(this.RPC_HANDLER_NAME, nodeEntry, true);
                }
                catch (Exception e)
                {
-                  log.info ("Unable to remove a node entry from the distributed cache", e);
+                  this.log.info("Unable to remove a node entry from the distributed cache", e);
                }
             }
          }
       }
    }
-      
+
    // DistributedState.DSListener implementation ----------------------------------------------
-   
-    public void valueHasChanged (String category, Serializable key, Serializable value, boolean locallyModified)
-    {
-       this.updatedBridgedInvalidationGroupsInfo ();
-    }
-    
-    public void keyHasBeenRemoved (String category, Serializable key, Serializable previousContent, boolean locallyModified)
-    {
-       this.updatedBridgedInvalidationGroupsInfo ();
-    }
 
+   public void valueHasChanged(String category, Serializable key, Serializable value, boolean locallyModified)
+   {
+      this.updatedBridgedInvalidationGroupsInfo();
+   }
+
+   public void keyHasBeenRemoved(String category, Serializable key, Serializable previousContent,
+         boolean locallyModified)
+   {
+      this.updatedBridgedInvalidationGroupsInfo();
+   }
+
    // InvalidationBridgeListener implementation ----------------------------------------------
-   
-   public void batchInvalidate (BatchInvalidation[] invalidations, boolean asynchronous)
+
+   public void batchInvalidate(BatchInvalidation[] invalidations, boolean asynchronous)
    {
       if (invalidations == null) return;
-      
+
       // we need to sort which group other nodes accept or refuse and propagate through the net
       //      
       ArrayList acceptedGroups = new ArrayList();
-      
-      for (int i=0; i<invalidations.length; i++)
+
+      for (BatchInvalidation currBI : invalidations)
       {
-         BatchInvalidation currBI = invalidations[i];
-         if (groupExistsRemotely (currBI.getInvalidationGroupName ()))
-            acceptedGroups.add (currBI);
+         if (this.groupExistsRemotely(currBI.getInvalidationGroupName()))
+         {
+            acceptedGroups.add(currBI);
+         }
       }
-      
-      if (acceptedGroups.size () > 0)
+
+      if (acceptedGroups.size() > 0)
       {
-         BatchInvalidation[] result = new BatchInvalidation[acceptedGroups.size ()];
-         result = (BatchInvalidation[])acceptedGroups.toArray (result);
-         
-         if (log.isTraceEnabled ())
-            log.trace ("Transmitting batch invalidation: " + result);
-         this._do_rpc_batchInvalidate (result, asynchronous);      
+         BatchInvalidation[] result = new BatchInvalidation[acceptedGroups.size()];
+         result = (BatchInvalidation[]) acceptedGroups.toArray(result);
+
+         if (this.log.isTraceEnabled())
+         {
+            this.log.trace("Transmitting batch invalidation: " + result);
+         }
+         this._do_rpc_batchInvalidate(result, asynchronous);
       }
    }
-   
-   public void invalidate (String invalidationGroupName, Serializable[] keys, boolean asynchronous)
+
+   public void invalidate(String invalidationGroupName, Serializable[] keys, boolean asynchronous)
    {
       // if the group exists on another node, we simply propagate to other nodes
       //
-      if (log.isTraceEnabled ())
-         log.trace ("Transmitting invalidations for group: " + invalidationGroupName);
-      
-      if (groupExistsRemotely (invalidationGroupName))
-         _do_rpc_invalidates (invalidationGroupName, keys, asynchronous);
+      if (this.log.isTraceEnabled())
+      {
+         this.log.trace("Transmitting invalidations for group: " + invalidationGroupName);
+      }
+
+      if (this.groupExistsRemotely(invalidationGroupName))
+      {
+         this._do_rpc_invalidates(invalidationGroupName, keys, asynchronous);
+      }
    }
-   
-   public void invalidate (String invalidationGroupName, Serializable key, boolean asynchronous)
+
+   public void invalidate(String invalidationGroupName, Serializable key, boolean asynchronous)
    {
       // if the group exists on another node, we simply propagate to other nodes
       //
-      if (log.isTraceEnabled ())
-         log.trace ("Transmitting invalidation for group: " + invalidationGroupName);
+      if (this.log.isTraceEnabled())
+      {
+         this.log.trace("Transmitting invalidation for group: " + invalidationGroupName);
+      }
 
-      if (groupExistsRemotely (invalidationGroupName))
-         _do_rpc_invalidate (invalidationGroupName, key, asynchronous);
+      if (this.groupExistsRemotely(invalidationGroupName))
+      {
+         this._do_rpc_invalidate(invalidationGroupName, key, asynchronous);
+      }
    }
 
    public void invalidateAll(String groupName, boolean async)
    {
-      if (log.isTraceEnabled ())
-         log.trace ("Transmitting for all entries for invalidation for group: " + groupName);
-      if (groupExistsRemotely (groupName))
-         _do_rpc_invalidate_all (groupName, async);
+      if (this.log.isTraceEnabled())
+      {
+         this.log.trace("Transmitting for all entries for invalidation for group: " + groupName);
+      }
+      if (this.groupExistsRemotely(groupName))
+      {
+         this._do_rpc_invalidate_all(groupName, async);
+      }
    }
 
-   public void newGroupCreated (String groupInvalidationName)
+   public void newGroupCreated(String groupInvalidationName)
    {
       try
       {
-         this.publishLocalInvalidationGroups ();
+         this.publishLocalInvalidationGroups();
          //this.updatedBridgedInvalidationGroupsInfo ();
       }
       catch (Exception e)
       {
-         log.info ("Problem while registering a new invalidation group over the cluster", e);
+         this.log.info("Problem while registering a new invalidation group over the cluster", e);
       }
    }
-   
-   public void groupIsDropped (String groupInvalidationName)
+
+   public void groupIsDropped(String groupInvalidationName)
    {
       try
       {
-         this.publishLocalInvalidationGroups ();
+         this.publishLocalInvalidationGroups();
          //this.updatedBridgedInvalidationGroupsInfo ();
       }
       catch (Exception e)
       {
-         log.info ("Problem while un-registering a new invalidation group over the cluster", e);
+         this.log.info("Problem while un-registering a new invalidation group over the cluster", e);
       }
    }
-   
+
    // Bean configuration properties ---------------------------------------------------
-   
-   public InvalidationManagerMBean getInvalidationManager ()
+
+   public InvalidationManagerMBean getInvalidationManager()
    {
       return this.invalMgr;
    }
-   
-   public void setInvalidationManager (InvalidationManagerMBean manager)
+
+   public void setInvalidationManager(InvalidationManagerMBean manager)
    {
       this.invalMgr = manager;
    }
-   
+
    // ServiceMBeanSupport overrides ---------------------------------------------------
-   
-   public void startService () throws Exception
+
+   @Override
+   public void startService() throws Exception
    {
-      if (partition == null)
-      {
+      if (this.partition == null)
          throw new IllegalStateException("HAPartition property must be set before starting InvalidationBridge service");
-      }
+
+      this.RPC_HANDLER_NAME = "DCacheBridge-" + this.bridgeName;
+
+      DistributedReplicantManager drm = this.partition.getDistributedReplicantManager();
+      DistributedState ds = this.partition.getDistributedStateService();
       
-      RPC_HANDLER_NAME = "DCacheBridge-" + this.bridgeName;
-  
-      this.ds = partition.getDistributedStateService ();
-      this.drm = partition.getDistributedReplicantManager ();
-      this.nodeName = partition.getNodeName();
-      
-      this.drm.add (this.RPC_HANDLER_NAME, "");
-      this.drm.registerListener (this.RPC_HANDLER_NAME, this);      
-      this.ds.registerDSListenerEx (RPC_HANDLER_NAME, this);
-      partition.registerRPCHandler(RPC_HANDLER_NAME, this);   
-      
+      drm.add(this.RPC_HANDLER_NAME, "");
+      drm.registerListener(this.RPC_HANDLER_NAME, this);
+      ds.registerDSListenerEx(this.RPC_HANDLER_NAME, this);
+      this.partition.registerRPCHandler(this.RPC_HANDLER_NAME, this);
+
       // we now publish the list of caches we have access to
-      if( invalMgr == null )
+      if (this.invalMgr == null)
+      {
          throw new IllegalStateException("Failed to find an InvalidationManagerMBean, ensure one is injected");
+      }
 
-      publishLocalInvalidationGroups ();      
-      this.updatedBridgedInvalidationGroupsInfo ();
-      
-      this.invalidationSubscription = invalMgr.registerBridgeListener (this);
-      
+      this.publishLocalInvalidationGroups();
+      this.updatedBridgedInvalidationGroupsInfo();
+
+      this.invalidationSubscription = this.invalMgr.registerBridgeListener(this);
    }
-   
-   public void stopService ()
+
+   @Override
+   public void stopService()
    {
+      DistributedReplicantManager drm = this.partition.getDistributedReplicantManager();
+      DistributedState ds = this.partition.getDistributedStateService();
+      
       try
       {
-         partition.unregisterRPCHandler (this.RPC_HANDLER_NAME, this);
-         this.ds.unregisterDSListenerEx (this.RPC_HANDLER_NAME, this);
-         this.drm.unregisterListener (this.RPC_HANDLER_NAME, this);
-         this.drm.remove (this.RPC_HANDLER_NAME);
-         
-         this.invalidationSubscription.unregister ();
-                  
-         this.ds.remove (this.RPC_HANDLER_NAME, this.nodeName, true);
-         
-//         this.invalMgr = null;
-//         partition = null;
-         this.drm = null;
-         this.ds = null;
+         this.partition.unregisterRPCHandler(this.RPC_HANDLER_NAME, this);
+         ds.unregisterDSListenerEx(this.RPC_HANDLER_NAME, this);
+         drm.unregisterListener(this.RPC_HANDLER_NAME, this);
+         drm.remove(this.RPC_HANDLER_NAME);
+
+         this.invalidationSubscription.unregister();
+
+         ds.remove(this.RPC_HANDLER_NAME, this.partition.getNodeName(), true);
+
+         //         this.invalMgr = null;
+         //         partition = null;
          this.invalidationSubscription = null;
          this.RPC_HANDLER_NAME = null;
-         this.nodeName = null;
          this.localGroups = null;
-         this.bridgedGroups = new Vector ();                  
+         this.bridgedGroups = new Vector();
       }
       catch (Exception e)
       {
-         log.info ("Problem while shuting down invalidation cache bridge", e);
+         this.log.info("Problem while shuting down invalidation cache bridge", e);
       }
    }
-   
+
    // RPC calls ---------------------------------------------
-   
-   public void _rpc_invalidate (String invalidationGroupName, Serializable key)
+
+   public void _rpc_invalidate(String invalidationGroupName, Serializable key)
    {
-      if (log.isTraceEnabled ())
-         log.trace ("Received remote invalidation for group: " + invalidationGroupName);
+      if (this.log.isTraceEnabled())
+      {
+         this.log.trace("Received remote invalidation for group: " + invalidationGroupName);
+      }
 
-      this.invalidationSubscription.invalidate (invalidationGroupName, key);
+      this.invalidationSubscription.invalidate(invalidationGroupName, key);
    }
-   
-   public void _rpc_invalidates (String invalidationGroupName, Serializable[] keys)
+
+   public void _rpc_invalidates(String invalidationGroupName, Serializable[] keys)
    {
-      if (log.isTraceEnabled ())
-         log.trace ("Received remote invalidations for group: " + invalidationGroupName);
+      if (this.log.isTraceEnabled())
+      {
+         this.log.trace("Received remote invalidations for group: " + invalidationGroupName);
+      }
 
-      this.invalidationSubscription.invalidate (invalidationGroupName, keys);
+      this.invalidationSubscription.invalidate(invalidationGroupName, keys);
    }
 
-   public void _rpc_invalidate_all (String invalidationGroupName)
+   public void _rpc_invalidate_all(String invalidationGroupName)
    {
-      if (log.isTraceEnabled ())
-         log.trace ("Received remote invalidate_all for group: " + invalidationGroupName);
+      if (this.log.isTraceEnabled())
+      {
+         this.log.trace("Received remote invalidate_all for group: " + invalidationGroupName);
+      }
 
-      this.invalidationSubscription.invalidateAll (invalidationGroupName);
+      this.invalidationSubscription.invalidateAll(invalidationGroupName);
    }
 
-   public void _rpc_batchInvalidate (BatchInvalidation[] invalidations)
+   public void _rpc_batchInvalidate(BatchInvalidation[] invalidations)
    {
-      if (log.isTraceEnabled () && invalidations != null)
-         log.trace ("Received remote batch invalidation for this number of groups: " + invalidations.length);
+      if (this.log.isTraceEnabled() && invalidations != null)
+      {
+         this.log.trace("Received remote batch invalidation for this number of groups: " + invalidations.length);
+      }
 
-      this.invalidationSubscription.batchInvalidate (invalidations);
+      this.invalidationSubscription.batchInvalidate(invalidations);
    }
 
-   protected void _do_rpc_invalidate (String invalidationGroupName, Serializable key, boolean asynch)
+   protected void _do_rpc_invalidate(String invalidationGroupName, Serializable key, boolean asynch)
    {
-      Object[] params = new Object[] {invalidationGroupName, key};
+      Object[] params = new Object[] { invalidationGroupName, key };
       try
-      {         
+      {
          if (asynch)
-            partition.callAsynchMethodOnCluster (this.RPC_HANDLER_NAME,
-                                                      "_rpc_invalidate",
-                                                      params, rpc_invalidate_types, true);
+         {
+            this.partition.callAsynchMethodOnCluster(this.RPC_HANDLER_NAME, "_rpc_invalidate", params,
+                  rpc_invalidate_types, true);
+         }
          else
-            partition.callMethodOnCluster (this.RPC_HANDLER_NAME,
-                                                "_rpc_invalidate",
-                                                params, rpc_invalidate_types, true);
+         {
+            this.partition.callMethodOnCluster(this.RPC_HANDLER_NAME, "_rpc_invalidate", params,
+                  rpc_invalidate_types, true);
+         }
       }
       catch (Exception e)
       {
-         log.debug ("Distributed invalidation (1) has failed for group " + 
-                    invalidationGroupName + " (Bridge: " + this.bridgeName + ")");
+         this.log.debug("Distributed invalidation (1) has failed for group " + invalidationGroupName + " (Bridge: "
+               + this.bridgeName + ")");
       }
    }
-   
-   protected void _do_rpc_invalidates (String invalidationGroupName, Serializable[] keys, boolean asynch)
+
+   protected void _do_rpc_invalidates(String invalidationGroupName, Serializable[] keys, boolean asynch)
    {
-      Object[] params = new Object[] {invalidationGroupName, keys};
+      Object[] params = new Object[] { invalidationGroupName, keys };
       try
-      {         
+      {
          if (asynch)
-            partition.callAsynchMethodOnCluster (this.RPC_HANDLER_NAME,
-                                                      "_rpc_invalidates", params, rpc_invalidates_types, true);
+         {
+            this.partition.callAsynchMethodOnCluster(this.RPC_HANDLER_NAME, "_rpc_invalidates", params,
+                  rpc_invalidates_types, true);
+         }
          else
-            partition.callMethodOnCluster (this.RPC_HANDLER_NAME,
-                                                "_rpc_invalidates", params, rpc_invalidates_types, true);
+         {
+            this.partition.callMethodOnCluster(this.RPC_HANDLER_NAME, "_rpc_invalidates", params,
+                  rpc_invalidates_types, true);
+         }
       }
       catch (Exception e)
       {
-         log.debug ("Distributed invalidation (2) has failed for group " + 
-                    invalidationGroupName + " (Bridge: " + this.bridgeName + ")");
+         this.log.debug("Distributed invalidation (2) has failed for group " + invalidationGroupName + " (Bridge: "
+               + this.bridgeName + ")");
       }
    }
 
-   protected void _do_rpc_invalidate_all (String invalidationGroupName, boolean asynch)
+   protected void _do_rpc_invalidate_all(String invalidationGroupName, boolean asynch)
    {
-      Object[] params = new Object[] {invalidationGroupName};
+      Object[] params = new Object[] { invalidationGroupName };
+
       try
       {
          if (asynch)
-            partition.callAsynchMethodOnCluster (this.RPC_HANDLER_NAME,
-                                                      "_rpc_invalidate_all", params, rpc_invalidate_all_types, true);
+         {
+            this.partition.callAsynchMethodOnCluster(this.RPC_HANDLER_NAME, "_rpc_invalidate_all", params,
+                  rpc_invalidate_all_types, true);
+         }
          else
-            partition.callMethodOnCluster (this.RPC_HANDLER_NAME,
-                                                "_rpc_invalidate_all", params, rpc_invalidate_all_types, true);
+         {
+            this.partition.callMethodOnCluster(this.RPC_HANDLER_NAME, "_rpc_invalidate_all", params,
+                  rpc_invalidate_all_types, true);
+         }
       }
       catch (Exception e)
       {
-         log.debug ("Distributed invalidation (2) has failed for group " +
-                    invalidationGroupName + " (Bridge: " + this.bridgeName + ")");
+         this.log.debug("Distributed invalidation (2) has failed for group " + invalidationGroupName + " (Bridge: "
+               + this.bridgeName + ")");
       }
    }
-   
-   protected void _do_rpc_batchInvalidate (BatchInvalidation[] invalidations, boolean asynch)
+
+   protected void _do_rpc_batchInvalidate(BatchInvalidation[] invalidations, boolean asynch)
    {
-      Object[] params = new Object[] {invalidations};
+      Object[] params = new Object[] { invalidations };
       try
-      {         
+      {
          if (asynch)
-            partition.callAsynchMethodOnCluster (this.RPC_HANDLER_NAME,
-                                                      "_rpc_batchInvalidate", params, rpc_batch_invalidate_types, true);
+         {
+            this.partition.callAsynchMethodOnCluster(this.RPC_HANDLER_NAME, "_rpc_batchInvalidate", params,
+                  rpc_batch_invalidate_types, true);
+         }
          else
-            partition.callMethodOnCluster (this.RPC_HANDLER_NAME,
-                                                "_rpc_batchInvalidate", params, rpc_batch_invalidate_types, true);
+         {
+            this.partition.callMethodOnCluster(this.RPC_HANDLER_NAME, "_rpc_batchInvalidate", params,
+                  rpc_batch_invalidate_types, true);
+         }
       }
       catch (Exception e)
       {
-         log.debug ("Distributed invalidation (3) has failed (Bridge: " + this.bridgeName + ")");
+         this.log.debug("Distributed invalidation (3) has failed (Bridge: " + this.bridgeName + ")");
       }
    }
 
-   
    // Package protected ---------------------------------------------
-   
+
    // Protected -----------------------------------------------------
-   
-   protected synchronized void publishLocalInvalidationGroups () throws Exception
+
+   protected synchronized void publishLocalInvalidationGroups() throws Exception
    {
-      this.localGroups = invalMgr.getInvalidationGroups ();      
-      
-      log.debug ("Publishing locally available invalidation groups: " + this.localGroups);
+      this.localGroups = this.invalMgr.getInvalidationGroups();
 
-      ArrayList content = new ArrayList (this.localGroups);      
-      ArrayList result = new ArrayList (content.size ());
-      
+      this.log.debug("Publishing locally available invalidation groups: " + this.localGroups);
 
+      ArrayList content = new ArrayList(this.localGroups);
+      ArrayList result = new ArrayList(content.size());
+
       for (int i = 0; i < content.size(); i++)
       {
-         String aGroup = ((InvalidationGroup)content.get(i)).getGroupName ();
-         result.add (aGroup);
+         String aGroup = ((InvalidationGroup) content.get(i)).getGroupName();
+         result.add(aGroup);
       }
+
+      String nodeName = this.partition.getNodeName();
+      DistributedState ds = this.partition.getDistributedStateService();
       
-      if (result.size () > 0)       
+      if (result.size() > 0)
       {
-         NodeInfo info = new NodeInfo (result, this.nodeName);
-         this.ds.set (this.RPC_HANDLER_NAME, this.nodeName, info, true);
+         NodeInfo info = new NodeInfo(result, nodeName);
+         ds.set(this.RPC_HANDLER_NAME, nodeName, info, true);
       }
       else
-         this.ds.remove (this.RPC_HANDLER_NAME, this.nodeName, true);
+      {
+         ds.remove(this.RPC_HANDLER_NAME, nodeName, true);
+      }
    }
-   
-   protected void updatedBridgedInvalidationGroupsInfo ()
+
+   protected void updatedBridgedInvalidationGroupsInfo()
    {
-      Collection bridgedByNode = this.ds.getAllValues (this.RPC_HANDLER_NAME);
-      
-      log.debug ("Updating list of invalidation groups that are bridged...");
-      
+      Collection bridgedByNode = this.partition.getDistributedStateService().getAllValues(this.RPC_HANDLER_NAME);
+
+      this.log.debug("Updating list of invalidation groups that are bridged...");
+
       if (bridgedByNode != null)
       {
          // Make a copy
          //      
-         ArrayList copy = new ArrayList (bridgedByNode);
+         ArrayList copy = new ArrayList(bridgedByNode);
 
-         Vector result  = new Vector ();
+         Vector result = new Vector();
+
+         String nodeName = this.partition.getNodeName();
          
-
          for (int i = 0; i < copy.size(); i++)
          {
-            NodeInfo infoForNode = (NodeInfo)copy.get(i);
-            log.trace ("InfoForNode: " + infoForNode);
-            
-            if (infoForNode != null && !infoForNode.groupName.equals (this.nodeName))
+            NodeInfo infoForNode = (NodeInfo) copy.get(i);
+            this.log.trace("InfoForNode: " + infoForNode);
+
+            if (infoForNode != null && !infoForNode.groupName.equals(nodeName))
             {
                ArrayList groupsForNode = infoForNode.groups;
-               log.trace ("Groups for node: " + groupsForNode);
-               
+               this.log.trace("Groups for node: " + groupsForNode);
 
                for (int j = 0; j < groupsForNode.size(); j++)
                {
-                  String aGroup = (String)groupsForNode.get(j);
-                  if (!result.contains (aGroup))
+                  String aGroup = (String) groupsForNode.get(j);
+                  if (!result.contains(aGroup))
                   {
-                     log.trace ("Adding: " + aGroup);
-                     result.add (aGroup);                  
+                     this.log.trace("Adding: " + aGroup);
+                     result.add(aGroup);
                   }
                }
-               
-            }            
-            
+
+            }
+
          }
          // atomic assignation of the result
          //
          this.bridgedGroups = result;
-         
-         log.debug ("... computed list of bridged groups: " + result);
+
+         this.log.debug("... computed list of bridged groups: " + result);
       }
       else
       {
-         log.debug ("... nothing needs to be bridged.");            
+         this.log.debug("... nothing needs to be bridged.");
       }
-         
+
    }
-   
-   protected boolean groupExistsRemotely (String groupName)
+
+   protected boolean groupExistsRemotely(String groupName)
    {
-      return this.bridgedGroups.contains (groupName);
+      return this.bridgedGroups.contains(groupName);
    }
-   
+
    // Private -------------------------------------------------------
-   
+
    // Inner classes -------------------------------------------------
-   
+
 }
 
 class NodeInfo implements java.io.Serializable
 {
    static final long serialVersionUID = -3215712955134929006L;
-   
+
    public ArrayList groups = null;
+
    public String groupName = null;
+
+   public NodeInfo()
+   {
+   }
    
-   public NodeInfo (){}
-   
-   public NodeInfo (ArrayList groups, String groupName)
+   public NodeInfo(ArrayList groups, String groupName)
    {
       this.groups = groups;
       this.groupName = groupName;
    }
-   
 }




More information about the jboss-cvs-commits mailing list