[jboss-cvs] JBossAS SVN: r74414 - trunk/cluster/src/main/org/jboss/ha/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 11 16:08:04 EDT 2008


Author: pferraro
Date: 2008-06-11 16:08:04 -0400 (Wed, 11 Jun 2008)
New Revision: 74414

Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
Log:
[JBAS-5433] Ensure ClusterPartition can handle concurrent JGroups requests
Included some minor code style modifications.

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2008-06-11 19:54:55 UTC (rev 74413)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2008-06-11 20:08:04 UTC (rev 74414)
@@ -33,7 +33,6 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
@@ -132,12 +131,12 @@
            
       public String getServiceName()
       {
-         return serviceName;
+         return this.serviceName;
       }
            
       public byte[] getPayload()
       {
-         return payload;
+         return this.payload;
       }
    }
    
@@ -154,18 +153,18 @@
       {
          try
          {
-            channel.connect(getPartitionName());
+            ClusterPartition.this.channel.connect(getPartitionName());
          }
          catch (Exception e)
          {
-            synchronized (channelLock)
+            synchronized (ClusterPartition.this.channelLock)
             {
-               connectException = e;
+               ClusterPartition.this.connectException = e;
             }
          }
          finally
          {
-            latch.countDown();
+            this.latch.countDown();
          }
       }
    }
@@ -190,8 +189,8 @@
    /** Thread pool used to asynchronously start our channel */
    private   ThreadPool threadPool;
    
-   protected HashMap<String, Object> rpcHandlers = new HashMap<String, Object>();
-   protected HashMap stateHandlers = new HashMap();
+   protected Map<String, Object> rpcHandlers = new ConcurrentHashMap<String, Object>();
+   protected Map<String, HAPartitionStateTransfer> stateHandlers = new HashMap<String, HAPartitionStateTransfer>();
    /** Do we send any membership change notifications synchronously? */
    protected boolean allowSyncListeners = false;
    /** The HAMembershipListener and HAMembershipExtendedListeners */
@@ -203,7 +202,7 @@
    /** The current cluster partition members */
    protected Vector members = null;
    protected Vector jgmembers = null;
-   protected ConcurrentHashMap<String, WeakReference<ClassLoader>> clmap =
+   protected Map<String, WeakReference<ClassLoader>> clmap =
                                           new ConcurrentHashMap<String, WeakReference<ClassLoader>>();
 
    public Vector history = new Vector();
