[jboss-cvs] JBossAS SVN: r74414 - trunk/cluster/src/main/org/jboss/ha/framework/server.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 11 16:08:04 EDT 2008
Author: pferraro
Date: 2008-06-11 16:08:04 -0400 (Wed, 11 Jun 2008)
New Revision: 74414
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
Log:
[JBAS-5433] Ensure ClusterPartition can handle concurrent JGroups requests
Included some minor code style modifications.
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2008-06-11 19:54:55 UTC (rev 74413)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java 2008-06-11 20:08:04 UTC (rev 74414)
@@ -33,7 +33,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
@@ -132,12 +131,12 @@
public String getServiceName()
{
- return serviceName;
+ return this.serviceName;
}
public byte[] getPayload()
{
- return payload;
+ return this.payload;
}
}
@@ -154,18 +153,18 @@
{
try
{
- channel.connect(getPartitionName());
+ ClusterPartition.this.channel.connect(getPartitionName());
}
catch (Exception e)
{
- synchronized (channelLock)
+ synchronized (ClusterPartition.this.channelLock)
{
- connectException = e;
+ ClusterPartition.this.connectException = e;
}
}
finally
{
- latch.countDown();
+ this.latch.countDown();
}
}
}
@@ -190,8 +189,8 @@
/** Thread pool used to asynchronously start our channel */
private ThreadPool threadPool;
- protected HashMap<String, Object> rpcHandlers = new HashMap<String, Object>();
- protected HashMap stateHandlers = new HashMap();
+ protected Map<String, Object> rpcHandlers = new ConcurrentHashMap<String, Object>();
+ protected Map<String, HAPartitionStateTransfer> stateHandlers = new HashMap<String, HAPartitionStateTransfer>();
/** Do we send any membership change notifications synchronously? */
protected boolean allowSyncListeners = false;
/** The HAMembershipListener and HAMembershipExtendedListeners */
@@ -203,7 +202,7 @@
/** The current cluster partition members */
protected Vector members = null;
protected Vector jgmembers = null;
- protected ConcurrentHashMap<String, WeakReference<ClassLoader>> clmap =
+ protected Map<String, WeakReference<ClassLoader>> clmap =
new ConcurrentHashMap<String, WeakReference<ClassLoader>>();
public Vector history = new Vector();
@@ -288,63 +287,63 @@
protected void createService() throws Exception
{
- if (replicantManager == null)
+ if (this.replicantManager == null)
throw new IllegalStateException("DistributedReplicantManager property must be set before creating ClusterPartition service");
setupLoggers(getPartitionName());
- replicantManager.createService();
+ this.replicantManager.createService();
- if (distributedState != null)
+ if (this.distributedState != null)
{
- distributedState.createService();
+ this.distributedState.createService();
}
// Create the asynchronous handler for view changes
- asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
+ this.asynchHandler = new AsynchEventHandler(this, "AsynchViewChangeHandler");
- log.debug("done initializing partition");
+ this.log.debug("done initializing partition");
}
protected void startService() throws Exception
{
logHistory ("Starting partition");
- cache = cacheManager.getCache(cacheConfigName, true);
- channelFactory = cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
- stackName = cache.getConfiguration().getMultiplexerStack();
+ this.cache = this.cacheManager.getCache(this.cacheConfigName, true);
+ this.channelFactory = this.cache.getConfiguration().getRuntimeConfig().getMuxChannelFactory();
+ this.stackName = this.cache.getConfiguration().getMultiplexerStack();
- if (channel == null || !channel.isOpen())
+ if (this.channel == null || !this.channel.isOpen())
{
- log.debug("Creating Channel for partition " + getPartitionName() +
+ this.log.debug("Creating Channel for partition " + getPartitionName() +
" using stack " + getChannelStackName());
- channel = createChannel();
+ this.channel = createChannel();
- channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
- channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
+ this.channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
+ this.channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
}
- log.info("Initializing partition " + getPartitionName());
+ this.log.info("Initializing partition " + getPartitionName());
logHistory ("Initializing partition " + getPartitionName());
- dispatcher = new RpcHandler(channel, null, null, new Object(), getDeadlockDetection());
+ this.dispatcher = new RpcHandler(this.channel, null, null, new Object(), getDeadlockDetection());
// Subscribe to events generated by the channel
- log.debug("setMembershipListener");
- dispatcher.setMembershipListener(this);
- log.debug("setMessageListener");
- dispatcher.setMessageListener(messageListener);
- dispatcher.setRequestMarshaller(new RequestMarshallerImpl());
- dispatcher.setResponseMarshaller(new ResponseMarshallerImpl());
+ this.log.debug("setMembershipListener");
+ this.dispatcher.setMembershipListener(this);
+ this.log.debug("setMessageListener");
+ this.dispatcher.setMessageListener(this.messageListener);
+ this.dispatcher.setRequestMarshaller(new RequestMarshallerImpl());
+ this.dispatcher.setResponseMarshaller(new ResponseMarshallerImpl());
// Clear any old connectException
- connectException = null;
+ this.connectException = null;
CountDownLatch connectLatch = new CountDownLatch(1);
- if (threadPool == null)
+ if (this.threadPool == null)
{
- channel.connect(getPartitionName());
+ this.channel.connect(getPartitionName());
connectLatch.countDown();
}
else
@@ -352,25 +351,25 @@
// Do the channel connect in another thread while this
// thread starts the cache and does that channel connect
ChannelConnectTask task = new ChannelConnectTask(connectLatch);
- threadPool.run(task);
+ this.threadPool.run(task);
}
- cache.start();
+ this.cache.start();
try
{
// This will block waiting for any async channel connect above
connectLatch.await();
- if (connectException != null)
- throw connectException;
+ if (this.connectException != null)
+ throw this.connectException;
- log.debug("Get current members");
+ this.log.debug("Get current members");
waitForView();
// get current JG group properties
- log.debug("get nodeName");
- this.localJGAddress = (IpAddress)channel.getLocalAddress();
+ this.log.debug("get nodeName");
+ this.localJGAddress = (IpAddress)this.channel.getLocalAddress();
this.me = new ClusterNodeImpl(this.localJGAddress);
this.nodeName = this.me.getName();
@@ -378,34 +377,34 @@
fetchState();
- replicantManager.startService();
+ this.replicantManager.startService();
- if (distributedState != null)
+ if (this.distributedState != null)
{
- distributedState.setClusteredCache(getClusteredCache());
- distributedState.startService();
+ this.distributedState.setClusteredCache(getClusteredCache());
+ this.distributedState.startService();
}
// Start the asynch listener handler thread
- asynchHandler.start();
+ this.asynchHandler.start();
// Register with the service locator
HAPartitionLocator.getHAPartitionLocator().registerHAPartition(this);
// Bind ourself in the public JNDI space if configured to do so
- if (bindIntoJndi)
+ if (this.bindIntoJndi)
{
Context ctx = new InitialContext();
- this.bind(HAPartitionLocator.getStandardJndiBinding(getPartitionName()),
+ bind(HAPartitionLocator.getStandardJndiBinding(getPartitionName()),
this, ClusterPartition.class, ctx);
- log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+ this.log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
}
}
catch (Throwable t)
{
- log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
- channel.close();
- channel = null;
+ this.log.debug("Caught exception after channel connected; closing channel -- " + t.getLocalizedMessage());
+ this.channel.close();
+ this.channel = null;
throw (t instanceof Exception) ? (Exception) t : new RuntimeException(t);
}
@@ -414,46 +413,46 @@
protected void stopService() throws Exception
{
logHistory ("Stopping partition");
- log.info("Stopping partition " + getPartitionName());
+ this.log.info("Stopping partition " + getPartitionName());
try
{
- asynchHandler.stop();
+ this.asynchHandler.stop();
}
catch( Exception e)
{
- log.warn("Failed to stop asynchHandler", e);
+ this.log.warn("Failed to stop asynchHandler", e);
}
- if (distributedState != null)
+ if (this.distributedState != null)
{
- distributedState.stopService();
+ this.distributedState.stopService();
}
- replicantManager.stopService();
+ this.replicantManager.stopService();
try
{
- cacheManager.releaseCache(cacheConfigName);
+ this.cacheManager.releaseCache(this.cacheConfigName);
}
catch (Exception e)
{
- log.error("cache release failed", e);
+ this.log.error("cache release failed", e);
}
// NR 200505 : [JBCLUSTER-38] replace channel.close() by a disconnect and
// add the destroyPartition() step
try
{
- if (channel != null && channel.isConnected())
- channel.disconnect();
+ if (this.channel != null && this.channel.isConnected())
+ this.channel.disconnect();
}
catch (Exception e)
{
- log.error("channel disconnection failed", e);
+ this.log.error("channel disconnection failed", e);
}
- if (bindIntoJndi)
+ if (this.bindIntoJndi)
{
String boundName = HAPartitionLocator.getStandardJndiBinding(getPartitionName());
InitialContext ctx = null;
@@ -464,7 +463,7 @@
ctx.unbind(boundName);
}
catch (Exception e) {
- log.error("partition unbind operation failed", e);
+ this.log.error("partition unbind operation failed", e);
}
finally
{
@@ -476,31 +475,31 @@
HAPartitionLocator.getHAPartitionLocator().deregisterHAPartition(this);
- log.info("Partition " + getPartitionName() + " stopped.");
+ this.log.info("Partition " + getPartitionName() + " stopped.");
}
protected void destroyService() throws Exception
{
- log.debug("Destroying HAPartition: " + getPartitionName());
+ this.log.debug("Destroying HAPartition: " + getPartitionName());
- if (distributedState != null)
+ if (this.distributedState != null)
{
- distributedState.destroyService();
+ this.distributedState.destroyService();
}
- replicantManager.destroyService();
+ this.replicantManager.destroyService();
try
{
- if (channel != null && channel.isOpen())
- channel.close();
+ if (this.channel != null && this.channel.isOpen())
+ this.channel.close();
}
catch (Exception e)
{
- log.error("Closing channel failed", e);
+ this.log.error("Closing channel failed", e);
}
- log.info("Partition " + getPartitionName() + " destroyed.");
+ this.log.info("Partition " + getPartitionName() + " destroyed.");
}
// ---------------------------------------------------------- State Transfer
@@ -508,24 +507,24 @@
protected void fetchState() throws Exception
{
- log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() +
+ this.log.info("Fetching serviceState (will wait for " + getStateTransferTimeout() +
" milliseconds):");
long start, stop;
- isStateSet = false;
+ this.isStateSet = false;
start = System.currentTimeMillis();
- boolean rc = channel.getState(null, getStateTransferTimeout());
+ boolean rc = this.channel.getState(null, getStateTransferTimeout());
if (rc)
{
- synchronized (channelLock)
+ synchronized (this.channelLock)
{
- while (!isStateSet)
+ while (!this.isStateSet)
{
- if (setStateException != null)
- throw setStateException;
+ if (this.setStateException != null)
+ throw this.setStateException;
try
{
- channelLock.wait();
+ this.channelLock.wait();
}
catch (InterruptedException iex)
{
@@ -533,7 +532,7 @@
}
}
stop = System.currentTimeMillis();
- log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
+ this.log.info("serviceState was retrieved successfully (in " + (stop - start) + " milliseconds)");
}
else
{
@@ -541,14 +540,14 @@
// We need to find out if we are the coordinator, so we must
// block until viewAccepted() is called at least once
- synchronized (members)
+ synchronized (this.members)
{
- while (members.size() == 0)
+ while (this.members.size() == 0)
{
- log.debug("waiting on viewAccepted()");
+ this.log.debug("waiting on viewAccepted()");
try
{
- members.wait();
+ this.members.wait();
}
catch (InterruptedException iex)
{
@@ -558,7 +557,7 @@
if (isCurrentNodeCoordinator())
{
- log.info("State could not be retrieved (we are the first member in group)");
+ this.log.info("State could not be retrieved (we are the first member in group)");
}
else
{
@@ -572,12 +571,10 @@
{
MarshalledValueOutputStream mvos = null; // don't create until we know we need it
- for (Iterator keys = stateHandlers.entrySet().iterator(); keys.hasNext(); )
+ for (Map.Entry<String, HAPartitionStateTransfer> entry: this.stateHandlers.entrySet())
{
- Map.Entry entry = (Map.Entry)keys.next();
- HAPartition.HAPartitionStateTransfer subscriber =
- (HAPartition.HAPartitionStateTransfer) entry.getValue();
- log.debug("getState for " + entry.getKey());
+ HAPartitionStateTransfer subscriber = entry.getValue();
+ this.log.debug("getState for " + entry.getKey());
Object state = subscriber.getCurrentState();
if (state != null)
{
@@ -614,12 +611,12 @@
if (type == EOF_VALUE)
{
- log.debug("serviceState stream is empty");
+ this.log.debug("serviceState stream is empty");
return;
}
else if (type == NULL_VALUE)
{
- log.debug("serviceState is null");
+ this.log.debug("serviceState is null");
return;
}
@@ -636,9 +633,9 @@
break;
String key = (String) obj;
- log.debug("setState for " + key);
+ this.log.debug("setState for " + key);
Object someState = mvis.readObject();
- HAPartition.HAPartitionStateTransfer subscriber = (HAPartition.HAPartitionStateTransfer)stateHandlers.get(key);
+ HAPartitionStateTransfer subscriber = this.stateHandlers.get(key);
if (subscriber != null)
{
try
@@ -660,13 +657,13 @@
}
else
{
- log.error("Caught exception setting serviceState to " + subscriber, e);
+ this.log.error("Caught exception setting serviceState to " + subscriber, e);
}
}
}
else
{
- log.debug("There is no stateHandler for: " + key);
+ this.log.debug("There is no stateHandler for: " + key);
}
}
@@ -676,29 +673,29 @@
}
catch(Exception e)
{
- log.error("Caught exception closing serviceState stream", e);
+ this.log.error("Caught exception closing serviceState stream", e);
}
used_mem_after=rt.totalMemory() - rt.freeMemory();
- log.debug("received serviceState; expanded memory by " +
+ this.log.debug("received serviceState; expanded memory by " +
(used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
", used memory after: " + used_mem_after + ")");
}
private void recordSetStateFailure(Throwable t)
{
- log.error("failed setting serviceState", t);
+ this.log.error("failed setting serviceState", t);
if (t instanceof Exception)
- setStateException = (Exception) t;
+ this.setStateException = (Exception) t;
else
- setStateException = new Exception(t);
+ this.setStateException = new Exception(t);
}
private void notifyChannelLock()
{
- synchronized (channelLock)
+ synchronized (this.channelLock)
{
- channelLock.notifyAll();
+ this.channelLock.notifyAll();
}
}
@@ -708,21 +705,21 @@
{
logHistory ("Node suspected: " + (suspected_mbr==null?"null":suspected_mbr.toString()));
if (isCurrentNodeCoordinator ())
- clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
+ this.clusterLifeCycleLog.info ("Suspected member: " + suspected_mbr);
else
- log.info("Suspected member: " + suspected_mbr);
+ this.log.info("Suspected member: " + suspected_mbr);
}
public void block()
{
- flushBlockGate.close();
- log.debug("Block processed at " + me);
+ this.flushBlockGate.close();
+ this.log.debug("Block processed at " + this.me);
}
public void unblock()
{
- flushBlockGate.open();
- log.debug("Unblock processed at " + me);
+ this.flushBlockGate.open();
+ this.log.debug("Unblock processed at " + this.me);
}
/** Notification of a cluster view change. This is done from the JG protocol
@@ -743,7 +740,7 @@
// Keep a list of other members only for "exclude-self" RPC calls
this.jgotherMembers = (Vector)newView.getMembers().clone();
- this.jgotherMembers.remove (channel.getLocalAddress());
+ this.jgotherMembers.remove (this.channel.getLocalAddress());
this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
@@ -761,16 +758,16 @@
if (oldMembers == null)
{
// Initial viewAccepted
- log.debug("ViewAccepted: initial members set for partition " + getPartitionName() + ": " +
+ this.log.debug("ViewAccepted: initial members set for partition " + getPartitionName() + ": " +
this.currentViewId + " (" + this.members + ")");
- log.info("Number of cluster members: " + members.size());
- for(int m = 0; m > members.size(); m ++)
+ this.log.info("Number of cluster members: " + this.members.size());
+ for(int m = 0; m > this.members.size(); m ++)
{
- Object node = members.get(m);
- log.debug(node);
+ Object node = this.members.get(m);
+ this.log.debug(node);
}
- log.info ("Other members: " + this.otherMembers.size ());
+ this.log.info ("Other members: " + this.otherMembers.size ());
// Wake up the deployer thread blocking in waitForView
notifyChannelLock();
@@ -784,15 +781,15 @@
difference = newMembers.size () - oldMembers.size ();
if (isCurrentNodeCoordinator ())
- clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " +
+ this.clusterLifeCycleLog.info ("New cluster view for partition " + getPartitionName() + " (id: " +
this.currentViewId + ", delta: " + difference + ") : " + this.members);
else
- log.info("New cluster view for partition " + getPartitionName() + ": " +
+ this.log.info("New cluster view for partition " + getPartitionName() + ": " +
this.currentViewId + " (" + this.members + " delta: " + difference + ")");
// Build a ViewChangeEvent for the asynch listeners
ViewChangeEvent event = new ViewChangeEvent();
- event.viewId = currentViewId;
+ event.viewId = this.currentViewId;
event.allMembers = translatedNewView;
event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
event.newMembers = getNewMembers(oldMembers, event.allMembers);
@@ -804,7 +801,7 @@
event.originatingGroups = mergeView.getSubgroups();
}
- log.debug("membership changed from " +
+ this.log.debug("membership changed from " +
(oldMembers == null ? 0 : oldMembers.size()) + " to " +
event.allMembers.size());
// Put the view change to the asynch queue
@@ -813,35 +810,35 @@
// Broadcast the new view to the synchronous view change listeners
if (this.allowSyncListeners)
{
- this.notifyListeners(synchListeners, event.viewId, event.allMembers,
+ notifyListeners(this.synchListeners, event.viewId, event.allMembers,
event.deadMembers, event.newMembers, event.originatingGroups);
}
}
catch (Exception ex)
{
- log.error("ViewAccepted failed", ex);
+ this.log.error("ViewAccepted failed", ex);
}
}
private void waitForView() throws Exception
{
- synchronized (channelLock)
+ synchronized (this.channelLock)
{
if (this.members == null)
{
- if (connectException != null)
- throw connectException;
+ if (this.connectException != null)
+ throw this.connectException;
try
{
- channelLock.wait(getMethodCallTimeout());
+ this.channelLock.wait(getMethodCallTimeout());
}
catch (InterruptedException iex)
{
}
- if (connectException != null)
- throw connectException;
+ if (this.connectException != null)
+ throw this.connectException;
if (this.members == null)
throw new IllegalStateException("No view received from Channel");
@@ -853,27 +850,27 @@
public String getNodeName()
{
- return nodeName;
+ return this.nodeName;
}
public String getPartitionName()
{
- return partitionName;
+ return this.partitionName;
}
public void setPartitionName(String newName)
{
- partitionName = newName;
+ this.partitionName = newName;
}
public DistributedReplicantManager getDistributedReplicantManager()
{
- return replicantManager;
+ return this.replicantManager;
}
public DistributedState getDistributedStateService()
{
- return distributedState;
+ return this.distributedState;
}
public long getCurrentViewId()
@@ -884,16 +881,16 @@
public Vector getCurrentView()
{
Vector result = new Vector (this.members.size());
- for (int i = 0; i < members.size(); i++)
+ for (int i = 0; i < this.members.size(); i++)
{
- result.add( ((ClusterNode) members.elementAt(i)).getName() );
+ result.add( ((ClusterNode) this.members.elementAt(i)).getName() );
}
return result;
}
public ClusterNode[] getClusterNodes ()
{
- synchronized (members)
+ synchronized (this.members)
{
ClusterNode[] nodes = new ClusterNode[this.members.size()];
nodes = (ClusterNode[]) this.members.toArray(nodes);
@@ -903,7 +900,7 @@
public ClusterNode getClusterNode ()
{
- return me;
+ return this.me;
}
public boolean isCurrentNodeCoordinator ()
@@ -921,19 +918,19 @@
//
public void registerRPCHandler(String objName, Object subscriber)
{
- rpcHandlers.put(objName, subscriber);
+ this.rpcHandlers.put(objName, subscriber);
}
public void registerRPCHandler(String objName, Object subscriber, ClassLoader classloader)
{
registerRPCHandler(objName, subscriber);
- clmap.put(objName, new WeakReference<ClassLoader>(classloader));
+ this.clmap.put(objName, new WeakReference<ClassLoader>(classloader));
}
public void unregisterRPCHandler(String objName, Object subscriber)
{
- rpcHandlers.remove(objName);
- clmap.remove(objName);
+ this.rpcHandlers.remove(objName);
+ this.clmap.remove(objName);
}
/**
@@ -950,31 +947,31 @@
Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
{
RspList rsp = null;
- boolean trace = log.isTraceEnabled();
+ boolean trace = this.log.isTraceEnabled();
MethodCall m = new MethodCall(objName + "." + methodName, args, types);
- if(channel.flushSupported())
+ if(this.channel.flushSupported())
{
- flushBlockGate.await(getStateTransferTimeout());
+ this.flushBlockGate.await(getStateTransferTimeout());
}
if (excludeSelf)
{
if( trace )
{
- log.trace("callMethodOnCluster(true), objName="+objName
- +", methodName="+methodName+", members="+jgotherMembers);
+ this.log.trace("callMethodOnCluster(true), objName="+objName
+ +", methodName="+methodName+", members="+this.jgotherMembers);
}
- rsp = dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
+ rsp = this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_ALL, methodTimeout);
}
else
{
if( trace )
{
- log.trace("callMethodOnCluster(false), objName="+objName
- +", methodName="+methodName+", members="+members);
+ this.log.trace("callMethodOnCluster(false), objName="+objName
+ +", methodName="+methodName+", members="+this.members);
}
- rsp = dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
+ rsp = this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_ALL, methodTimeout);
}
return processResponseList(rsp, trace);
@@ -1014,13 +1011,13 @@
public ArrayList callMethodOnCoordinatorNode(String objName, String methodName,
Object[] args, Class[] types,boolean excludeSelf, long methodTimeout) throws Exception
{
- boolean trace = log.isTraceEnabled();
+ boolean trace = this.log.isTraceEnabled();
MethodCall m = new MethodCall(objName + "." + methodName, args, types);
if( trace )
{
- log.trace("callMethodOnCoordinatorNode(false), objName="+objName
+ this.log.trace("callMethodOnCoordinatorNode(false), objName="+objName
+", methodName="+methodName);
}
@@ -1033,7 +1030,7 @@
coordinatorOnly.addElement(this.jgmembers.elementAt (0));
}
- RspList rsp = dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
+ RspList rsp = this.dispatcher.callRemoteMethods(coordinatorOnly, m, GroupRequest.GET_ALL, methodTimeout);
return processResponseList(rsp, trace);
}
@@ -1057,17 +1054,17 @@
throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
MethodCall m;
- boolean trace = log.isTraceEnabled();
+ boolean trace = this.log.isTraceEnabled();
if(types != null)
m=new MethodCall(serviceName + "." + methodName, args, types);
else
m=new MethodCall(serviceName + "." + methodName, args);
if( trace )
{
- log.trace("callMethodOnNode( objName="+serviceName
+ this.log.trace("callMethodOnNode( objName="+serviceName
+", methodName="+methodName);
}
- Object rc = dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_FIRST, methodTimeout);
+ Object rc = this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_FIRST, methodTimeout);
if (rc != null)
{
Object item = rc;
@@ -1083,14 +1080,14 @@
rc = item;
}
else if( trace )
- log.trace("Ignoring non-received response: "+response);
+ this.log.trace("Ignoring non-received response: "+response);
}
else
{
if (!(item instanceof NoHandlerForRPC))
rc = item;
else if( trace )
- log.trace("Ignoring NoHandlerForRPC");
+ this.log.trace("Ignoring NoHandlerForRPC");
}
}
return rc;
@@ -1116,17 +1113,17 @@
throw new IllegalArgumentException("targetNode " + targetNode + " is not an instance of " +
ClusterNodeImpl.class + " -- only targetNodes provided by this HAPartition should be used");
MethodCall m;
- boolean trace = log.isTraceEnabled();
+ boolean trace = this.log.isTraceEnabled();
if(types != null)
m=new MethodCall(serviceName + "." + methodName, args, types);
else
m=new MethodCall(serviceName + "." + methodName, args);
if( trace )
{
- log.trace("callAsyncMethodOnNode( objName="+serviceName
+ this.log.trace("callAsyncMethodOnNode( objName="+serviceName
+", methodName="+methodName);
}
- dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_NONE, methodTimeout);
+ this.dispatcher.callRemoteMethod(((ClusterNodeImpl)targetNode).getOriginalJGAddress(), m, GroupRequest.GET_NONE, methodTimeout);
}
private ArrayList processResponseList(RspList rsp, boolean trace)
@@ -1148,14 +1145,14 @@
rtn.add(item);
}
else if( trace )
- log.trace("Ignoring non-received response: "+response);
+ this.log.trace("Ignoring non-received response: "+response);
}
else
{
if (!(item instanceof NoHandlerForRPC))
rtn.add(item);
else if( trace )
- log.trace("Ignoring NoHandlerForRPC");
+ this.log.trace("Ignoring NoHandlerForRPC");
}
}
@@ -1169,31 +1166,31 @@
public void callAsynchMethodOnCluster(String objName, String methodName,
Object[] args, Class[] types, boolean excludeSelf) throws Exception
{
- boolean trace = log.isTraceEnabled();
+ boolean trace = this.log.isTraceEnabled();
MethodCall m = new MethodCall(objName + "." + methodName, args, types);
- if(channel.flushSupported())
+ if(this.channel.flushSupported())
{
- flushBlockGate.await(getStateTransferTimeout());
+ this.flushBlockGate.await(getStateTransferTimeout());
}
if (excludeSelf)
{
if( trace )
{
- log.trace("callAsynchMethodOnCluster(true), objName="+objName
- +", methodName="+methodName+", members="+jgotherMembers);
+ this.log.trace("callAsynchMethodOnCluster(true), objName="+objName
+ +", methodName="+methodName+", members="+this.jgotherMembers);
}
- dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout());
+ this.dispatcher.callRemoteMethods(this.jgotherMembers, m, GroupRequest.GET_NONE, getMethodCallTimeout());
}
else
{
if( trace )
{
- log.trace("callAsynchMethodOnCluster(false), objName="+objName
- +", methodName="+methodName+", members="+members);
+ this.log.trace("callAsynchMethodOnCluster(false), objName="+objName
+ +", methodName="+methodName+", members="+this.members);
}
- dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout());
+ this.dispatcher.callRemoteMethods(null, m, GroupRequest.GET_NONE, getMethodCallTimeout());
}
}
@@ -1205,12 +1202,12 @@
//
public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
{
- stateHandlers.put(objectName, subscriber);
+ this.stateHandlers.put(objectName, subscriber);
}
public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
{
- stateHandlers.remove(objectName);
+ this.stateHandlers.remove(objectName);
}
// *************************
@@ -1255,7 +1252,7 @@
public boolean getAllowSynchronousMembershipNotifications()
{
- return allowSyncListeners;
+ return this.allowSyncListeners;
}
public void setAllowSynchronousMembershipNotifications(boolean allowSync)
@@ -1268,7 +1265,7 @@
public void processEvent(Object event)
{
ViewChangeEvent vce = (ViewChangeEvent) event;
- notifyListeners(asynchListeners, vce.viewId, vce.allMembers,
+ notifyListeners(this.asynchListeners, vce.viewId, vce.allMembers,
vce.deadMembers, vce.newMembers, vce.originatingGroups);
}
@@ -1283,7 +1280,7 @@
public void setDistributedReplicantManagerImpl(DistributedReplicantManagerImpl drm)
{
- if (this.replicantManager != null && !(replicantManager == drm))
+ if (this.replicantManager != null && !(this.replicantManager == drm))
throw new IllegalStateException("DistributedReplicantManager already set");
this.replicantManager = drm;
@@ -1301,7 +1298,7 @@
ClusterNodeImpl matched = null;
for (ClusterNode member : getClusterNodes())
{
- if (member.equals(me))
+ if (member.equals(this.me))
{
if (matched == null)
{
@@ -1313,12 +1310,12 @@
{
// Two nodes in view match us; try to figure out which one isn't us
ClusterNodeImpl other = matched;
- if (other.getOriginalJGAddress().equals(((ClusterNodeImpl)me).getOriginalJGAddress()))
+ if (other.getOriginalJGAddress().equals(((ClusterNodeImpl)this.me).getOriginalJGAddress()))
{
other = (ClusterNodeImpl) member;
}
throw new IllegalStateException("Found member " + other +
- " in current view that duplicates us (" + me + "). This" +
+ " in current view that duplicates us (" + this.me + "). This" +
" node cannot join partition until duplicate member has" +
" been removed");
}
@@ -1349,7 +1346,7 @@
}
catch (NameNotFoundException e)
{
- log.debug ("creating Subcontext " + ctxName);
+ this.log.debug ("creating Subcontext " + ctxName);
ctx = ctx.createSubcontext (ctxName);
}
n = n.getSuffix (1);
@@ -1376,7 +1373,7 @@
if(newMembers == null) newMembers=new Vector();
Vector dead=(Vector)oldMembers.clone();
dead.removeAll(newMembers);
- log.debug("dead members: " + dead);
+ this.log.debug("dead members: " + dead);
return dead;
}
@@ -1399,7 +1396,7 @@
Vector allMembers, Vector deadMembers, Vector newMembers,
Vector originatingGroups)
{
- log.debug("Begin notifyListeners, viewID: "+viewID);
+ this.log.debug("Begin notifyListeners, viewID: "+viewID);
synchronized(theListeners)
{
// JBAS-3619 -- don't hold synch lock while notifying
@@ -1426,11 +1423,11 @@
catch (Throwable e)
{
// a problem in a listener should not prevent other members to receive the new view
- log.warn("HAMembershipListener callback failure: "+aListener, e);
+ this.log.warn("HAMembershipListener callback failure: "+aListener, e);
}
}
- log.debug("End notifyListeners, viewID: "+viewID);
+ this.log.debug("End notifyListeners, viewID: "+viewID);
}
/*
@@ -1441,7 +1438,7 @@
*/
public void setBindIntoJndi(boolean bind)
{
- bindIntoJndi = bind;
+ this.bindIntoJndi = bind;
}
/*
@@ -1451,7 +1448,7 @@
*/
public boolean getBindIntoJndi()
{
- return bindIntoJndi;
+ return this.bindIntoJndi;
}
@@ -1459,7 +1456,7 @@
public ThreadPool getThreadPool()
{
- return threadPool;
+ return this.threadPool;
}
public void setThreadPool(ThreadPool threadPool)
@@ -1486,7 +1483,7 @@
{
try
{
- history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
+ this.history.add(new SimpleDateFormat().format (new Date()) + " : " + message);
}
catch (Exception ignored){}
}
@@ -1523,17 +1520,17 @@
public Cache getClusteredCache()
{
- return cache;
+ return this.cache;
}
public boolean getDeadlockDetection()
{
- return deadlock_detection;
+ return this.deadlock_detection;
}
public void setDeadlockDetection(boolean doit)
{
- deadlock_detection = doit;
+ this.deadlock_detection = doit;
}
public HAPartition getHAPartition()
@@ -1548,12 +1545,12 @@
public ChannelFactory getChannelFactory()
{
- return channelFactory;
+ return this.channelFactory;
}
public CacheManager getCacheManager()
{
- return cacheManager;
+ return this.cacheManager;
}
public void setCacheManager(CacheManager cacheManager)
@@ -1563,7 +1560,7 @@
public String getCacheConfigName()
{
- return cacheConfigName;
+ return this.cacheConfigName;
}
public void setCacheConfigName(String cacheConfigName)
@@ -1573,12 +1570,12 @@
public String getChannelStackName()
{
- return stackName;
+ return this.stackName;
}
public InetAddress getNodeAddress()
{
- return nodeAddress;
+ return this.nodeAddress;
}
public void setNodeAddress(InetAddress address)
@@ -1587,7 +1584,7 @@
}
public long getStateTransferTimeout() {
- return state_transfer_timeout;
+ return this.state_transfer_timeout;
}
public void setStateTransferTimeout(long timeout)
@@ -1596,7 +1593,7 @@
}
public long getMethodCallTimeout() {
- return method_call_timeout;
+ return this.method_call_timeout;
}
public void setMethodCallTimeout(long timeout)
@@ -1680,14 +1677,14 @@
{
logHistory ("getState called on partition");
- log.debug("getState called.");
+ ClusterPartition.this.log.debug("getState called.");
try
{
getStateInternal(stream);
}
catch (Exception ex)
{
- log.error("getState failed", ex);
+ ClusterPartition.this.log.error("getState failed", ex);
}
}
@@ -1709,14 +1706,14 @@
{
if (stream == null)
{
- log.debug("transferred serviceState is null (may be first member in cluster)");
+ ClusterPartition.this.log.debug("transferred serviceState is null (may be first member in cluster)");
}
else
{
setStateInternal(stream);
}
- isStateSet = true;
+ ClusterPartition.this.isStateSet = true;
}
catch (Throwable t)
{
@@ -1733,7 +1730,7 @@
{
logHistory ("getState called on partition");
- log.debug("getState called.");
+ ClusterPartition.this.log.debug("getState called.");
try
{
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
@@ -1742,7 +1739,7 @@
}
catch (Exception ex)
{
- log.error("getState failed", ex);
+ ClusterPartition.this.log.error("getState failed", ex);
}
return null; // This will cause the receiver to get a "false" on the channel.getState() call
}
@@ -1767,7 +1764,7 @@
{
if (obj == null)
{
- log.debug("transferred serviceState is null (may be first member in cluster)");
+ ClusterPartition.this.log.debug("transferred serviceState is null (may be first member in cluster)");
}
else
{
@@ -1776,7 +1773,7 @@
bais.close();
}
- isStateSet = true;
+ ClusterPartition.this.isStateSet = true;
}
catch (Throwable t)
{
@@ -1832,7 +1829,7 @@
public Object objectFromByteBuffer(byte[] buf) throws Exception
{
- boolean trace = log.isTraceEnabled();
+ boolean trace = ClusterPartition.this.log.isTraceEnabled();
Object retval = objectFromByteBufferResponseInternal(buf);
// HAServiceResponse is only received when a scoped classloader is required for unmarshalling
if (!(retval instanceof HAServiceResponse))
@@ -1847,13 +1844,13 @@
boolean overrideCL = false;
try
{
- WeakReference<ClassLoader> weak = clmap.get(serviceName);
+ WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(serviceName);
if (weak != null) // this should always be true since we only use HAServiceResponse when classloader is specified
{
previousCL = Thread.currentThread().getContextClassLoader();
ClassLoader loader = weak.get();
if( trace )
- log.trace("overriding response Thread ContextClassLoader for service " + serviceName);
+ ClusterPartition.this.log.trace("overriding response Thread ContextClassLoader for service " + serviceName);
overrideCL = true;
Thread.currentThread().setContextClassLoader(loader);
}
@@ -1865,7 +1862,7 @@
{
if (overrideCL == true)
{
- log.trace("resetting response classloader");
+ ClusterPartition.this.log.trace("resetting response classloader");
Thread.currentThread().setContextClassLoader(previousCL);
}
}
@@ -1903,17 +1900,17 @@
Object body = null;
Object retval = null;
Object handler = null;
- boolean trace = log.isTraceEnabled();
+ boolean trace = this.log.isTraceEnabled();
boolean overrideCL = false;
ClassLoader previousCL = null;
String service = null;
byte[] request_bytes = null;
if( trace )
- log.trace("Partition " + getPartitionName() + " received msg");
+ this.log.trace("Partition " + getPartitionName() + " received msg");
if(req == null || req.getBuffer() == null)
{
- log.warn("Partition " + getPartitionName() + " message or message buffer is null!");
+ this.log.warn("Partition " + getPartitionName() + " message or message buffer is null!");
return null;
}
@@ -1922,7 +1919,7 @@
Object wrapper = objectFromByteBufferInternal(req.getBuffer());
if(wrapper == null || !(wrapper instanceof Object[]))
{
- log.warn("Partition " + getPartitionName() + " message wrapper does not contain Object[] object!");
+ this.log.warn("Partition " + getPartitionName() + " message wrapper does not contain Object[] object!");
return null;
}
@@ -1932,28 +1929,28 @@
request_bytes = (byte[])temp[1];
// see if this node has registered to handle this service
- handler = rpcHandlers.get(service);
+ handler = ClusterPartition.this.rpcHandlers.get(service);
if (handler == null)
{
if( trace )
- log.trace("Partition " + getPartitionName() + " no rpc handler registered under service " + service);
+ this.log.trace("Partition " + getPartitionName() + " no rpc handler registered under service " + service);
return new NoHandlerForRPC();
}
}
catch(Exception e)
{
- log.warn("Partition " + getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
+ this.log.warn("Partition " + getPartitionName() + " failed unserializing message buffer (msg=" + req + ")", e);
return null;
}
try
{
// If client registered the service with a classloader, override the thread classloader here
- WeakReference<ClassLoader> weak = clmap.get(service);
+ WeakReference<ClassLoader> weak = ClusterPartition.this.clmap.get(service);
if (weak != null)
{
if( trace )
- log.trace("overriding Thread ContextClassLoader for RPC service " + service);
+ this.log.trace("overriding Thread ContextClassLoader for RPC service " + service);
previousCL = Thread.currentThread().getContextClassLoader();
ClassLoader loader = weak.get();
overrideCL = true;
@@ -1963,21 +1960,21 @@
}
catch (Exception e)
{
- log.warn("Partition " + getPartitionName() + " failed extracting message body from request bytes", e);
+ this.log.warn("Partition " + getPartitionName() + " failed extracting message body from request bytes", e);
return null;
}
finally
{
if (overrideCL)
{
- log.trace("resetting Thread ContextClassLoader");
+ this.log.trace("resetting Thread ContextClassLoader");
Thread.currentThread().setContextClassLoader(previousCL);
}
}
if(body == null || !(body instanceof MethodCall))
{
- log.warn("Partition " + getPartitionName() + " message does not contain a MethodCall object!");
+ this.log.warn("Partition " + getPartitionName() + " message does not contain a MethodCall object!");
return null;
}
@@ -1986,15 +1983,15 @@
String methodName = method_call.getName();
if( trace )
- log.trace("full methodName: " + methodName);
+ this.log.trace("full methodName: " + methodName);
int idx = methodName.lastIndexOf('.');
String handlerName = methodName.substring(0, idx);
String newMethodName = methodName.substring(idx + 1);
if( trace )
{
- log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
- log.trace("Handle: " + methodName);
+ this.log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
+ this.log.trace("Handle: " + methodName);
}
// prepare method call
@@ -2014,12 +2011,12 @@
retval = new HAServiceResponse(handlerName, retbytes);
}
if( trace )
- log.trace("rpc call return value: " + retval);
+ this.log.trace("rpc call return value: " + retval);
}
catch (Throwable t)
{
if( trace )
- log.trace("Partition " + getPartitionName() + " rpc call threw exception", t);
+ this.log.trace("Partition " + getPartitionName() + " rpc call threw exception", t);
retval = t;
}
@@ -2046,29 +2043,29 @@
public synchronized void close()
{
- isOpen = false;
+ this.isOpen = false;
}
public synchronized void open()
{
- ++generation;
- isOpen = true;
+ ++this.generation;
+ this.isOpen = true;
notifyAll();
}
// BLOCKS-UNTIL: opened-since(generation on entry)
public synchronized void await() throws InterruptedException
{
- int arrivalGeneration = generation;
- while(!isOpen && arrivalGeneration == generation)
+ int arrivalGeneration = this.generation;
+ while(!this.isOpen && arrivalGeneration == this.generation)
wait();
}
// BLOCKS-UNTIL: opened-since(generation on entry)
public synchronized void await(long timeout) throws InterruptedException
{
- int arrivalGeneration = generation;
- while(!isOpen && arrivalGeneration == generation)
+ int arrivalGeneration = this.generation;
+ while(!this.isOpen && arrivalGeneration == this.generation)
wait(timeout);
}
}
More information about the jboss-cvs-commits
mailing list