[jboss-cvs] JBoss Messaging SVN: r2782 - trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 14 08:16:17 EDT 2007
Author: timfox
Date: 2007-06-14 08:16:17 -0400 (Thu, 14 Jun 2007)
New Revision: 2782
Modified:
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
Reverted http://jira.jboss.com/jira/browse/JBMESSAGING-819
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-06-13 18:15:18 UTC (rev 2781)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-06-14 12:16:17 UTC (rev 2782)
@@ -85,8 +85,6 @@
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -162,7 +160,15 @@
private volatile boolean started;
- private ReadWriteLock stoplock;
+ //FIXME Using a stopping flag is not a good approach because it introduces a race condition
+ //(see http://jira.jboss.org/jira/browse/JBMESSAGING-819).
+ //A thread can check stopping and find it to be false; stop() can then set stopping to true
+ //and actually stop the post office; the thread that checked stopping then continues and performs
+ //its action, only to find the service already stopped.
+ //A read-write lock should be used instead.
+ //One way to minimise the chance of the race happening is to sleep for a little while after setting stopping to true
+ //before actually stopping the service (see stop() below).
+ private volatile boolean stopping;
private JChannelFactory jChannelFactory;
@@ -272,129 +278,107 @@
this.jChannelFactory = JChannelFactory;
- pooledExecutor = new PooledExecutor(new LinkedQueue(), poolSize);
+ this.pooledExecutor = new PooledExecutor(new LinkedQueue(), poolSize);
- pooledExecutor.setMinimumPoolSize(poolSize);
-
- stoplock = new ReentrantWriterPreferenceReadWriteLock();
+ this.pooledExecutor.setMinimumPoolSize(poolSize);
}
// MessagingComponent overrides -----------------------------------------------------------------
public synchronized void start() throws Exception
{
- stoplock.writeLock().acquire();
-
- try
- {
- if (started)
- {
- log.warn("Attempt to start() but " + this + " is already started");
-
- return;
- }
-
- if (trace) { log.trace(this + " starting"); }
-
- //We set started = true at the beginning since otherwise when state arrives it will get rejected
- started = true;
-
- this.syncChannel = jChannelFactory.createSyncChannel();
- this.asyncChannel = jChannelFactory.createASyncChannel();
-
- // We don't want to receive local messages on any of the channels
- syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
- asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
- MessageListener cml = new ControlMessageListener();
- MembershipListener ml = new ControlMembershipListener();
- RequestHandler rh = new PostOfficeRequestHandler();
-
- // register as a listener for nodeid-adress mapping events
- nodeAddressMapListener = new NodeAddressMapListener();
-
- registerListener(nodeAddressMapListener);
-
- this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
-
- Receiver r = new DataReceiver();
- asyncChannel.setReceiver(r);
-
- syncChannel.connect(groupName);
- asyncChannel.connect(groupName);
-
- super.start();
-
- //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
- if (knowAboutNodeId(currentNodeId))
- {
- throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
- "cluster with the same node id (" + this.currentNodeId + "). " +
- "Are you sure you have given each node a unique node id during installation?");
- }
-
- Address syncAddress = syncChannel.getLocalAddress();
- Address asyncAddress = asyncChannel.getLocalAddress();
- PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
- put(ADDRESS_INFO_KEY, info);
-
- statsSender.start();
-
- log.debug(this + " started");
- }
- finally
- {
- stoplock.writeLock().release();
- }
+ if (started)
+ {
+ log.warn("Attempt to start() but " + this + " is already started");
+ }
+
+ if (trace) { log.trace(this + " starting"); }
+
+ this.syncChannel = jChannelFactory.createSyncChannel();
+ this.asyncChannel = jChannelFactory.createASyncChannel();
+
+ // We don't want to receive local messages on any of the channels
+ syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+ asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+ MessageListener cml = new ControlMessageListener();
+ MembershipListener ml = new ControlMembershipListener();
+ RequestHandler rh = new PostOfficeRequestHandler();
+
+ // register as a listener for nodeid-adress mapping events
+ nodeAddressMapListener = new NodeAddressMapListener();
+
+ registerListener(nodeAddressMapListener);
+
+ this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
+
+ Receiver r = new DataReceiver();
+ asyncChannel.setReceiver(r);
+
+ syncChannel.connect(groupName);
+ asyncChannel.connect(groupName);
+
+ super.start();
+
+ //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
+ if (knowAboutNodeId(currentNodeId))
+ {
+ throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
+ "cluster with the same node id (" + this.currentNodeId + "). " +
+ "Are you sure you have given each node a unique node id during installation?");
+ }
+
+ Address syncAddress = syncChannel.getLocalAddress();
+ Address asyncAddress = asyncChannel.getLocalAddress();
+ PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
+ put(ADDRESS_INFO_KEY, info);
+
+ statsSender.start();
+
+ started = true;
+
+ log.debug(this + " started");
}
public synchronized void stop(boolean sendNotification) throws Exception
{
if (trace) { log.trace(this + " stopping"); }
-
- stoplock.writeLock().acquire();
-
- try
- {
- if (!started)
- {
- log.warn("Attempt to stop() but " + this + " is not started");
-
- return;
- }
-
- started = false;
-
- //Need to send this *before* stopping
- syncSendRequest(new LeaveClusterRequest(getNodeId()));
-
- //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
- Thread.sleep(1000);
-
- statsSender.stop();
-
- super.stop(sendNotification);
-
- pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
-
- // TODO in case of shared channels, we should have some sort of unsetReceiver(r)
- asyncChannel.setReceiver(null);
-
- unregisterListener(nodeAddressMapListener);
-
- // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
- syncChannel.close();
-
- // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
- asyncChannel.close();
-
- log.debug(this + " stopped");
- }
- finally
+
+ if (!started)
{
- stoplock.writeLock().release();
+ log.warn("Attempt to stop() but " + this + " is not started");
+ return;
}
+
+ //Need to send this *before* stopping
+ syncSendRequest(new LeaveClusterRequest(getNodeId()));
+
+ stopping = true;
+
+ //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
+ Thread.sleep(1000);
+
+ statsSender.stop();
+
+ super.stop(sendNotification);
+
+ pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
+
+ // TODO in case of shared channels, we should have some sort of unsetReceiver(r)
+ asyncChannel.setReceiver(null);
+
+ unregisterListener(nodeAddressMapListener);
+
+ // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+ syncChannel.close();
+
+ // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+ asyncChannel.close();
+
+ started = false;
+
+ log.debug(this + " stopped");
}
// NotificationBroadcaster implementation -------------------------------------------------------
@@ -729,25 +713,15 @@
*/
public void asyncSendRequest(ClusterRequest request) throws Exception
{
- stoplock.readLock().acquire();
-
- try
- {
- if (!started)
- {
- return;
- }
+ if (stopping)
+ {
+ return;
+ }
- if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
-
- byte[] bytes = writeRequest(request);
-
- asyncChannel.send(new Message(null, null, bytes));
- }
- finally
- {
- stoplock.readLock().release();
- }
+ if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
+
+ byte[] bytes = writeRequest(request);
+ asyncChannel.send(new Message(null, null, bytes));
}
/*
@@ -755,31 +729,22 @@
*/
public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
{
- stoplock.readLock().acquire();
-
- try
- {
- if (!started)
- {
- return;
- }
-
- Address address = this.getAddressForNodeId(nodeId, false);
-
- if (address == null)
- {
- throw new IllegalArgumentException("Cannot find address for node " + nodeId);
- }
-
- if (trace) { log.trace(this + " sending asynchronously " + request + " to node " + nodeId + "/" + address); }
-
- byte[] bytes = writeRequest(request);
- asyncChannel.send(new Message(address, null, bytes));
- }
- finally
- {
- stoplock.readLock().release();
- }
+ if (stopping)
+ {
+ return;
+ }
+
+ Address address = this.getAddressForNodeId(nodeId, false);
+
+ if (address == null)
+ {
+ throw new IllegalArgumentException("Cannot find address for node " + nodeId);
+ }
+
+ if (trace) { log.trace(this + " sending asynchronously " + request + " to node " + nodeId + "/" + address); }
+
+ byte[] bytes = writeRequest(request);
+ asyncChannel.send(new Message(address, null, bytes));
}
/*
@@ -1781,27 +1746,18 @@
*/
private void syncSendRequest(ClusterRequest request) throws Exception
{
- stoplock.readLock().acquire();
-
- try
- {
- if (!started)
- {
- return;
- }
-
- if (trace) { log.trace(this + " sending synch request " + request); }
-
- Message message = new Message(null, null, writeRequest(request));
-
- controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
-
- if (trace) { log.trace(this + " request sent OK"); }
- }
- finally
- {
- stoplock.readLock().release();
- }
+ if (stopping)
+ {
+ return;
+ }
+
+ if (trace) { log.trace(this + " sending synch request " + request); }
+
+ Message message = new Message(null, null, writeRequest(request));
+
+ controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
+
+ if (trace) { log.trace(this + " request sent OK"); }
}
//TODO - this is a bit tortuous - needs optimising
@@ -2293,95 +2249,82 @@
{
public byte[] getState()
{
- try
- {
- //We need to get the stop lock
- stoplock.readLock().acquire();
-
- try
- {
- if (!started)
- {
- return null;
- }
-
- //And the general lock
- lock.writeLock().acquire();
-
- try
- {
- if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener got state"); }
-
- return getStateAsBytes();
- }
- catch (Exception e)
- {
- log.error("Caught Exception in MessageListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
- finally
- {
- lock.writeLock().release();
- }
- }
- finally
- {
- stoplock.readLock().release();
- }
- }
- catch (InterruptedException e)
- {
- log.error("Thread interrupted", e);
-
- return null;
- }
+ if (stopping)
+ {
+ return null;
+ }
+
+ try
+ {
+ lock.writeLock().acquire();
+ }
+ catch (InterruptedException e)
+ {
+ log.error("Thread Interrupted", e);
+ }
+ try
+ {
+ if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener got state"); }
+ return getStateAsBytes();
+ }
+ catch (Exception e)
+ {
+ log.error("Caught Exception in MessageListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
}
public void receive(Message message)
{
+ if (stopping)
+ {
+ return;
+ }
}
public void setState(byte[] bytes)
{
- if (bytes != null)
- {
- try
- {
- log.info("Receiving state!!!");
-
- lock.writeLock().acquire();
+ if (stopping)
+ {
+ return;
+ }
- try
- {
- log.info("Processing state!");
-
- processStateBytes(bytes);
-
- if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener has set state"); }
- }
- catch (Exception e)
- {
- log.error("Caught Exception in MessageListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
- finally
- {
- lock.writeLock().release();
- }
+ if (bytes != null)
+ {
+ try
+ {
+ lock.writeLock().acquire();
}
- catch (InterruptedException e)
- {
- log.error("Thread interrupted", e);
- }
+ catch (InterruptedException e)
+ {
+ log.error("Thread interrupted", e);
+ }
+ try
+ {
+ processStateBytes(bytes);
+ if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener has set state"); }
+ }
+ catch (Exception e)
+ {
+ log.error("Caught Exception in MessageListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
}
synchronized (setStateLock)
{
- log.info("notifying set state lock");
stateSet = true;
setStateLock.notify();
}
@@ -2405,29 +2348,19 @@
public void viewAccepted(View newView)
{
-
+ if (stopping)
+ {
+ return;
+ }
+
try
{
- stoplock.readLock().acquire();
-
- try
- {
- if (!started)
- {
- return;
- }
-
- // We queue up changes and execute them asynchronously.
- // This is because JGroups will not let us do stuff like send synch messages using the
- // same thread that delivered the view change and this is what we need to do in
- // failover, for example.
-
- viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
- }
- finally
- {
- stoplock.readLock().release();
- }
+ // We queue up changes and execute them asynchronously.
+ // This is because JGroups will not let us do stuff like send synch messages using the
+ // same thread that delivered the view change and this is what we need to do in
+ // failover, for example.
+
+ viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
}
catch (InterruptedException e)
{
@@ -2557,47 +2490,28 @@
{
public Object handle(Message message)
{
- try
- {
-
- stoplock.readLock().acquire();
-
- try
- {
- if (!started)
- {
- return null;
- }
-
- if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
-
- try
- {
- byte[] bytes = message.getBuffer();
-
- ClusterRequest request = readRequest(bytes);
-
- return request.execute(DefaultClusteredPostOffice.this);
- }
- catch (Throwable e)
- {
- log.error("Caught Exception in RequestHandler", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
- }
- finally
- {
- stoplock.readLock().release();
- }
- }
- catch (InterruptedException e)
- {
- log.error("Thread interrupted", e);
-
- return null;
- }
+ if (stopping)
+ {
+ return null;
+ }
+
+ if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
+
+ try
+ {
+ byte[] bytes = message.getBuffer();
+
+ ClusterRequest request = readRequest(bytes);
+
+ return request.execute(DefaultClusteredPostOffice.this);
+ }
+ catch (Throwable e)
+ {
+ log.error("Caught Exception in RequestHandler", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
}
}
More information about the jboss-cvs-commits
mailing list