@@ -288,63 +287,63 @@
    
    protected void createService() throws Exception
    {
-      if (replicantManager == null)
+      if (this.replicantManager == null)
          throw new IllegalStateException("DistributedReplicantManager property must be set before creating ClusterPartition service");
 
       setupLoggers(getPartitionName());
       
-      replicantManager.createService();
+      this.replicantManager.createService();
       
-      if (distributedState != null)
+      if (this.distributedState != null)
       {
-         distributedState.createService();
+         this.distributedState.createService();
       }         
       
       // Create the asynchronous handler for view changes
-      asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
+      this.asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
       
-      log.debug("done initializing partition");
+      this.log.debug("done initializing partition");
    }
    
    protected void startService() throws Exception
    {
       logHistory ("Starting partition");
       
-      cache = cacheManager.getCache(cacheConfigName, true);
-      channelFactory = cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
-      stackName = cache.getConfiguration().getMultiplexerStack();
+      this.cache = this.cacheManager.getCache(this.cacheConfigName, true);
+      this.channelFactory = this.cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
+      this.stackName = this.cache.getConfiguration().getMultiplexerStack();
       
-      if (channel == null || !channel.isOpen())
+      if (this.channel == null || !this.channel.isOpen())
       {
-         log.debug("Creating Channel for partition " + getPartitionName() +
+         this.log.debug("Creating Channel for partition " + getPartitionName() +
                " using stack " + getChannelStackName());
    
-         channel = createChannel();
+         this.channel = createChannel();
          
-         channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
-         channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
+         this.channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
+         this.channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
       }
       
-      log.info("Initializing partition " + getPartitionName());
+      this.log.info("Initializing partition " + getPartitionName());
       logHistory ("Initializing partition " + getPartitionName());
       
-      dispatcher = new RpcHandler(channel, null, null, new Object(), getDeadlockDetection());
+      this.dispatcher = new RpcHandler(this.channel, null, null, new Object(), getDeadlockDetection());
       
       // Subscribe to events generated by the channel
-      log.debug("setMembershipListener");
-      dispatcher.setMembershipListener(this);
-      log.debug("setMessageListener");
-      dispatcher.setMessageListener(messageListener);
-      dispatcher.setRequestMarshaller(new RequestMarshallerImpl());
-      dispatcher.setResponseMarshaller(new ResponseMarshallerImpl());
+      this.log.debug("setMembershipListener");
+      this.dispatcher.setMembershipListener(this);
+      this.log.debug("setMessageListener");
+      this.dispatcher.setMessageListener(this.messageListener);
+      this.dispatcher.setRequestMarshaller(new RequestMarshallerImpl());
+      this.dispatcher.setResponseMarshaller(new ResponseMarshallerImpl());
       
       // Clear any old connectException
-      connectException = null;
+      this.connectException = null;
       CountDownLatch connectLatch = new CountDownLatch(1);
       
-      if (threadPool == null)
+      if (this.threadPool == null)
       {
-         channel.connect(getPartitionName());
+         this.channel.connect(getPartitionName());
          connectLatch.countDown();
       }
       else
@@ -352,25 +351,25 @@
          // Do the channel connect in another thread while this
          // thread starts the cache and does that channel connect
          ChannelConnectTask task = new ChannelConnectTask(connectLatch);
-         threadPool.run(task);
+         this.threadPool.run(task);
       }
       
-      cache.start();
+      this.cache.start();
       
       try
       {         
          // This will block waiting for any async channel connect above
          connectLatch.await();
          
-         if (connectException != null)
-            throw connectException;
+         if (this.connectException != null)
+            throw this.connectException;
          
-         log.debug("Get current members");
+         this.log.debug("Get current members");
          waitForView();     
          
          // get current JG group properties         
-         log.debug("get nodeName");
-         this.localJGAddress = (IpAddress)channel.getLocalAddress();
+         this.log.debug("get nodeName");
+         this.localJGAddress = (IpAddress)this.channel.getLocalAddress();
          this.me = new ClusterNodeImpl(this.localJGAddress);
          this.nodeName = this.me.getName();
 
@@ -378,34 +377,34 @@
 
          fetchState();
          
-         replicantManager.startService();
+         this.replicantManager.startService();
          
-         if (distributedState != null)
+         if (this.distributedState != null)
          {
-            distributedState.setClusteredCache(getClusteredCache());
-            distributedState.startService();
+            this.distributedState.setClusteredCache(getClusteredCache());
+            this.distributedState.startService();
          }
          
          // Start the asynch listener handler thread
-         asynchHandler.start();
+         this.asynchHandler.start();
          
          // Register with the service locator
          HAPartitionLocator.getHAPartitionLocator().registerHAPartition(this);
          
          // Bind ourself in the public JNDI space if configured to do so
-         if (bindIntoJndi)
+         if (this.bindIntoJndi)
          {
             Context ctx = new InitialContext();
-            this.bind(HAPartitionLocator.getStandardJndiBinding(getPartitionName()), 
+            bind(HAPartitionLocator.getStandardJndiBinding(getPartitionName()), 
                       this, ClusterPartition.class, ctx);
-            log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+            this.log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
          }
       }
       catch (Throwable t)
       {
-         log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
-         channel.close();
-         channel = null;
+         this.log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
+         this.channel.close();
+         this.channel = null;
          throw (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
       }
       
@@ -414,46 +413,46 @@
    protected void stopService() throws Exception
    {
       logHistory ("Stopping partition");
-      log.info("Stopping partition " + getPartitionName());
+      this.log.info("Stopping partition " + getPartitionName());
 
       try
       {
-         asynchHandler.stop();
+         this.asynchHandler.stop();
       }
       catch( Exception e)
       {
-         log.warn("Failed to stop asynchHandler", e);
+         this.log.warn("Failed to stop asynchHandler", e);
       }    
       
-      if (distributedState != null)
+      if (this.distributedState != null)
       {
-         distributedState.stopService();
+         this.distributedState.stopService();
       }
 
-      replicantManager.stopService();
+      this.replicantManager.stopService();
       
       try
       {
-         cacheManager.releaseCache(cacheConfigName);
+         this.cacheManager.releaseCache(this.cacheConfigName);
       }
       catch (Exception e)
       {
-         log.error("cache release failed", e);
+         this.log.error("cache release failed", e);
       }
       
 //    NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
 //    add the destroyPartition() step
       try
       {
-         if (channel != null && channel.isConnected())
-            channel.disconnect();
+         if (this.channel != null && this.channel.isConnected())
+            this.channel.disconnect();
       }
       catch (Exception e)
       {
-         log.error("channel disconnection failed", e);
+         this.log.error("channel disconnection failed", e);
       }
 
-      if (bindIntoJndi)
+      if (this.bindIntoJndi)
       {
          String boundName = HAPartitionLocator.getStandardJndiBinding(getPartitionName());
          InitialContext ctx = null;
@@ -464,7 +463,7 @@
             ctx.unbind(boundName);
          }
          catch (Exception e) {
-            log.error("partition unbind operation failed", e);
+            this.log.error("partition unbind operation failed", e);
          }
          finally
          {
@@ -476,31 +475,31 @@
       
       HAPartitionLocator.getHAPartitionLocator().deregisterHAPartition(this);
 
-      log.info("Partition " + getPartitionName() + " stopped.");
+      this.log.info("Partition " + getPartitionName() + " stopped.");
    }
    
    protected void destroyService()  throws Exception
    {
-      log.debug("Destroying HAPartition: " + getPartitionName()); 
+      this.log.debug("Destroying HAPartition: " + getPartitionName()); 
       
-      if (distributedState != null)
+      if (this.distributedState != null)
       {
-         distributedState.destroyService();
+         this.distributedState.destroyService();
       }   
 
-      replicantManager.destroyService();
+      this.replicantManager.destroyService();
 
       try
       {
-         if (channel != null && channel.isOpen())
-            channel.close();
+         if (this.channel != null && this.channel.isOpen())
+            this.channel.close();
       }
       catch (Exception e)
       {
-         log.error("Closing channel failed", e);
+         this.log.error("Closing channel failed", e);
       }
 
-      log.info("Partition " + getPartitionName() + " destroyed.");
+      this.log.info("Partition " + getPartitionName() + " destroyed.");
    }
    
    // ---------------------------------------------------------- State Transfer 
@@ -508,24 +507,24 @@
 
    protected void fetchState() throws Exception
    {
-      log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() + 
+      this.log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() + 
             " milliseconds):");
       long start, stop;
-      isStateSet = false;
+      this.isStateSet = false;
       start = System.currentTimeMillis();
-      boolean rc = channel.getState(null, getStateTransferTimeout());
+      boolean rc = this.channel.getState(null, getStateTransferTimeout());
       if (rc)
       {
-         synchronized (channelLock)
+         synchronized (this.channelLock)
          {
-            while (!isStateSet)
+            while (!this.isStateSet)
             {
-               if (setStateException != null)
-                  throw setStateException;
+               if (this.setStateException != null)
+                  throw this.setStateException;
 
                try
                {
-                  channelLock.wait();
+                  this.channelLock.wait();
                }
                catch (InterruptedException iex)
                {
@@ -533,7 +532,7 @@
             }
          }
          stop = System.currentTimeMillis();
-         log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
+         this.log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
       }
       else
       {
@@ -541,14 +540,14 @@
          // We need to find out if we are the coordinator, so we must
          // block until viewAccepted() is called at least once
 
-         synchronized (members)
+         synchronized (this.members)
          {
-            while (members.size() == 0)
+            while (this.members.size() == 0)
             {
-               log.debug("waiting on viewAccepted()");
+               this.log.debug("waiting on viewAccepted()");
                try
                {
-                  members.wait();
+                  this.members.wait();
                }
                catch (InterruptedException iex)
                {
@@ -558,7 +557,7 @@
 
          if (isCurrentNodeCoordinator())
          {
-            log.info("State could not be retrieved (we are the first member in group)");
+            this.log.info("State could not be retrieved (we are the first member in group)");
          }
          else
          {
@@ -572,12 +571,10 @@
    {
       MarshalledValueOutputStream mvos = null; // don't create until we know we need it
       
-      for (Iterator keys = stateHandlers.entrySet().iterator(); keys.hasNext(); )
+      for (Map.Entry<String, HAPartitionStateTransfer> entry: this.stateHandlers.entrySet())
       {
-         Map.Entry entry = (Map.Entry)keys.next();
-         HAPartition.HAPartitionStateTransfer subscriber = 
-            (HAPartition.HAPartitionStateTransfer) entry.getValue();
-         log.debug("getState for " + entry.getKey());
+         HAPartitionStateTransfer subscriber = entry.getValue();
+         this.log.debug("getState for " + entry.getKey());
          Object state = subscriber.getCurrentState();
          if (state != null)
          {
@@ -614,12 +611,12 @@
       
       if (type == EOF_VALUE)
       {
-         log.debug("serviceState stream is empty");
+         this.log.debug("serviceState stream is empty");
          return;
       }
       else if (type == NULL_VALUE)
       {
-         log.debug("serviceState is null");
+         this.log.debug("serviceState is null");
          return;
       }
       
@@ -636,9 +633,9 @@
             break;
          
          String key = (String) obj;
-         log.debug("setState for " + key);
+         this.log.debug("setState for " + key);
          Object someState = mvis.readObject();
-         HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
+         HAPartitionStateTransfer subscriber = this.stateHandlers.get(key);
          if (subscriber != null)
          {
             try
@@ -660,13 +657,13 @@
                }
                else
                {
-                  log.error("Caught exception setting serviceState to " + subscriber, e);
+                  this.log.error("Caught exception setting serviceState to " + subscriber, e);
                }
             }
          }
          else
          {
-            log.debug("There is no stateHandler for: " + key);
+            this.log.debug("There is no stateHandler for: " + key);
          }      
       }
       
@@ -676,29 +673,29 @@
       }
       catch(Exception e)
       {
-         log.error("Caught exception closing serviceState stream", e);
+         this.log.error("Caught exception closing serviceState stream", e);
       }
 
       used_mem_after=rt.totalMemory() - rt.freeMemory();
-      log.debug("received serviceState; expanded memory by " +
+      this.log.debug("received serviceState; expanded memory by " +
             (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
             ", used memory after: " + used_mem_after + ")");
    }
 
    private void recordSetStateFailure(Throwable t)
    {
-      log.error("failed setting serviceState", t);
+      this.log.error("failed setting serviceState", t);
       if (t instanceof Exception)
-         setStateException = (Exception) t;
+         this.setStateException = (Exception) t;
       else
-         setStateException = new Exception(t);
+         this.setStateException = new Exception(t);
    }
 
    private void notifyChannelLock()
    {
-      synchronized (channelLock)
+      synchronized (this.channelLock)
       {         
-         channelLock.notifyAll();
+         this.channelLock.notifyAll();
       }
    }
    
@@ -708,21 +705,21 @@
    {      
       logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
       if (isCurrentNodeCoordinator ())
-         clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
+         this.clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
       else
-         log.info("Suspected member: " + suspected_mbr);
+         this.log.info("Suspected member: " + suspected_mbr);
    }
 
    public void block() 
    {       
-       flushBlockGate.close();           
-       log.debug("Block processed at " + me);	  
+       this.flushBlockGate.close();           
+       this.log.debug("Block processed at " + this.me);	  
    }
    
    public void unblock()
    {
-       flushBlockGate.open();           
-       log.debug("Unblock processed at " + me);
+       this.flushBlockGate.open();           
+       this.log.debug("Unblock processed at " + this.me);
    }
    
    /** Notification of a cluster view change. This is done from the JG protocol
@@ -743,7 +740,7 @@
 
          // Keep a list of other members only for "exclude-self" RPC calls
          this.jgotherMembers = (Vector)newView.getMembers().clone();
-         this.jgotherMembers.remove (channel.getLocalAddress());
+         this.jgotherMembers.remove (this.channel.getLocalAddress());
          this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
          Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
          logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
@@ -761,16 +758,16 @@
          if (oldMembers == null)
          {
             // Initial viewAccepted
-            log.debug("ViewAccepted: initial members set for partition " + getPartitionName() + ": " +
+            this.log.debug("ViewAccepted: initial members set for partition " + getPartitionName() + ": " +
                      this.currentViewId + " (" + this.members + ")");
             
-            log.info("Number of cluster members: " + members.size());
-            for(int m = 0; m > members.size(); m ++)
+            this.log.info("Number of cluster members: " + this.members.size());
+            for(int m = 0; m > this.members.size(); m ++)
             {
-               Object node = members.get(m);
-               log.debug(node);
+               Object node = this.members.get(m);
+               this.log.debug(node);
             }
-            log.info ("Other members: " + this.otherMembers.size ());
+            this.log.info ("Other members: " + this.otherMembers.size ());
             
             // Wake up the deployer thread blocking in waitForView
             notifyChannelLock();
@@ -784,15 +781,15 @@
             difference = newMembers.size () - oldMembers.size ();
          
          if (isCurrentNodeCoordinator ())
-            clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " +
+            this.clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " +
                                       this.currentViewId + ", delta: " + difference + ") : " + this.members);
          else
-            log.info("New cluster view for partition " + getPartitionName() + ": " +
+            this.log.info("New cluster view for partition " + getPartitionName() + ": " +
                      this.currentViewId + " (" + this.members + " delta: " + difference + ")");
 
          // Build a ViewChangeEvent for the asynch listeners
          ViewChangeEvent event = new ViewChangeEvent();
-         event.viewId = currentViewId;
+         event.viewId = this.currentViewId;
          event.allMembers = translatedNewView;
          event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
          event.newMembers = getNewMembers(oldMembers, event.allMembers);
@@ -804,7 +801,7 @@
             event.originatingGroups = mergeView.getSubgroups();
          }
 
-         log.debug("membership changed from " + 
+         this.log.debug("membership changed from " + 
                   (oldMembers == null ? 0 : oldMembers.size()) + " to " + 
                   event.allMembers.size());
          // Put the view change to the asynch queue
@@ -813,35 +810,35 @@
          // Broadcast the new view to the synchronous view change listeners
          if (this.allowSyncListeners)
          {
-            this.notifyListeners(synchListeners, event.viewId, event.allMembers,
+            notifyListeners(this.synchListeners, event.viewId, event.allMembers,
                   event.deadMembers, event.newMembers, event.originatingGroups);
          }
       }
       catch (Exception ex)
       {
-         log.error("ViewAccepted failed", ex);
+         this.log.error("ViewAccepted failed", ex);
       }
    }
 
    private void waitForView() throws Exception
    {
-      synchronized (channelLock)
+      synchronized (this.channelLock)
       {
          if (this.members == null)
          {
-            if (connectException != null)
-               throw connectException;
+            if (this.connectException != null)
+               throw this.connectException;
             
             try
             {
-               channelLock.wait(getMethodCallTimeout());
+               this.channelLock.wait(getMethodCallTimeout());
             }
             catch (InterruptedException iex)
             {
             }
             
-            if (connectException != null)
-               throw connectException;
+            if (this.connectException != null)
+               throw this.connectException;
             
             if (this.members == null)
                throw new IllegalStateException("No view received from Channel");
@@ -853,27 +850,27 @@
    
    public String getNodeName()
    {
-      return nodeName;
+      return this.nodeName;
    }
    
    public String getPartitionName()
    {
-      return partitionName;
+      return this.partitionName;
    }  
 
    public void setPartitionName(String newName)
    {
-      partitionName = newName;
+      this.partitionName = newName;
    }
    
    public DistributedReplicantManager getDistributedReplicantManager()
    {
-      return replicantManager;
+      return this.replicantManager;
    }
    
    public DistributedState getDistributedStateService()
    {
-      return distributedState;
+      return this.distributedState;
    }
 
    public long getCurrentViewId()
@@ -884,16 +881,16 @@
    public Vector getCurrentView()
    {
       Vector result = new Vector (this.members.size());
-      for (int i = 0; i < members.size(); i++)
+      for (int i = 0; i < this.members.size(); i++)
       {
-         result.add( ((ClusterNode) members.elementAt(i)).getName() );
+         result.add( ((ClusterNode) this.members.elementAt(i)).getName() );
       }
       return result;
    }
 
    public ClusterNode[] getClusterNodes ()
    {
-      synchronized (members)
+      synchronized (this.members)
       {
          ClusterNode[] nodes = new ClusterNode[this.members.size()];
          nodes = (ClusterNode[]) this.members.toArray(nodes);
@@ -903,7 +900,7 @@
 
    public ClusterNode getClusterNode ()
    {
-      return me;
+      return this.me;
    }
 
    public boolean isCurrentNodeCoordinator ()
@@ -921,19 +918,19 @@
    //
    public void registerRPCHandler(String objName, Object subscriber)
    {
-      rpcHandlers.put(objName, subscriber);
+      this.rpcHandlers.put(objName, subscriber);
    }
    
    public void registerRPCHandler(String objName, Object subscriber, ClassLoader classloader)
    {
       registerRPCHandler(objName, subscriber);
-      clmap.put(objName, new WeakReference<ClassLoader>(classloader));
+      this.clmap.put(objName, new WeakReference<ClassLoader>(classloader));
    }
    
    public void unregisterRPCHandler(String objName, Object subscriber)
    {
-      rpcHandlers.remove(objName);
-      clmap.remove(objName);
+      this.rpcHandlers.remove(objName);
+      this.clmap.remove(objName);
    }      
 
    /**
@@ -950,31 +947,31 @@
        Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
    {
       RspList rsp = null;
-      boolean trace = log.isTraceEnabled();
+      boolean trace = this.log.isTraceEnabled();
 
       MethodCall m = new MethodCall(objName + "." + methodName, args, types);
       
-      if(channel.flushSupported())
+      if(this.channel.flushSupported())
       {
-     	 flushBlockGate.await(getStateTransferTimeout());
+     	 this.flushBlockGate.await(getStateTransferTimeout());
       }
       if (excludeSelf)
       {
          if( trace )
          {
-            log.trace("callMethodOnCluster(true), objName="+objName
-               +", methodName="+methodName+", members="+jgotherMembers);
+            this.log.trace("callMethodOnCluster(true), objName="+objName
+               +", methodName="+methodName+", members="+this.jgotherMembers);
          }
-         rsp = dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
+         rsp = this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
       }
       else
       {
          if( trace )
          {
-            log.trace("callMethodOnCluster(false), objName="+objName
-               +", methodName="+methodName+", members="+members);
+            this.log.trace("callMethodOnCluster(false), objName="+objName
+               +", methodName="+methodName+", members="+this.members);
          }
-         rsp = dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
+         rsp = this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
       }
 
       return processResponseList(rsp, trace);
@@ -1014,13 +1011,13 @@
    public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
           Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
    {
-      boolean trace = log.isTraceEnabled();
+      boolean trace = this.log.isTraceEnabled();
 
       MethodCall m = new MethodCall(objName + "." + methodName, args, types);
       
       if( trace )
       {
-         log.trace("callMethodOnCoordinatorNode(false), objName="+objName
+         this.log.trace("callMethodOnCoordinatorNode(false), objName="+objName
             +", methodName="+methodName);
       }
 
@@ -1033,7 +1030,7 @@
          coordinatorOnly.addElement(this.jgmembers.elementAt (0));
       }
       
-      RspList rsp = dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
+      RspList rsp = this.dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
 
       return processResponseList(rsp, trace);
    }
@@ -1057,17 +1054,17 @@
           throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " + 
                                           ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
        MethodCall m;
-       boolean trace = log.isTraceEnabled();
+       boolean trace = this.log.isTraceEnabled();
        if(types != null)
           m=new MethodCall(serviceName + "." + methodName, args, types);
        else
           m=new MethodCall(serviceName + "." + methodName, args);
        if( trace )
        {
-          log.trace("callMethodOnNode( objName="+serviceName
+          this.log.trace("callMethodOnNode( objName="+serviceName
              +", methodName="+methodName);
        }
-       Object rc = dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_FIRST, methodTimeout);
+       Object rc = this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_FIRST, methodTimeout);
        if (rc != null)
        {
           Object item = rc;
@@ -1083,14 +1080,14 @@
                    rc = item;
                 }
                 else if( trace )
-                   log.trace("Ignoring non-received response: "+response);
+                   this.log.trace("Ignoring non-received response: "+response);
              }
              else
              {
                 if (!(item instanceof NoHandlerForRPC))
                    rc = item;
                 else if( trace )
-                   log.trace("Ignoring NoHandlerForRPC");
+                   this.log.trace("Ignoring NoHandlerForRPC");
              }
           }
        return rc;
@@ -1116,17 +1113,17 @@
          throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " + 
                                          ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
       MethodCall m;
-       boolean trace = log.isTraceEnabled();
+       boolean trace = this.log.isTraceEnabled();
        if(types != null)
           m=new MethodCall(serviceName + "." + methodName, args, types);
        else
           m=new MethodCall(serviceName + "." + methodName, args);
        if( trace )
        {
-          log.trace("callAsyncMethodOnNode( objName="+serviceName
+          this.log.trace("callAsyncMethodOnNode( objName="+serviceName
              +", methodName="+methodName);
        }
-       dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_NONE, methodTimeout);
+       this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_NONE, methodTimeout);
    }
 
    private ArrayList processResponseList(RspList rsp, boolean trace)
@@ -1148,14 +1145,14 @@
                      rtn.add(item);
                }
                else if( trace )
-                  log.trace("Ignoring non-received response: "+response);
+                  this.log.trace("Ignoring non-received response: "+response);
             }
             else
             {
                if (!(item instanceof NoHandlerForRPC))
                   rtn.add(item);
                else if( trace )
-                  log.trace("Ignoring NoHandlerForRPC");
+                  this.log.trace("Ignoring NoHandlerForRPC");
             }
          }
          
@@ -1169,31 +1166,31 @@
    public void callAsynchMethodOnCluster(String objName, String methodName,
       Object[] args, Class[] types, boolean excludeSelf) throws Exception
    {
-      boolean trace = log.isTraceEnabled();
+      boolean trace = this.log.isTraceEnabled();
 
       MethodCall m = new MethodCall(objName + "." + methodName, args, types);
 
-      if(channel.flushSupported())
+      if(this.channel.flushSupported())
       {
-     	 flushBlockGate.await(getStateTransferTimeout());
+     	 this.flushBlockGate.await(getStateTransferTimeout());
       }
       if (excludeSelf)
       {
          if( trace )
          {
-            log.trace("callAsynchMethodOnCluster(true), objName="+objName
-               +", methodName="+methodName+", members="+jgotherMembers);
+            this.log.trace("callAsynchMethodOnCluster(true), objName="+objName
+               +", methodName="+methodName+", members="+this.jgotherMembers);
          }
-         dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout());
+         this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout());
       }
       else
       {
          if( trace )
          {
-            log.trace("callAsynchMethodOnCluster(false), objName="+objName
-               +", methodName="+methodName+", members="+members);
+            this.log.trace("callAsynchMethodOnCluster(false), objName="+objName
+               +", methodName="+methodName+", members="+this.members);
          }
-         dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout());
+         this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout());
       }
    }
    
@@ -1205,12 +1202,12 @@
    //      
    public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
    {
-      stateHandlers.put(objectName, subscriber);
+      this.stateHandlers.put(objectName, subscriber);
    }
    
    public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
    {
-      stateHandlers.remove(objectName);
+      this.stateHandlers.remove(objectName);
    }
    
    // *************************
@@ -1255,7 +1252,7 @@
    
    public boolean getAllowSynchronousMembershipNotifications()
    {
-      return allowSyncListeners;
+      return this.allowSyncListeners;
    }
 
    public void setAllowSynchronousMembershipNotifications(boolean allowSync)
@@ -1268,7 +1265,7 @@
    public void processEvent(Object event)
    {
       ViewChangeEvent vce = (ViewChangeEvent) event;
-      notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
+      notifyListeners(this.asynchListeners, vce.viewId, vce.allMembers,
             vce.deadMembers, vce.newMembers, vce.originatingGroups);
       
    }
@@ -1283,7 +1280,7 @@
    
    public void setDistributedReplicantManagerImpl(DistributedReplicantManagerImpl drm)
    {
-      if (this.replicantManager != null  && !(replicantManager == drm))
+      if (this.replicantManager != null  && !(this.replicantManager == drm))
          throw new IllegalStateException("DistributedReplicantManager already set");
 
       this.replicantManager = drm;
@@ -1301,7 +1298,7 @@
       ClusterNodeImpl matched = null;
       for (ClusterNode member : getClusterNodes())
       {
-         if (member.equals(me))
+         if (member.equals(this.me))
          {
             if (matched == null)
             {
@@ -1313,12 +1310,12 @@
             {
                // Two nodes in view match us; try to figure out which one isn't us
                ClusterNodeImpl other = matched;
-               if (other.getOriginalJGAddress().equals(((ClusterNodeImpl)me).getOriginalJGAddress()))
+               if (other.getOriginalJGAddress().equals(((ClusterNodeImpl)this.me).getOriginalJGAddress()))
                {
                   other = (ClusterNodeImpl) member;
                }
                throw new IllegalStateException("Found member " + other + 
-                     " in current view that duplicates us (" + me + "). This" +
+                     " in current view that duplicates us (" + this.me + "). This" +
                      " node cannot join partition until duplicate member has" +
                      " been removed");
             }
@@ -1349,7 +1346,7 @@
          }
          catch (NameNotFoundException e)
          {
-            log.debug ("creating Subcontext " + ctxName);
+            this.log.debug ("creating Subcontext " + ctxName);
             ctx = ctx.createSubcontext (ctxName);
          }
          n = n.getSuffix (1);
@@ -1376,7 +1373,7 @@
       if(newMembers == null) newMembers=new Vector();
       Vector dead=(Vector)oldMembers.clone();
       dead.removeAll(newMembers);
-      log.debug("dead members: " + dead);
+      this.log.debug("dead members: " + dead);
       return dead;
    }
    
@@ -1399,7 +1396,7 @@
       Vector allMembers, Vector deadMembers, Vector newMembers,
       Vector originatingGroups)
    {
-      log.debug("Begin notifyListeners, viewID: "+viewID);
+      this.log.debug("Begin notifyListeners, viewID: "+viewID);
       synchronized(theListeners)
       {
          // JBAS-3619 -- don't hold synch lock while notifying
@@ -1426,11 +1423,11 @@
          catch (Throwable e)
          {
             // a problem in a listener should not prevent other members to receive the new view
-            log.warn("HAMembershipListener callback failure: "+aListener, e);
+            this.log.warn("HAMembershipListener callback failure: "+aListener, e);
          }
       }
       
-      log.debug("End notifyListeners, viewID: "+viewID);
+      this.log.debug("End notifyListeners, viewID: "+viewID);
    }
    
    /*
@@ -1441,7 +1438,7 @@
     */
    public void setBindIntoJndi(boolean bind)
    {
-       bindIntoJndi = bind;
+       this.bindIntoJndi = bind;
    }
    
    /*
@@ -1451,7 +1448,7 @@
     */
    public boolean getBindIntoJndi()
    {
-       return bindIntoJndi;
+       return this.bindIntoJndi;
    }
    
    
@@ -1459,7 +1456,7 @@
 
    public ThreadPool getThreadPool()
    {
-      return threadPool;
+      return this.threadPool;
    }
 
    public void setThreadPool(ThreadPool threadPool)
@@ -1486,7 +1483,7 @@
    {
       try
       {
-         history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
+         this.history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
       }
       catch (Exception ignored){}
    }
@@ -1523,17 +1520,17 @@
    
    public Cache getClusteredCache()
    {
-      return cache;
+      return this.cache;
    }
    
    public boolean getDeadlockDetection()
    {
-      return deadlock_detection;
+      return this.deadlock_detection;
    }
 
    public void setDeadlockDetection(boolean doit)
    {
-      deadlock_detection = doit;
+      this.deadlock_detection = doit;
    }
 
    public HAPartition getHAPartition()
@@ -1548,12 +1545,12 @@
 
    public ChannelFactory getChannelFactory()
    {
-      return channelFactory;
+      return this.channelFactory;
    }
 
    public CacheManager getCacheManager()
    {
-      return cacheManager;
+      return this.cacheManager;
    }
 
    public void setCacheManager(CacheManager cacheManager)
@@ -1563,7 +1560,7 @@
 
    public String getCacheConfigName()
    {
-      return cacheConfigName;
+      return this.cacheConfigName;
    }
 
    public void setCacheConfigName(String cacheConfigName)
@@ -1573,12 +1570,12 @@
 
    public String getChannelStackName()
    {
-      return stackName;
+      return this.stackName;
    }
 
    public InetAddress getNodeAddress()
    {
-      return nodeAddress;
+      return this.nodeAddress;
    }
 
    public void setNodeAddress(InetAddress address)
@@ -1587,7 +1584,7 @@
    }
 
    public long getStateTransferTimeout() {
-      return state_transfer_timeout;
+      return this.state_transfer_timeout;
    }
 
    public void setStateTransferTimeout(long timeout)
@@ -1596,7 +1593,7 @@
    }
 
    public long getMethodCallTimeout() {
-      return method_call_timeout;
+      return this.method_call_timeout;
    }
 
    public void setMethodCallTimeout(long timeout)
@@ -1680,14 +1677,14 @@
       {
          logHistory ("getState called on partition");
          
-         log.debug("getState called.");
+         ClusterPartition.this.log.debug("getState called.");
          try
          {
             getStateInternal(stream);
          }
          catch (Exception ex)
          {
-            log.error("getState failed", ex);
+            ClusterPartition.this.log.error("getState failed", ex);
          }
          
       }
@@ -1709,14 +1706,14 @@
          {
             if (stream == null)
             {
-               log.debug("transferred serviceState is null (may be first member in cluster)");
+               ClusterPartition.this.log.debug("transferred serviceState is null (may be first member in cluster)");
             }
             else
             {
                setStateInternal(stream);
             }
             
-            isStateSet = true;
+            ClusterPartition.this.isStateSet = true;
          }
          catch (Throwable t)
          {
@@ -1733,7 +1730,7 @@
       {
          logHistory ("getState called on partition");
          
-         log.debug("getState called.");
+         ClusterPartition.this.log.debug("getState called.");
          try
          {
             ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
@@ -1742,7 +1739,7 @@
          }
          catch (Exception ex)
          {
-            log.error("getState failed", ex);
+            ClusterPartition.this.log.error("getState failed", ex);
          }
          return null; // This will cause the receiver to get a "false" on the channel.getState() call
       }
@@ -1767,7 +1764,7 @@
          {
             if (obj == null)
             {
-               log.debug("transferred serviceState is null (may be first member in cluster)");
+               ClusterPartition.this.log.debug("transferred serviceState is null (may be first member in cluster)");
             }
             else
             {
@@ -1776,7 +1773,7 @@
                bais.close();
             }
             
-            isStateSet = true;
+            ClusterPartition.this.isStateSet = true;
          }
          catch (Throwable t)
          {
@@ -1832,7 +1829,7 @@
       
       public Object objectFromByteBuffer(byte[] buf) throws Exception
       {
-         boolean trace = log.isTraceEnabled();
+         boolean trace = ClusterPartition.this.log.isTraceEnabled();
          Object retval = objectFromByteBufferResponseInternal(buf);
          // HAServiceResponse is only received when a scoped classloader is required for unmarshalling
          if (!(retval instanceof HAServiceResponse))
@@ -1847,13 +1844,13 @@
          boolean overrideCL = false;
          try
          {
-            WeakReference<ClassLoader> weak = clmap.get(serviceName);
+            WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(serviceName);
             if (weak != null) // this should always be true since we only use HAServiceResponse when classloader is specified
             {
                previousCL = Thread.currentThread().getContextClassLoader();
                ClassLoader loader = weak.get();
                if( trace )
-                  log.trace("overriding response Thread ContextClassLoader for service " + serviceName);            
+                  ClusterPartition.this.log.trace("overriding response Thread ContextClassLoader for service " + serviceName);            
                overrideCL = true;
                Thread.currentThread().setContextClassLoader(loader);
             }
@@ -1865,7 +1862,7 @@
          {
             if (overrideCL == true)
             {
-               log.trace("resetting response classloader");
+               ClusterPartition.this.log.trace("resetting response classloader");
                Thread.currentThread().setContextClassLoader(previousCL);
             }
          }
@@ -1903,17 +1900,17 @@
          Object body = null;
          Object retval = null;
          Object handler = null;
-         boolean trace = log.isTraceEnabled();
+         boolean trace = this.log.isTraceEnabled();
          boolean overrideCL = false;
          ClassLoader previousCL = null;
          String service = null;
          byte[] request_bytes = null;
          
          if( trace )
-            log.trace("Partition " + getPartitionName() + " received msg");
+            this.log.trace("Partition " + getPartitionName() + " received msg");
          if(req == null || req.getBuffer() == null)
          {
-            log.warn("Partition " + getPartitionName() + " message or message buffer is null!");
+            this.log.warn("Partition " + getPartitionName() + " message or message buffer is null!");
             return null;
          }
          
@@ -1922,7 +1919,7 @@
             Object wrapper = objectFromByteBufferInternal(req.getBuffer());
             if(wrapper == null || !(wrapper instanceof Object[]))
             {
-               log.warn("Partition " + getPartitionName() + " message wrapper does not contain Object[] object!");
+               this.log.warn("Partition " + getPartitionName() + " message wrapper does not contain Object[] object!");
                return null;
             }
 
@@ -1932,28 +1929,28 @@
             request_bytes = (byte[])temp[1];
 
             // see if this node has registered to handle this service
-            handler = rpcHandlers.get(service);
+            handler = ClusterPartition.this.rpcHandlers.get(service);
             if (handler == null)
             {
                if( trace )
-                  log.trace("Partition " + getPartitionName() + " no rpc handler registered under service " + service);
+                  this.log.trace("Partition " + getPartitionName() + " no rpc handler registered under service " + service);
                return new NoHandlerForRPC();
             }
          }
          catch(Exception e)
          {
-            log.warn("Partition " + getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
+            this.log.warn("Partition " + getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
             return null;
          }
          
          try
          {            
             // If client registered the service with a classloader, override the thread classloader here
-            WeakReference<ClassLoader> weak = clmap.get(service);
+            WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(service);
             if (weak != null)
             {
                if( trace )
-                  log.trace("overriding Thread ContextClassLoader for RPC service " + service);
+                  this.log.trace("overriding Thread ContextClassLoader for RPC service " + service);
                previousCL = Thread.currentThread().getContextClassLoader();
                ClassLoader loader = weak.get();
                overrideCL = true;
@@ -1963,21 +1960,21 @@
          }
          catch (Exception e)
          {
-            log.warn("Partition " + getPartitionName() + " failed extracting message body from request bytes", e);
+            this.log.warn("Partition " + getPartitionName() + " failed extracting message body from request bytes", e);
             return null;
          }
          finally
          {
             if (overrideCL)
             {
-               log.trace("resetting Thread ContextClassLoader");
+               this.log.trace("resetting Thread ContextClassLoader");
                Thread.currentThread().setContextClassLoader(previousCL);
             }
          }
          
          if(body == null || !(body instanceof MethodCall))
          {
-            log.warn("Partition " + getPartitionName() + " message does not contain a MethodCall object!");
+            this.log.warn("Partition " + getPartitionName() + " message does not contain a MethodCall object!");
             return null;
          }
          
@@ -1986,15 +1983,15 @@
          String methodName = method_call.getName();      
          
          if( trace )
-            log.trace("full methodName: " + methodName);
+            this.log.trace("full methodName: " + methodName);
          
          int idx = methodName.lastIndexOf('.');
          String handlerName = methodName.substring(0, idx);
          String newMethodName = methodName.substring(idx + 1);
          if( trace ) 
          {
-            log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
-            log.trace("Handle: " + methodName);
+            this.log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
+            this.log.trace("Handle: " + methodName);
          }
          
          // prepare method call
@@ -2014,12 +2011,12 @@
                retval = new HAServiceResponse(handlerName, retbytes);
             }
             if( trace )
-               log.trace("rpc call return value: " + retval);
+               this.log.trace("rpc call return value: " + retval);
          }
          catch (Throwable t)
          {
             if( trace )
-               log.trace("Partition " + getPartitionName() + " rpc call threw exception", t);
+               this.log.trace("Partition " + getPartitionName() + " rpc call threw exception", t);
             retval = t;
          }
 
@@ -2046,29 +2043,29 @@
 
        public synchronized void close() 
        {
-           isOpen = false;
+           this.isOpen = false;
        }
 
        public synchronized void open() 
        {
-           ++generation;
-           isOpen = true;
+           ++this.generation;
+           this.isOpen = true;
            notifyAll();
        }
 
        // BLOCKS-UNTIL: opened-since(generation on entry)
        public synchronized void await() throws InterruptedException 
        {
-           int arrivalGeneration = generation;
-           while(!isOpen && arrivalGeneration == generation)
+           int arrivalGeneration = this.generation;
+           while(!this.isOpen && arrivalGeneration == this.generation)
                wait();
        }
        
        // BLOCKS-UNTIL: opened-since(generation on entry)
        public synchronized void await(long timeout) throws InterruptedException 
        {
-           int arrivalGeneration = generation;
-           while(!isOpen && arrivalGeneration == generation)
+           int arrivalGeneration = this.generation;
+           while(!this.isOpen && arrivalGeneration == this.generation)
                wait(timeout);
        }
    }




More information about the jboss-cvs-commits mailing list