[jboss-cvs] JBoss Messaging SVN: r2766 - in trunk/src/main/org/jboss: messaging/core/plugin/postoffice/cluster and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 7 08:44:03 EDT 2007
Author: timfox
Date: 2007-06-07 08:44:03 -0400 (Thu, 07 Jun 2007)
New Revision: 2766
Modified:
trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
Mainly http://jira.jboss.com/jira/browse/JBMESSAGING-819
Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-06-07 12:35:02 UTC (rev 2765)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-06-07 12:44:03 UTC (rev 2766)
@@ -1255,7 +1255,7 @@
{
public void run()
{
- if (trace) { log.trace("**** Failure handler running"); }
+ if (trace) { log.trace("Failure handler running"); }
// Clear the messages
messages.clear();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-06-07 12:35:02 UTC (rev 2765)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-06-07 12:44:03 UTC (rev 2766)
@@ -85,6 +85,8 @@
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -160,15 +162,7 @@
private volatile boolean started;
- //FIXME using a stopping flag is not a good approach and introduces a race condition
- //http://jira.jboss.org/jira/browse/JBMESSAGING-819
- //the code can check stopping and find it to be false, then the service can stop, setting stopping to true
- //then actually stopping the post office, then the same thread that checked stopping continues and performs
- //its action only to find the service stopped
- //Should use a read-write lock instead
- //One way to minimise the chance of the race happening is to sleep for a little while after setting stopping to true
- //before actually stopping the service (see below)
- private volatile boolean stopping;
+ private ReadWriteLock stoplock;
private JChannelFactory jChannelFactory;
@@ -278,107 +272,128 @@
this.jChannelFactory = JChannelFactory;
- this.pooledExecutor = new PooledExecutor(new LinkedQueue(), poolSize);
+ pooledExecutor = new PooledExecutor(new LinkedQueue(), poolSize);
- this.pooledExecutor.setMinimumPoolSize(poolSize);
+ pooledExecutor.setMinimumPoolSize(poolSize);
+
+ stoplock = new WriterPreferenceReadWriteLock();
}
// MessagingComponent overrides -----------------------------------------------------------------
public synchronized void start() throws Exception
{
- if (started)
- {
- log.warn("Attempt to start() but " + this + " is already started");
- }
-
- if (trace) { log.trace(this + " starting"); }
-
- this.syncChannel = jChannelFactory.createSyncChannel();
- this.asyncChannel = jChannelFactory.createASyncChannel();
-
- // We don't want to receive local messages on any of the channels
- syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
- asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
- MessageListener cml = new ControlMessageListener();
- MembershipListener ml = new ControlMembershipListener();
- RequestHandler rh = new PostOfficeRequestHandler();
-
- // register as a listener for nodeid-adress mapping events
- nodeAddressMapListener = new NodeAddressMapListener();
-
- registerListener(nodeAddressMapListener);
-
- this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
-
- Receiver r = new DataReceiver();
- asyncChannel.setReceiver(r);
-
- syncChannel.connect(groupName);
- asyncChannel.connect(groupName);
-
- super.start();
-
- //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
- if (knowAboutNodeId(currentNodeId))
- {
- throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
- "cluster with the same node id (" + this.currentNodeId + "). " +
- "Are you sure you have given each node a unique node id during installation?");
- }
-
- Address syncAddress = syncChannel.getLocalAddress();
- Address asyncAddress = asyncChannel.getLocalAddress();
- PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
- put(ADDRESS_INFO_KEY, info);
-
- statsSender.start();
-
- started = true;
-
- log.debug(this + " started");
+ stoplock.writeLock().acquire();
+
+ try
+ {
+ if (started)
+ {
+ log.warn("Attempt to start() but " + this + " is already started");
+
+ return;
+ }
+
+ if (trace) { log.trace(this + " starting"); }
+
+ this.syncChannel = jChannelFactory.createSyncChannel();
+ this.asyncChannel = jChannelFactory.createASyncChannel();
+
+ // We don't want to receive local messages on any of the channels
+ syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+ asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+ MessageListener cml = new ControlMessageListener();
+ MembershipListener ml = new ControlMembershipListener();
+ RequestHandler rh = new PostOfficeRequestHandler();
+
+ // register as a listener for nodeid-adress mapping events
+ nodeAddressMapListener = new NodeAddressMapListener();
+
+ registerListener(nodeAddressMapListener);
+
+ this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
+
+ Receiver r = new DataReceiver();
+ asyncChannel.setReceiver(r);
+
+ syncChannel.connect(groupName);
+ asyncChannel.connect(groupName);
+
+ super.start();
+
+ //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
+ if (knowAboutNodeId(currentNodeId))
+ {
+ throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
+ "cluster with the same node id (" + this.currentNodeId + "). " +
+ "Are you sure you have given each node a unique node id during installation?");
+ }
+
+ Address syncAddress = syncChannel.getLocalAddress();
+ Address asyncAddress = asyncChannel.getLocalAddress();
+ PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
+ put(ADDRESS_INFO_KEY, info);
+
+ statsSender.start();
+
+ started = true;
+
+ log.debug(this + " started");
+ }
+ finally
+ {
+ stoplock.writeLock().release();
+ }
}
public synchronized void stop(boolean sendNotification) throws Exception
{
if (trace) { log.trace(this + " stopping"); }
-
- if (!started)
+
+ stoplock.writeLock().acquire();
+
+ try
+ {
+ if (!started)
+ {
+ log.warn("Attempt to stop() but " + this + " is not started");
+
+ return;
+ }
+
+ //Need to send this *before* stopping
+ syncSendRequest(new LeaveClusterRequest(getNodeId()));
+
+ //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
+ Thread.sleep(1000);
+
+ statsSender.stop();
+
+ super.stop(sendNotification);
+
+ pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
+
+ // TODO in case of shared channels, we should have some sort of unsetReceiver(r)
+ asyncChannel.setReceiver(null);
+
+ unregisterListener(nodeAddressMapListener);
+
+ // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+ syncChannel.close();
+
+ // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+ asyncChannel.close();
+
+ started = false;
+
+ log.debug(this + " stopped");
+ }
+ finally
{
- log.warn("Attempt to stop() but " + this + " is not started");
- return;
+ stoplock.writeLock().release();
}
-
- //Need to send this *before* stopping
- syncSendRequest(new LeaveClusterRequest(getNodeId()));
-
- stopping = true;
-
- //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
- Thread.sleep(1000);
-
- statsSender.stop();
-
- super.stop(sendNotification);
-
- pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
-
- // TODO in case of shared channels, we should have some sort of unsetReceiver(r)
- asyncChannel.setReceiver(null);
-
- unregisterListener(nodeAddressMapListener);
-
- // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
- syncChannel.close();
-
- // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
- asyncChannel.close();
-
- started = false;
-
- log.debug(this + " stopped");
}
// NotificationBroadcaster implementation -------------------------------------------------------
@@ -713,15 +728,25 @@
*/
public void asyncSendRequest(ClusterRequest request) throws Exception
{
- if (stopping)
- {
- return;
- }
+ stoplock.readLock().acquire();
+
+ try
+ {
+ if (!started)
+ {
+ return;
+ }
- if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
-
- byte[] bytes = writeRequest(request);
- asyncChannel.send(new Message(null, null, bytes));
+ if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
+
+ byte[] bytes = writeRequest(request);
+
+ asyncChannel.send(new Message(null, null, bytes));
+ }
+ finally
+ {
+ stoplock.readLock().release();
+ }
}
/*
@@ -729,22 +754,31 @@
*/
public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
{
- if (stopping)
- {
- return;
- }
-
- Address address = this.getAddressForNodeId(nodeId, false);
-
- if (address == null)
- {
- throw new IllegalArgumentException("Cannot find address for node " + nodeId);
- }
-
- if (trace) { log.trace(this + " sending asynchronously " + request + " to node " + nodeId + "/" + address); }
-
- byte[] bytes = writeRequest(request);
- asyncChannel.send(new Message(address, null, bytes));
+ stoplock.readLock().acquire();
+
+ try
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ Address address = this.getAddressForNodeId(nodeId, false);
+
+ if (address == null)
+ {
+ throw new IllegalArgumentException("Cannot find address for node " + nodeId);
+ }
+
+ if (trace) { log.trace(this + " sending asynchronously " + request + " to node " + nodeId + "/" + address); }
+
+ byte[] bytes = writeRequest(request);
+ asyncChannel.send(new Message(address, null, bytes));
+ }
+ finally
+ {
+ stoplock.readLock().release();
+ }
}
/*
@@ -1746,18 +1780,27 @@
*/
private void syncSendRequest(ClusterRequest request) throws Exception
{
- if (stopping)
- {
- return;
- }
-
- if (trace) { log.trace(this + " sending synch request " + request); }
-
- Message message = new Message(null, null, writeRequest(request));
-
- controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
-
- if (trace) { log.trace(this + " request sent OK"); }
+ stoplock.readLock().acquire();
+
+ try
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ if (trace) { log.trace(this + " sending synch request " + request); }
+
+ Message message = new Message(null, null, writeRequest(request));
+
+ controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
+
+ if (trace) { log.trace(this + " request sent OK"); }
+ }
+ finally
+ {
+ stoplock.readLock().release();
+ }
}
//TODO - this is a bit tortuous - needs optimising
@@ -2249,78 +2292,102 @@
{
public byte[] getState()
{
- if (stopping)
- {
- return null;
- }
-
- try
- {
- lock.writeLock().acquire();
- }
- catch (InterruptedException e)
- {
- log.error("Thread Interrupted", e);
- }
- try
- {
- if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener got state"); }
- return getStateAsBytes();
- }
- catch (Exception e)
- {
- log.error("Caught Exception in MessageListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
- finally
- {
- lock.writeLock().release();
- }
+ try
+ {
+ //We need to get the stop lock
+ stoplock.readLock().acquire();
+
+ try
+ {
+ if (!started)
+ {
+ return null;
+ }
+
+ //And the general lock
+ lock.writeLock().acquire();
+
+ try
+ {
+ if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener got state"); }
+
+ return getStateAsBytes();
+ }
+ catch (Exception e)
+ {
+ log.error("Caught Exception in MessageListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ }
+ finally
+ {
+ stoplock.readLock().release();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.error("Thread interrupted", e);
+
+ return null;
+ }
}
public void receive(Message message)
{
- if (stopping)
- {
- return;
- }
}
public void setState(byte[] bytes)
{
- if (stopping)
- {
- return;
- }
-
if (bytes != null)
- {
- try
- {
- lock.writeLock().acquire();
+ {
+ try
+ {
+ //We need to get the stop lock
+ stoplock.readLock().acquire();
+
+ try
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ //And the general lock
+ lock.writeLock().acquire();
+
+ try
+ {
+ processStateBytes(bytes);
+
+ if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener has set state"); }
+ }
+ catch (Exception e)
+ {
+ log.error("Caught Exception in MessageListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ }
+ finally
+ {
+ stoplock.readLock().release();
+ }
}
- catch (InterruptedException e)
- {
- log.error("Thread interrupted", e);
- }
- try
- {
- processStateBytes(bytes);
- if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener has set state"); }
- }
- catch (Exception e)
- {
- log.error("Caught Exception in MessageListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
- finally
- {
- lock.writeLock().release();
- }
+ catch (InterruptedException e)
+ {
+ log.error("Thread interrupted", e);
+ }
}
synchronized (setStateLock)
@@ -2348,19 +2415,29 @@
public void viewAccepted(View newView)
{
- if (stopping)
- {
- return;
- }
-
+
try
{
- // We queue up changes and execute them asynchronously.
- // This is because JGroups will not let us do stuff like send synch messages using the
- // same thread that delivered the view change and this is what we need to do in
- // failover, for example.
-
- viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
+ stoplock.readLock().acquire();
+
+ try
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ // We queue up changes and execute them asynchronously.
+ // This is because JGroups will not let us do stuff like send synch messages using the
+ // same thread that delivered the view change and this is what we need to do in
+ // failover, for example.
+
+ viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
+ }
+ finally
+ {
+ stoplock.readLock().release();
+ }
}
catch (InterruptedException e)
{
@@ -2490,28 +2567,47 @@
{
public Object handle(Message message)
{
- if (stopping)
- {
- return null;
- }
-
- if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
-
- try
- {
- byte[] bytes = message.getBuffer();
-
- ClusterRequest request = readRequest(bytes);
-
- return request.execute(DefaultClusteredPostOffice.this);
- }
- catch (Throwable e)
- {
- log.error("Caught Exception in RequestHandler", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
+ try
+ {
+
+ stoplock.readLock().acquire();
+
+ try
+ {
+ if (!started)
+ {
+ return null;
+ }
+
+ if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
+
+ try
+ {
+ byte[] bytes = message.getBuffer();
+
+ ClusterRequest request = readRequest(bytes);
+
+ return request.execute(DefaultClusteredPostOffice.this);
+ }
+ catch (Throwable e)
+ {
+ log.error("Caught Exception in RequestHandler", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+ }
+ finally
+ {
+ stoplock.readLock().release();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.error("Thread interrupted", e);
+
+ return null;
+ }
}
}
More information about the jboss-cvs-commits
mailing list