[jboss-cvs] JBoss Messaging SVN: r2782 - trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 14 08:16:17 EDT 2007


Author: timfox
Date: 2007-06-14 08:16:17 -0400 (Thu, 14 Jun 2007)
New Revision: 2782

Modified:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
Reverted http://jira.jboss.com/jira/browse/JBMESSAGING-819


Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-06-13 18:15:18 UTC (rev 2781)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2007-06-14 12:16:17 UTC (rev 2782)
@@ -85,8 +85,6 @@
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -162,7 +160,15 @@
 
    private volatile boolean started;
    
-   private ReadWriteLock stoplock;
+   //FIXME Using a stopping flag is not a good approach: it introduces a race condition
+   //(see http://jira.jboss.org/jira/browse/JBMESSAGING-819).
+   //A thread can check stopping and find it to be false; the service can then stop, setting stopping to true
+   //and actually stopping the post office; the thread that checked stopping then continues and performs
+   //its action, only to find the service stopped.
+   //A read-write lock should be used instead.
+   //One way to minimise the chance of the race happening is to sleep for a little while after setting stopping to true,
+   //before actually stopping the service (see stop() below).
+   private volatile boolean stopping;
 
    private JChannelFactory jChannelFactory;
 
@@ -272,129 +278,107 @@
 
       this.jChannelFactory = JChannelFactory;
       
-      pooledExecutor = new PooledExecutor(new LinkedQueue(), poolSize);
+      this.pooledExecutor = new PooledExecutor(new LinkedQueue(), poolSize);
       
