[jboss-cvs] JBoss Messaging SVN: r2766 - in trunk/src/main/org/jboss: messaging/core/plugin/postoffice/cluster and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 7 08:44:03 EDT 2007


Author: timfox
Date: 2007-06-07 08:44:03 -0400 (Thu, 07 Jun 2007)
New Revision: 2766

Modified:
   trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
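Mainly http://jira.jboss.com/jira/browse/JBMESSAGING-819 - replaces the racy volatile "stopping" flag in DefaultClusteredPostOffice with a read-write lock (stoplock); also tidies a trace message in Bridge.java.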


Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-06-07 12:35:02 UTC (rev 2765)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-06-07 12:44:03 UTC (rev 2766)
@@ -1255,7 +1255,7 @@
    {
       public void run()
       {
-      	if (trace) { log.trace("****  Failure handler running"); }
+      	if (trace) { log.trace("Failure handler running"); }
       	
          // Clear the messages
          messages.clear();

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-06-07 12:35:02 UTC (rev 2765)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-06-07 12:44:03 UTC (rev 2766)
@@ -85,6 +85,8 @@
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -160,15 +162,7 @@
 
    private volatile boolean started;
    
-   //FIXME using a stopping flag is not a good approach and introduces a race condition
-   //http://jira.jboss.org/jira/browse/JBMESSAGING-819
-   //the code can check stopping and find it to be false, then the service can stop, setting stopping to true
-   //then actually stopping the post office, then the same thread that checked stopping continues and performs
-   //its action only to find the service stopped
-   //Should use a read-write lock instead
-   //One way to minimise the chance of the race happening is to sleep for a little while after setting stopping to true
-   //before actually stopping the service (see below)
-   private volatile boolean stopping;
+   private ReadWriteLock stoplock;
 
    private JChannelFactory jChannelFactory;
 
@@ -278,107 +272,128 @@
 
       this.jChannelFactory = JChannelFactory;
       
-      this.pooledExecutor = new PooledExecutor(new LinkedQueue(), poolSize);
+      pooledExecutor = new PooledExecutor(new LinkedQueue(), poolSize);
       
-      this.pooledExecutor.setMinimumPoolSize(poolSize);
+      pooledExecutor.setMinimumPoolSize(poolSize);
+      
+      stoplock = new WriterPreferenceReadWriteLock();
    }
 
    // MessagingComponent overrides -----------------------------------------------------------------
 
    public synchronized void start() throws Exception
    {
-      if (started)
-      {
-         log.warn("Attempt to start() but " + this + " is already started");
-      }
-
-      if (trace) { log.trace(this + " starting"); }
-
-      this.syncChannel = jChannelFactory.createSyncChannel();
-      this.asyncChannel = jChannelFactory.createASyncChannel();
-
-      // We don't want to receive local messages on any of the channels
-      syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
-      asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
-      MessageListener cml = new ControlMessageListener();
-      MembershipListener ml = new ControlMembershipListener();
-      RequestHandler rh = new PostOfficeRequestHandler();
-
-      // register as a listener for nodeid-adress mapping events
-      nodeAddressMapListener = new NodeAddressMapListener();
-
-      registerListener(nodeAddressMapListener);
-
-      this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
-
-      Receiver r = new DataReceiver();
-      asyncChannel.setReceiver(r);
-
-      syncChannel.connect(groupName);
-      asyncChannel.connect(groupName);
-
-      super.start();
-      
-      //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
-      if (knowAboutNodeId(currentNodeId))
-      {
-      	throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
-      			"cluster with the same node id (" + this.currentNodeId + "). " +
-      			"Are you sure you have given each node a unique node id during installation?");
-      }
-
-      Address syncAddress = syncChannel.getLocalAddress();
-      Address asyncAddress = asyncChannel.getLocalAddress();
-      PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
-      put(ADDRESS_INFO_KEY, info);
-
-      statsSender.start();
-
-      started = true;
-
-      log.debug(this + " started");
+   	stoplock.writeLock().acquire();
+   	
+   	try
+   	{	   	
+	      if (started)
+	      {
+	         log.warn("Attempt to start() but " + this + " is already started");
+	         
+	         return;
+	      }
+	
+	      if (trace) { log.trace(this + " starting"); }
+	
+	      this.syncChannel = jChannelFactory.createSyncChannel();
+	      this.asyncChannel = jChannelFactory.createASyncChannel();
+	
+	      // We don't want to receive local messages on any of the channels
+	      syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+	
+	      asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+	
+	      MessageListener cml = new ControlMessageListener();
+	      MembershipListener ml = new ControlMembershipListener();
+	      RequestHandler rh = new PostOfficeRequestHandler();
+	
+	      // register as a listener for nodeid-adress mapping events
+	      nodeAddressMapListener = new NodeAddressMapListener();
+	
+	      registerListener(nodeAddressMapListener);
+	
+	      this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
+	
+	      Receiver r = new DataReceiver();
+	      asyncChannel.setReceiver(r);
+	
+	      syncChannel.connect(groupName);
+	      asyncChannel.connect(groupName);
+	
+	      super.start();
+	      
+	      //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
+	      if (knowAboutNodeId(currentNodeId))
+	      {
+	      	throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
+	      			"cluster with the same node id (" + this.currentNodeId + "). " +
+	      			"Are you sure you have given each node a unique node id during installation?");
+	      }
+	
+	      Address syncAddress = syncChannel.getLocalAddress();
+	      Address asyncAddress = asyncChannel.getLocalAddress();
+	      PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
+	      put(ADDRESS_INFO_KEY, info);
+	
+	      statsSender.start();
+	
+	      started = true;
+	
+	      log.debug(this + " started");
+   	}
+   	finally
+   	{
+   		stoplock.writeLock().release();
+   	}
    }
 
    public synchronized void stop(boolean sendNotification) throws Exception
    {
       if (trace) { log.trace(this + " stopping"); }
-
-      if (!started)
+      
+      stoplock.writeLock().acquire();
+      
+      try
+      {	
+	      if (!started)
+	      {
+	         log.warn("Attempt to stop() but " + this + " is not started");
+	         
+	         return;
+	      }
+	      
+	      //Need to send this *before* stopping
+	      syncSendRequest(new LeaveClusterRequest(getNodeId()));
+	
+	      //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
+	      Thread.sleep(1000);
+	
+	      statsSender.stop();
+	
+	      super.stop(sendNotification);
+	      
+	      pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
+	
+	      //  TODO in case of shared channels, we should have some sort of unsetReceiver(r)
+	      asyncChannel.setReceiver(null);
+	
+	      unregisterListener(nodeAddressMapListener);
+	
+	      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+	      syncChannel.close();
+	
+	      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+	      asyncChannel.close();
+	      
+	      started = false;
+	
+	      log.debug(this + " stopped");
+      }
+      finally
       {
-         log.warn("Attempt to stop() but " + this + " is not started");
-         return;
+      	stoplock.writeLock().release();
       }
-      
-      //Need to send this *before* stopping
-      syncSendRequest(new LeaveClusterRequest(getNodeId()));
-
-      stopping = true;
-      
-      //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
-      Thread.sleep(1000);
-
-      statsSender.stop();
-
-      super.stop(sendNotification);
-      
-      pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
-
-      //  TODO in case of shared channels, we should have some sort of unsetReceiver(r)
-      asyncChannel.setReceiver(null);
-
-      unregisterListener(nodeAddressMapListener);
-
-      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
-      syncChannel.close();
-
-      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
-      asyncChannel.close();
-      
-      started = false;
-
-      log.debug(this + " stopped");
    }
 
    // NotificationBroadcaster implementation -------------------------------------------------------
@@ -713,15 +728,25 @@
     */
    public void asyncSendRequest(ClusterRequest request) throws Exception
    {
-      if (stopping)
-      {
-         return;
-      }
+   	stoplock.readLock().acquire();
+   	
+   	try
+   	{
+	      if (!started)
+	      {
+	         return;
+	      }
       
-      if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
-
-      byte[] bytes = writeRequest(request);
-      asyncChannel.send(new Message(null, null, bytes));
+	      if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
+	
+	      byte[] bytes = writeRequest(request);
+	      
+	      asyncChannel.send(new Message(null, null, bytes));
+   	}
+   	finally
+   	{
+   		stoplock.readLock().release();
+   	}
    }
 
    /*
@@ -729,22 +754,31 @@
     */
    public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
    {
-      if (stopping)
-      {
-         return;
-      }
-      
-      Address address = this.getAddressForNodeId(nodeId, false);
-
-      if (address == null)
-      {
-         throw new IllegalArgumentException("Cannot find address for node " + nodeId);
-      }
-
-      if (trace) { log.trace(this + " sending asynchronously " + request + " to node  " + nodeId + "/" + address); }
-
-      byte[] bytes = writeRequest(request);
-      asyncChannel.send(new Message(address, null, bytes));
+   	stoplock.readLock().acquire();
+   	
+   	try
+   	{
+	      if (!started)
+	      {
+	         return;
+	      }
+	      
+	      Address address = this.getAddressForNodeId(nodeId, false);
+	
+	      if (address == null)
+	      {
+	         throw new IllegalArgumentException("Cannot find address for node " + nodeId);
+	      }
+	
+	      if (trace) { log.trace(this + " sending asynchronously " + request + " to node  " + nodeId + "/" + address); }
+	
+	      byte[] bytes = writeRequest(request);
+	      asyncChannel.send(new Message(address, null, bytes));
+   	}
+   	finally
+   	{
+   		stoplock.readLock().release();
+   	}
    }
 
    /*
@@ -1746,18 +1780,27 @@
     */
    private void syncSendRequest(ClusterRequest request) throws Exception
    {
-      if (stopping)
-      {
-         return;
-      }
-      
-      if (trace) { log.trace(this + " sending synch request " + request); }
-
-      Message message = new Message(null, null, writeRequest(request));
-
-      controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
-
-      if (trace) { log.trace(this + " request sent OK"); }
+   	stoplock.readLock().acquire();
+   	
+   	try
+   	{
+	      if (!started)
+	      {
+	         return;
+	      }
+	            
+	      if (trace) { log.trace(this + " sending synch request " + request); }
+	
+	      Message message = new Message(null, null, writeRequest(request));
+	
+	      controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
+	
+	      if (trace) { log.trace(this + " request sent OK"); }	      
+   	}
+   	finally
+   	{
+   		stoplock.readLock().release();
+   	}
    }
 
    //TODO - this is a bit tortuous - needs optimising
@@ -2249,78 +2292,102 @@
    {
       public byte[] getState()
       {
-         if (stopping)
-         {
-            return null;
-         }
-
-         try
-         {
-            lock.writeLock().acquire();
-         }
-         catch (InterruptedException e)
-         {
-            log.error("Thread Interrupted", e);
-         }
-         try
-         {
-            if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener got state"); }
-            return getStateAsBytes();
-         }
-         catch (Exception e)
-         {
-            log.error("Caught Exception in MessageListener", e);
-            IllegalStateException e2 = new IllegalStateException(e.getMessage());
-            e2.setStackTrace(e.getStackTrace());
-            throw e2;
-         }
-         finally
-         {
-            lock.writeLock().release();
-         }
+      	try
+         {	      	
+	      	//We need to get the stop lock
+	         stoplock.readLock().acquire();
+	         	      	
+	      	try
+	      	{	
+		         if (!started)
+		         {
+		         	return null;
+		         }
+	      		
+	      		//And the general lock
+		         lock.writeLock().acquire();
+		
+		         try
+		         {
+		            if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener got state"); }
+		            
+		            return getStateAsBytes();
+		         }
+		         catch (Exception e)
+		         {
+		            log.error("Caught Exception in MessageListener", e);
+		            IllegalStateException e2 = new IllegalStateException(e.getMessage());
+		            e2.setStackTrace(e.getStackTrace());
+		            throw e2;
+		         }
+		         finally
+		         {
+		            lock.writeLock().release();
+		         }
+	      	}
+	      	finally
+	      	{
+	      		stoplock.readLock().release();
+	      	}
+      	}
+      	catch (InterruptedException e)
+      	{
+      		log.error("Thread interrupted", e);
+      		
+      		return null;
+      	}
       }
 
       public void receive(Message message)
       {
-         if (stopping)
-         {
-            return;
-         }
       }
 
       public void setState(byte[] bytes)
       {
-         if (stopping)
-         {
-            return;
-         }
-
          if (bytes != null)
-         {
-            try
-            {
-               lock.writeLock().acquire();
+         {         	
+         	try
+            {	      	
+   	      	//We need to get the stop lock
+   	         stoplock.readLock().acquire();   	       
+   	      	
+   	      	try
+   	      	{	   
+      	         if (!started)
+      	         {
+      	         	return;
+      	         }
+   	      		
+   	      		//And the general lock
+		            lock.writeLock().acquire();
+	
+		            try
+		            {
+		               processStateBytes(bytes);
+		               
+		               if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener has set state"); }
+		            }
+		            catch (Exception e)
+		            {
+		               log.error("Caught Exception in MessageListener", e);
+		               IllegalStateException e2 = new IllegalStateException(e.getMessage());
+		               e2.setStackTrace(e.getStackTrace());
+		               throw e2;
+		            }
+		            finally
+		            {
+		               lock.writeLock().release();
+		            }
+		      	}
+		      	finally
+		      	{
+		      		stoplock.readLock().release();
+		      	}
             }
-            catch (InterruptedException e)
-            {
-               log.error("Thread interrupted", e);
-            }
-            try
-            {
-               processStateBytes(bytes);
-               if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener has set state"); }
-            }
-            catch (Exception e)
-            {
-               log.error("Caught Exception in MessageListener", e);
-               IllegalStateException e2 = new IllegalStateException(e.getMessage());
-               e2.setStackTrace(e.getStackTrace());
-               throw e2;
-            }
-            finally
-            {
-               lock.writeLock().release();
-            }
+         	catch (InterruptedException e)
+         	{
+         		log.error("Thread interrupted", e);
+         	}
          }
 
          synchronized (setStateLock)
@@ -2348,19 +2415,29 @@
 
       public void viewAccepted(View newView)
       {
-         if (stopping)
-         {
-            return;
-         }
-
+                  
          try
          {
-            // We queue up changes and execute them asynchronously.
-            // This is because JGroups will not let us do stuff like send synch messages using the
-            // same thread that delivered the view change and this is what we need to do in
-            // failover, for example.
-
-            viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
+         	stoplock.readLock().acquire();
+         	         	
+         	try
+         	{         		
+            	if (!started)
+            	{
+            		return;
+            	}
+         	
+	            // We queue up changes and execute them asynchronously.
+	            // This is because JGroups will not let us do stuff like send synch messages using the
+	            // same thread that delivered the view change and this is what we need to do in
+	            // failover, for example.
+	
+	            viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
+         	}
+         	finally
+         	{
+         		stoplock.readLock().release();
+         	}
          }
          catch (InterruptedException e)
          {
@@ -2490,28 +2567,47 @@
    {
       public Object handle(Message message)
       {
-         if (stopping)
-         {
-            return null;
-         }
-
-         if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
-
-         try
-         {
-            byte[] bytes = message.getBuffer();
-
-            ClusterRequest request = readRequest(bytes);
-
-            return request.execute(DefaultClusteredPostOffice.this);
-         }
-         catch (Throwable e)
-         {
-            log.error("Caught Exception in RequestHandler", e);
-            IllegalStateException e2 = new IllegalStateException(e.getMessage());
-            e2.setStackTrace(e.getStackTrace());
-            throw e2;
-         }
+      	try
+      	{
+	      	
+	      	stoplock.readLock().acquire();
+	      	
+	      	try
+	      	{
+	      		if (!started)
+	      		{
+	      			return null;
+	      		}
+	      		
+	      		if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
+	
+	            try
+	            {
+	               byte[] bytes = message.getBuffer();
+	
+	               ClusterRequest request = readRequest(bytes);
+	
+	               return request.execute(DefaultClusteredPostOffice.this);
+	            }
+	            catch (Throwable e)
+	            {
+	               log.error("Caught Exception in RequestHandler", e);
+	               IllegalStateException e2 = new IllegalStateException(e.getMessage());
+	               e2.setStackTrace(e.getStackTrace());
+	               throw e2;
+	            }
+	      	}
+	      	finally
+	      	{
+	      		stoplock.readLock().release();
+	      	}
+      	}
+      	catch (InterruptedException e)
+      	{
+      		log.error("Thread interrupted", e);
+      		
+      		return null;
+      	}
       }
    }
 




More information about the jboss-cvs-commits mailing list