-      pooledExecutor.setMinimumPoolSize(poolSize);
-      
-      stoplock = new ReentrantWriterPreferenceReadWriteLock();
+      this.pooledExecutor.setMinimumPoolSize(poolSize);
    }
 
    // MessagingComponent overrides -----------------------------------------------------------------
 
    public synchronized void start() throws Exception
    {
-   	stoplock.writeLock().acquire();
-   	
-   	try
-   	{	   	
-	      if (started)
-	      {
-	         log.warn("Attempt to start() but " + this + " is already started");
-	         
-	         return;
-	      }
-	
-	      if (trace) { log.trace(this + " starting"); }
-	      
-	      //We set started = true at the beginning since otherwise when state arrives it will get rejected
-	      started = true;
-	   		
-	      this.syncChannel = jChannelFactory.createSyncChannel();
-	      this.asyncChannel = jChannelFactory.createASyncChannel();
-	
-	      // We don't want to receive local messages on any of the channels
-	      syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-	
-	      asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-	
-	      MessageListener cml = new ControlMessageListener();
-	      MembershipListener ml = new ControlMembershipListener();
-	      RequestHandler rh = new PostOfficeRequestHandler();
-	
-	      // register as a listener for nodeid-adress mapping events
-	      nodeAddressMapListener = new NodeAddressMapListener();
-	
-	      registerListener(nodeAddressMapListener);
-	
-	      this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
-	
-	      Receiver r = new DataReceiver();
-	      asyncChannel.setReceiver(r);
-	
-	      syncChannel.connect(groupName);
-	      asyncChannel.connect(groupName);
-	
-	      super.start();
-	      
-	      //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
-	      if (knowAboutNodeId(currentNodeId))
-	      {
-	      	throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
-	      			"cluster with the same node id (" + this.currentNodeId + "). " +
-	      			"Are you sure you have given each node a unique node id during installation?");
-	      }
-	
-	      Address syncAddress = syncChannel.getLocalAddress();
-	      Address asyncAddress = asyncChannel.getLocalAddress();
-	      PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
-	      put(ADDRESS_INFO_KEY, info);
-	
-	      statsSender.start();
-	
-	      log.debug(this + " started");
-   	}
-   	finally
-   	{
-   		stoplock.writeLock().release();
-   	}
+      if (started)
+      {
+         log.warn("Attempt to start() but " + this + " is already started");
+      }
+
+      if (trace) { log.trace(this + " starting"); }
+
+      this.syncChannel = jChannelFactory.createSyncChannel();
+      this.asyncChannel = jChannelFactory.createASyncChannel();
+
+      // We don't want to receive local messages on any of the channels
+      syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+      asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+      MessageListener cml = new ControlMessageListener();
+      MembershipListener ml = new ControlMembershipListener();
+      RequestHandler rh = new PostOfficeRequestHandler();
+
+      // register as a listener for nodeid-address mapping events
+      nodeAddressMapListener = new NodeAddressMapListener();
+
+      registerListener(nodeAddressMapListener);
+
+      this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
+
+      Receiver r = new DataReceiver();
+      asyncChannel.setReceiver(r);
+
+      syncChannel.connect(groupName);
+      asyncChannel.connect(groupName);
+
+      super.start();
+      
+      //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
+      if (knowAboutNodeId(currentNodeId))
+      {
+      	throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
+      			"cluster with the same node id (" + this.currentNodeId + "). " +
+      			"Are you sure you have given each node a unique node id during installation?");
+      }
+
+      Address syncAddress = syncChannel.getLocalAddress();
+      Address asyncAddress = asyncChannel.getLocalAddress();
+      PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
+      put(ADDRESS_INFO_KEY, info);
+
+      statsSender.start();
+
+      started = true;
+
+      log.debug(this + " started");
    }
 
    public synchronized void stop(boolean sendNotification) throws Exception
    {
       if (trace) { log.trace(this + " stopping"); }
-      
-      stoplock.writeLock().acquire();
-      
-      try
-      {	
-	      if (!started)
-	      {
-	         log.warn("Attempt to stop() but " + this + " is not started");
-	         
-	         return;
-	      }
-	      
-	      started = false;	   	
-	      
-	      //Need to send this *before* stopping
-	      syncSendRequest(new LeaveClusterRequest(getNodeId()));
-	
-	      //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
-	      Thread.sleep(1000);
-	
-	      statsSender.stop();
-	
-	      super.stop(sendNotification);
-	      
-	      pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
-	
-	      //  TODO in case of shared channels, we should have some sort of unsetReceiver(r)
-	      asyncChannel.setReceiver(null);
-	
-	      unregisterListener(nodeAddressMapListener);
-	
-	      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
-	      syncChannel.close();
-	
-	      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
-	      asyncChannel.close();
-	      
-	      log.debug(this + " stopped");
-      }
-      finally
+
+      if (!started)
       {
-      	stoplock.writeLock().release();
+         log.warn("Attempt to stop() but " + this + " is not started");
+         return;
       }
+      
+      //Need to send this *before* stopping
+      syncSendRequest(new LeaveClusterRequest(getNodeId()));
+
+      stopping = true;
+      
+      //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
+      Thread.sleep(1000);
+
+      statsSender.stop();
+
+      super.stop(sendNotification);
+      
+      pooledExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
+
+      //  TODO in case of shared channels, we should have some sort of unsetReceiver(r)
+      asyncChannel.setReceiver(null);
+
+      unregisterListener(nodeAddressMapListener);
+
+      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+      syncChannel.close();
+
+      // TODO - what happens if we share the channel? Don't we mess up the other applications this way?
+      asyncChannel.close();
+      
+      started = false;
+
+      log.debug(this + " stopped");
    }
 
    // NotificationBroadcaster implementation -------------------------------------------------------
@@ -729,25 +713,15 @@
     */
    public void asyncSendRequest(ClusterRequest request) throws Exception
    {
-   	stoplock.readLock().acquire();
-   	
-   	try
-   	{
-	      if (!started)
-	      {
-	         return;
-	      }
+      if (stopping)
+      {
+         return;
+      }
       
-	      if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
-	
-	      byte[] bytes = writeRequest(request);
-	      
-	      asyncChannel.send(new Message(null, null, bytes));
-   	}
-   	finally
-   	{
-   		stoplock.readLock().release();
-   	}
+      if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
+
+      byte[] bytes = writeRequest(request);
+      asyncChannel.send(new Message(null, null, bytes));
    }
 
    /*
@@ -755,31 +729,22 @@
     */
    public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
    {
-   	stoplock.readLock().acquire();
-   	
-   	try
-   	{
-	      if (!started)
-	      {
-	         return;
-	      }
-	      
-	      Address address = this.getAddressForNodeId(nodeId, false);
-	
-	      if (address == null)
-	      {
-	         throw new IllegalArgumentException("Cannot find address for node " + nodeId);
-	      }
-	
-	      if (trace) { log.trace(this + " sending asynchronously " + request + " to node  " + nodeId + "/" + address); }
-	
-	      byte[] bytes = writeRequest(request);
-	      asyncChannel.send(new Message(address, null, bytes));
-   	}
-   	finally
-   	{
-   		stoplock.readLock().release();
-   	}
+      if (stopping)
+      {
+         return;
+      }
+      
+      Address address = this.getAddressForNodeId(nodeId, false);
+
+      if (address == null)
+      {
+         throw new IllegalArgumentException("Cannot find address for node " + nodeId);
+      }
+
+      if (trace) { log.trace(this + " sending asynchronously " + request + " to node  " + nodeId + "/" + address); }
+
+      byte[] bytes = writeRequest(request);
+      asyncChannel.send(new Message(address, null, bytes));
    }
 
    /*
@@ -1781,27 +1746,18 @@
     */
    private void syncSendRequest(ClusterRequest request) throws Exception
    {
-   	stoplock.readLock().acquire();
-   	
-   	try
-   	{
-	      if (!started)
-	      {
-	         return;
-	      }
-	            
-	      if (trace) { log.trace(this + " sending synch request " + request); }
-	
-	      Message message = new Message(null, null, writeRequest(request));
-	
-	      controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
-	
-	      if (trace) { log.trace(this + " request sent OK"); }	      
-   	}
-   	finally
-   	{
-   		stoplock.readLock().release();
-   	}
+      if (stopping)
+      {
+         return;
+      }
+      
+      if (trace) { log.trace(this + " sending synch request " + request); }
+
+      Message message = new Message(null, null, writeRequest(request));
+
+      controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
+
+      if (trace) { log.trace(this + " request sent OK"); }
    }
 
    //TODO - this is a bit tortuous - needs optimising
@@ -2293,95 +2249,82 @@
    {
       public byte[] getState()
       {
-      	try
-         {	      	
-	      	//We need to get the stop lock
-	         stoplock.readLock().acquire();
-	         	      	
-	      	try
-	      	{	
-		         if (!started)
-		         {
-		         	return null;
-		         }
-	      		
-	      		//And the general lock
-		         lock.writeLock().acquire();
-		
-		         try
-		         {
-		            if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener got state"); }
-		            
-		            return getStateAsBytes();
-		         }
-		         catch (Exception e)
-		         {
-		            log.error("Caught Exception in MessageListener", e);
-		            IllegalStateException e2 = new IllegalStateException(e.getMessage());
-		            e2.setStackTrace(e.getStackTrace());
-		            throw e2;
-		         }
-		         finally
-		         {
-		            lock.writeLock().release();
-		         }
-	      	}
-	      	finally
-	      	{
-	      		stoplock.readLock().release();
-	      	}
-      	}
-      	catch (InterruptedException e)
-      	{
-      		log.error("Thread interrupted", e);
-      		
-      		return null;
-      	}
+         if (stopping)
+         {
+            return null;
+         }
+
+         try
+         {
+            lock.writeLock().acquire();
+         }
+         catch (InterruptedException e)
+         {
+            log.error("Thread Interrupted", e);
+         }
+         try
+         {
+            if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener got state"); }
+            return getStateAsBytes();
+         }
+         catch (Exception e)
+         {
+            log.error("Caught Exception in MessageListener", e);
+            IllegalStateException e2 = new IllegalStateException(e.getMessage());
+            e2.setStackTrace(e.getStackTrace());
+            throw e2;
+         }
+         finally
+         {
+            lock.writeLock().release();
+         }
       }
 
       public void receive(Message message)
       {
+         if (stopping)
+         {
+            return;
+         }
       }
 
       public void setState(byte[] bytes)
       {
-         if (bytes != null)
-         {         	
-         	try
-            {	      	         		
-         		log.info("Receiving state!!!");
-         		
-	            lock.writeLock().acquire();
+         if (stopping)
+         {
+            return;
+         }
 
-	            try
-	            {
-	            	log.info("Processing state!");
-	            	
-	               processStateBytes(bytes);
-	               
-	               if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener has set state"); }
-	            }
-	            catch (Exception e)
-	            {
-	               log.error("Caught Exception in MessageListener", e);
-	               IllegalStateException e2 = new IllegalStateException(e.getMessage());
-	               e2.setStackTrace(e.getStackTrace());
-	               throw e2;
-	            }
-	            finally
-	            {
-	               lock.writeLock().release();
-	            }
+         if (bytes != null)
+         {
+            try
+            {
+               lock.writeLock().acquire();
             }
-         	catch (InterruptedException e)
-         	{
-         		log.error("Thread interrupted", e);
-         	}
+            catch (InterruptedException e)
+            {
+               log.error("Thread interrupted", e);
+            }
+            try
+            {
+               processStateBytes(bytes);
+               if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener has set state"); }
+            }
+            catch (Exception e)
+            {
+               log.error("Caught Exception in MessageListener", e);
+               IllegalStateException e2 = new IllegalStateException(e.getMessage());
+               e2.setStackTrace(e.getStackTrace());
+               throw e2;
+            }
+            finally
+            {
+               lock.writeLock().release();
+            }
          }
 
          synchronized (setStateLock)
          {
-         	log.info("notifying set state lock");
             stateSet = true;
             setStateLock.notify();
          }
@@ -2405,29 +2348,19 @@
 
       public void viewAccepted(View newView)
       {
-                  
+         if (stopping)
+         {
+            return;
+         }
+
          try
          {
-         	stoplock.readLock().acquire();
-         	         	
-         	try
-         	{         		
-            	if (!started)
-            	{
-            		return;
-            	}
-         	
-	            // We queue up changes and execute them asynchronously.
-	            // This is because JGroups will not let us do stuff like send synch messages using the
-	            // same thread that delivered the view change and this is what we need to do in
-	            // failover, for example.
-	
-	            viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
-         	}
-         	finally
-         	{
-         		stoplock.readLock().release();
-         	}
+            // We queue up changes and execute them asynchronously.
+            // This is because JGroups will not let us do stuff like send synch messages using the
+            // same thread that delivered the view change and this is what we need to do in
+            // failover, for example.
+
+            viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
          }
          catch (InterruptedException e)
          {
@@ -2557,47 +2490,28 @@
    {
       public Object handle(Message message)
       {
-      	try
-      	{
-	      	
-	      	stoplock.readLock().acquire();
-	      	
-	      	try
-	      	{
-	      		if (!started)
-	      		{
-	      			return null;
-	      		}
-	      		
-	      		if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
-	
-	            try
-	            {
-	               byte[] bytes = message.getBuffer();
-	
-	               ClusterRequest request = readRequest(bytes);
-	
-	               return request.execute(DefaultClusteredPostOffice.this);
-	            }
-	            catch (Throwable e)
-	            {
-	               log.error("Caught Exception in RequestHandler", e);
-	               IllegalStateException e2 = new IllegalStateException(e.getMessage());
-	               e2.setStackTrace(e.getStackTrace());
-	               throw e2;
-	            }
-	      	}
-	      	finally
-	      	{
-	      		stoplock.readLock().release();
-	      	}
-      	}
-      	catch (InterruptedException e)
-      	{
-      		log.error("Thread interrupted", e);
-      		
-      		return null;
-      	}
+         if (stopping)
+         {
+            return null;
+         }
+
+         if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
+
+         try
+         {
+            byte[] bytes = message.getBuffer();
+
+            ClusterRequest request = readRequest(bytes);
+
+            return request.execute(DefaultClusteredPostOffice.this);
+         }
+         catch (Throwable e)
+         {
+            log.error("Caught Exception in RequestHandler", e);
+            IllegalStateException e2 = new IllegalStateException(e.getMessage());
+            e2.setStackTrace(e.getStackTrace());
+            throw e2;
+         }
       }
    }
 




More information about the jboss-cvs-commits mailing list