[jboss-cvs] JBoss Messaging SVN: r3013 - in trunk/src: main/org/jboss/messaging/core/impl/postoffice and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 15 08:08:02 EDT 2007


Author: timfox
Date: 2007-08-15 08:08:02 -0400 (Wed, 15 Aug 2007)
New Revision: 3013

Modified:
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Interim commit for group member cleanup


Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-08-15 08:43:42 UTC (rev 3012)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-08-15 12:08:02 UTC (rev 3013)
@@ -141,51 +141,101 @@
       <attribute name="ChannelPartitionName">${jboss.partition.name:DefaultPartition}-JMS</attribute>
       -->
       
-      <!-- JGroups stack configuration for the data channel - used when casting messages across the cluster -->               
+      <!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->               
       
       <attribute name="DataChannelConfig">
          <config>
-            <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
-           mcast_port="45567" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
-           mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="true" ucast_send_buf_size="32000" ip_ttl="32"/>
-            <AUTOCONF down_thread="false" up_thread="false"/>
+            <UDP
+               mcast_addr="228.8.8.8"
+               mcast_port="45567"
+               tos="8"
+               ucast_recv_buf_size="20000000"
+               ucast_send_buf_size="640000"
+               mcast_recv_buf_size="25000000"
+               mcast_send_buf_size="640000"
+               loopback="false"
+               discard_incompatible_packets="true"
+               max_bundle_size="64000"
+               max_bundle_timeout="30"
+               use_incoming_packet_handler="true"
+               use_outgoing_packet_handler="false"
+               ip_ttl="2"
+               down_thread="false" up_thread="false"
+               enable_bundling="false"/>
             <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
-            <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+            <MERGE2 max_interval="100000" down_thread="false" min_interval="20000" up_thread="false"/>
             <FD_SOCK down_thread="false" up_thread="false"/>
-            <FD timeout="20000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+            <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
             <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
-            <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
-                         retransmit_timeout="100,200,600,1200,2400,4800"/>
-            <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
-            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
-            <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
-            <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
-            <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+            <pbcast.NAKACK
+               max_xmit_size="60000"
+               use_mcast_xmit="false"
+               gc_lag="0"
+               retransmit_timeout="300,600,1200,2400,4800"
+               down_thread="false" up_thread="false"
+               discard_delivered_msgs="true"/>
+            <UNICAST timeout="300,600,1200,2400,3600"
+                     down_thread="false" up_thread="false"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                           down_thread="false" up_thread="false"
+                           max_bytes="400000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000" use_flush="true" flush_timeout="3000"
+                        down_thread="false" up_thread="false"
+                        join_retry_timeout="2000" shun="false"
+                        view_bundling="true"
+                        view_ack_collection_timeout="5000"/>
+            <FC max_credits="2000000" down_thread="false" up_thread="false"
+                min_threshold="0.10"/>
+            <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>      
          </config>
       </attribute>
       
-      <!-- JGroups stack configuration to use for the control channel - used for bind/unbind requests amongst others -->           
-                  
+      <!-- JGroups stack configuration to use for the control channel - used for control messages -->         
+       
       <attribute name="ControlChannelConfig">
          <config>
-            <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
-           mcast_port="45568" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
-           mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="true" ucast_send_buf_size="32000" ip_ttl="32"/>
-            <AUTOCONF down_thread="false" up_thread="false"/>
-            <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
-            <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+            <UDP
+                 mcast_addr="228.8.8.8"
+                 mcast_port="45568"
+                 tos="8"
+                 ucast_recv_buf_size="20000000"
+                 ucast_send_buf_size="640000"
+                 mcast_recv_buf_size="25000000"
+                 mcast_send_buf_size="640000"
+                 loopback="false"
+                 discard_incompatible_packets="true"
+                 max_bundle_size="64000"
+                 max_bundle_timeout="30"
+                 use_incoming_packet_handler="true"
+                 use_outgoing_packet_handler="false"
+                 ip_ttl="2"
+                 down_thread="false" up_thread="false"
+                 enable_bundling="false"/>
+            <PING timeout="2000"
+                  down_thread="false" up_thread="false" num_initial_members="3"/>
+            <MERGE2 max_interval="100000"
+                    down_thread="false" up_thread="false" min_interval="20000"/>
             <FD_SOCK down_thread="false" up_thread="false"/>
-            <FD timeout="20000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+            <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
             <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
-            <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
-                         retransmit_timeout="100,200,600,1200,2400,4800"/>
-            <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
-            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
-            <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
-            <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
-            <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
-            <pbcast.STATE_TRANSFER down_thread="false" up_thread="false"/>
-         </config>
+            <pbcast.NAKACK max_xmit_size="60000"
+                           use_mcast_xmit="false" gc_lag="0"
+                           retransmit_timeout="300,600,1200,2400,4800"
+                           down_thread="false" up_thread="false"
+                           discard_delivered_msgs="true"/>
+            <UNICAST timeout="300,600,1200,2400,3600"
+                     down_thread="false" up_thread="false"/>
+            <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                           down_thread="false" up_thread="false"
+                           max_bytes="400000"/>
+            <pbcast.GMS print_local_addr="true" join_timeout="3000" use_flush="true" flush_timeout="3000"
+                        down_thread="false" up_thread="false"
+                        join_retry_timeout="2000" shun="false"
+                        view_bundling="true"/>
+            <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+            <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="true" flush_timeout="3000"/>
+            <pbcast.FLUSH down_thread="false" up_thread="false" timeout="20000" auto_flush_conf="false"/>
+        </config>
       </attribute>
    </mbean>
    

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-08-15 08:43:42 UTC (rev 3012)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-08-15 12:08:02 UTC (rev 3013)
@@ -42,11 +42,6 @@
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
-
 /**
  * 
  * This class handles the interface with JGroups
@@ -83,15 +78,18 @@
    
    private volatile View currentView;
    
-   private QueuedExecutor viewExecutor;
-   
    private Object waitLock = new Object();
+   
+   private static final int STOPPED = 1;
 
-   //Still needs to be volatile since the ReadWriteLock won't synchronize between threads
-   private volatile boolean started;
+   private static final int WAITING_FOR_FIRST_VIEW = 2;
    
-   private ReadWriteLock lock;
+   private static final int WAITING_FOR_STATE = 3;
    
+   private static final int STARTED = 4;
+   
+   private volatile int startedState;
+   
    public GroupMember(String groupName, long stateTimeout, long castTimeout,
    		             JChannelFactory jChannelFactory, RequestTarget requestTarget,
    		             GroupListener groupListener)
@@ -107,115 +105,99 @@
    	this.requestTarget = requestTarget;
    	
    	this.groupListener = groupListener;
-   	 	
-   	this.lock = new ReentrantWriterPreferenceReadWriteLock();
    }
      
    public void start() throws Exception
-   {
-   	lock.writeLock().acquire();
+   {		
+   	this.controlChannel = jChannelFactory.createControlChannel();
    	
-   	try
-   	{
-      	this.viewExecutor = new QueuedExecutor(new LinkedQueue());
-      	     		
-	   	this.controlChannel = jChannelFactory.createControlChannel();
-	   	
-	      this.dataChannel = jChannelFactory.createDataChannel();
-	
-	      // We don't want to receive local messages on any of the channels
-	      controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-	
-	      dataChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-	      
-	      MessageListener messageListener = new ControlMessageListener();
-	      
-	      MembershipListener membershipListener = new ControlMembershipListener();
-	      
-	      RequestHandler requestHandler = new ControlRequestHandler();
-	      
-	      dispatcher = new MessageDispatcher(controlChannel, messageListener, membershipListener, requestHandler, true);
-	      	      
-	      Receiver dataReceiver = new DataReceiver();
-	      
-	      dataChannel.setReceiver(dataReceiver);
-	
-	      controlChannel.connect(groupName);
-	      
-	      dataChannel.connect(groupName);
-	      
-	      //Note we're not started until getState() is successfullly called
-	      
-	      if (!getState())
-      	{
-      		if (trace) { log.trace(this + " is the first member of group"); }
-      	}
-      	else
-      	{
-      		if (trace) { log.trace(this + " is not the first member of group"); }
-      	}
-	      
-	      //Wait for the first view to arrive
+      this.dataChannel = jChannelFactory.createDataChannel();
+      
+      this.startedState = STOPPED;
 
-      	synchronized (waitLock)
-      	{ 
-   			long timeRemaining = 5000; //Hardcoded for now
-   			
-   			long start = System.currentTimeMillis();
-   			
-      		while (currentView == null && timeRemaining > 0)
-      		{
-      			waitLock.wait(stateTimeout);
-      			
-      			if (currentView == null)
-      			{
-      				long waited = System.currentTimeMillis() - start;
-      				
-      				timeRemaining -= waited;
-      			}
-      		}
-      		
-      		if (currentView == null)
-      		{
-      			throw new IllegalStateException("Timed out waiting for first view to arrive");
-      		}
-      	}
-      	
-	   	//Now we can be considered started
-	   	started = true;	   	
+      // We don't want to receive local messages on any of the channels
+      controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+      dataChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+      
+      MessageListener messageListener = new ControlMessageListener();
+      
+      MembershipListener membershipListener = new ControlMembershipListener();
+      
+      RequestHandler requestHandler = new ControlRequestHandler();
+      
+      dispatcher = new MessageDispatcher(controlChannel, messageListener, membershipListener, requestHandler, true);
+      	      
+      Receiver dataReceiver = new DataReceiver();
+      
+      dataChannel.setReceiver(dataReceiver);
+      
+      this.startedState = WAITING_FOR_FIRST_VIEW;
+
+      controlChannel.connect(groupName);
+         
+      //The first thing that happens after connect is a view change arrives
+      //Then the state will arrive (if we are not the first member)
+      //Then the control messages will start arriving.
+      //We can guarantee that messages won't arrive until after the state is set because we use 
+      //the FLUSH protocol on the control channel
+      
+      //First wait for view
+      waitForStateChange(WAITING_FOR_STATE);
+      
+      log.debug("First view arrived");
+      
+      //Now wait for state if we are not the first member
+      
+      if (controlChannel.getState(null, stateTimeout))
+   	{
+   		//We are not the first member of the group, so let's wait for state to be got and processed
+   		
+   		waitForStateChange(STARTED);
+   		
+   		log.debug("State arrived");
    	}
-   	finally
+   	else
    	{
-   		lock.writeLock().release();
+   		//We are the first member, no need to wait
+   		
+   		startedState = STARTED;
+   		
+   		log.debug("We are the first member of the group so no need to wait for state");
    	}
+      
+      //Now connect the data channel.
+   	
+      dataChannel.connect(groupName);
    }
       
    public void stop() throws Exception
-   {
-   	lock.writeLock().acquire();
-   	
+   {	
    	try
-   	{	   	
-   		viewExecutor.shutdownAfterProcessingCurrentTask();
-   		
-	   	controlChannel.close();
-	   	
-	   	dataChannel.close();
-	   	
-	   	controlChannel = null;
-	   	
-	   	dataChannel = null;
-	   	
-	   	currentView = null;
-	   	
-	   	viewExecutor = null;
-
-	   	started = false;
+   	{
+   		dataChannel.close();
    	}
-   	finally
+   	catch (Exception e)
    	{
-   		lock.writeLock().release();
+   		log.debug("Failed to close data channel", e);
    	}
+   	   	
+   	try
+   	{
+   		controlChannel.close();
+   	}
+   	catch (Exception e)
+   	{
+   		log.debug("Failed to close control channel", e);
+   	}
+   	
+   	controlChannel = null;
+   	
+   	dataChannel = null;
+   	
+   	currentView = null;
+   	
+   	startedState = STOPPED;
    }
    
    public Address getSyncAddress()
@@ -240,106 +222,53 @@
    
    public void multicastControl(ClusterRequest request, boolean sync) throws Exception
    {
-   	lock.readLock().acquire();
-   	
-   	try
-   	{   	
-	   	if (started)
-	   	{   		
-		   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
-		
-		   	Message message = new Message(null, null, writeRequest(request));
+   	if (startedState == STARTED)
+   	{   		
+	   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
+	
+	   	Message message = new Message(null, null, writeRequest(request));
 
-		   	RspList rspList =
-		   		dispatcher.castMessage(null, message, sync ? GroupRequest.GET_ALL: GroupRequest.GET_NONE, castTimeout);	
+	   	RspList rspList =
+	   		dispatcher.castMessage(null, message, sync ? GroupRequest.GET_ALL: GroupRequest.GET_NONE, castTimeout);	
+	   	
+	   	if (sync)
+	   	{			   	
+		   	Iterator iter = rspList.values().iterator();
 		   	
-		   	if (sync)
-		   	{			   	
-			   	Iterator iter = rspList.values().iterator();
-			   	
-			   	while (iter.hasNext())
-			   	{
-			   		Rsp rsp = (Rsp)iter.next();
-			   		
-			   		if (!rsp.wasReceived())
-			   		{
-			   			throw new IllegalStateException(this + " response not received from " + rsp.getSender() + " - there may be others");
-			   		}
-			   	}		
-		   	}
+		   	while (iter.hasNext())
+		   	{
+		   		Rsp rsp = (Rsp)iter.next();
+		   		
+		   		if (!rsp.wasReceived())
+		   		{
+		   			throw new IllegalStateException(this + " response not received from " + rsp.getSender() + " - there may be others");
+		   		}
+		   	}		
 	   	}
    	}
-   	finally
-   	{
-   		lock.readLock().release();
-   	}
    }
    
-   public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
-   {
-   	lock.readLock().acquire();
-   	
-   	try
-   	{   	
-	   	if (started)
-	   	{   		
-		   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
-		
-		   	Message message = new Message(address, null, writeRequest(request));
-
-		   	String ret = (String)dispatcher.sendMessage(message, sync ? GroupRequest.GET_ALL: GroupRequest.GET_NONE, castTimeout);
-		   				   	
-		   	if (sync && !"ok".equals(ret))
-		   	{			   				   	
-		   		throw new IllegalStateException("Invalid response received " + ret);		   				   				   			   		
-		   	}
-	   	}
-   	}
-   	finally
-   	{
-   		lock.readLock().release();
-   	}
-   }
-   
    public void multicastData(ClusterRequest request) throws Exception
    {
-   	lock.readLock().acquire();
-   	
-   	try
-   	{   	
-	   	if (started)
-	   	{   		
-		   	if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
-		
-		      byte[] bytes = writeRequest(request);
-		      
-		      dataChannel.send(new Message(null, null, bytes));
-	   	}
+   	if (startedState == STARTED)
+   	{   		
+	   	if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
+	
+	      byte[] bytes = writeRequest(request);
+	      
+	      dataChannel.send(new Message(null, null, bytes));
    	}
-   	finally
-   	{
-   		lock.readLock().release();
-   	}
    }
    
    public void unicastData(ClusterRequest request, Address address) throws Exception
    {
-   	lock.readLock().acquire();
-   	
-   	try
-   	{ 
-	   	if (started)
-	   	{
-		   	if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
-		
-		      byte[] bytes = writeRequest(request);
-		      
-		      dataChannel.send(new Message(address, null, bytes));
-	   	}
-   	}
-   	finally
+   	if (startedState == STARTED)
    	{
-   		lock.readLock().release();
+	   	if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
+	
+	      byte[] bytes = writeRequest(request);
+	      
+	      dataChannel.send(new Message(address, null, bytes));
    	}
    }
    
@@ -352,37 +281,46 @@
    	{
    		//We are not the first member of the group, so let's wait for state to be got and processed
    		
-   		synchronized (waitLock)
-      	{ 
-   			long timeRemaining = stateTimeout;
-   			
-   			long start = System.currentTimeMillis();
-   			
-      		while (!started && timeRemaining > 0)
-      		{
-      			waitLock.wait(stateTimeout);
-      			
-      			if (!started)
-      			{
-      				long waited = System.currentTimeMillis() - start;
-      				
-      				timeRemaining -= waited;
-      			}
-      		}
-      		
-      		if (!started)
-      		{
-      			throw new IllegalStateException("Timed out waiting for state to arrive");
-      		}
-      	}
+   		waitForStateChange(STARTED);
    		
    		retrievedState = true;
    	}
+   	else
+   	{
+   		this.startedState = STARTED;
+   	}
 
    	return retrievedState;
    }
    
+   private void waitForStateChange(int newState) throws Exception
+   {
+   	synchronized (waitLock)
+   	{ 
+			long timeRemaining = stateTimeout;
+			
+			long start = System.currentTimeMillis();
+			
+   		while (startedState != newState && timeRemaining > 0)
+   		{
+   			waitLock.wait(stateTimeout);
+   			
+   			if (startedState != newState)
+   			{
+   				long waited = System.currentTimeMillis() - start;
+   				
+   				timeRemaining -= waited;
+   			}
+   		}
+   		
+   		if (startedState != newState)
+   		{
+   			throw new IllegalStateException("Timed out waiting for state to arrive");
+   		}
+   	}
+   }
    
+   
    private ClusterRequest readRequest(byte[] bytes) throws Exception
    {
       ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
@@ -418,34 +356,23 @@
       public byte[] getState()
       {
          try
-         {
-	      	lock.readLock().acquire();
-	      		      
-	      	try
-	      	{	      	
-		      	if (!started)
-		      	{
-		      		//Ignore if received after stopped
-		      		
-		      		return null;
-		      	}
-	      		
-		         if (trace) { log.trace(this + ".ControlMessageListener got state"); }		         
-	
-		         byte[] state = groupListener.getState();
-		         
-		         return state;		        
-	      	}
-	      	finally
+         {     	
+	      	if (startedState != STARTED)
 	      	{
-	      		lock.readLock().release();
+	      		throw new IllegalStateException("Received control message but group member is not started: " + startedState);
 	      	}
+      		
+	         if (trace) { log.trace(this + ".ControlMessageListener got state"); }		         
+
+	         byte[] state = groupListener.getState();
+	         
+	         return state;		        
       	}
          catch (Exception e)
          {
          	log.error("Failed to get state", e);
          	
-         	return null;
+         	throw new IllegalStateException("Failed to get state");
          }
       }
 
@@ -457,6 +384,11 @@
       {
          synchronized (waitLock)
          {
+         	if (startedState != WAITING_FOR_STATE)
+         	{
+         		throw new IllegalStateException("Received state but started state is " + startedState);
+         	}
+         	
          	try
          	{
          		groupListener.setState(bytes);
@@ -466,7 +398,7 @@
          		log.error("Failed to set state", e);
          	}
          	
-         	started = true;
+         	startedState = STARTED;
          	
             waitLock.notify();
          }
@@ -490,19 +422,67 @@
 
       public void viewAccepted(View newView)
       {
+      	log.debug(this  + " got new view " + newView + ", old view is " + currentView);
+      	
+      	if (currentView == null)
+      	{
+      		//The first view is arriving
+      		
+      		if (startedState != WAITING_FOR_FIRST_VIEW)
+      		{
+      			throw new IllegalStateException("Got first view but started state is " + startedState);
+      		}
+      	}
+
+         // JGroups will make sure this method is never called by more than one thread concurrently
+
+         View oldView = currentView;
+         
+         currentView = newView;
+
          try
          {
-            // We queue up changes and execute them asynchronously.
-            // This is because JGroups will not let us do stuff like send synch messages using the
-            // same thread that delivered the view change and this is what we need to do in
-            // failover, for example.
+            // Act on membership change, on both cases when an old member left or a new member joined
 
-            viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
+            if (oldView != null)
+            {
+               for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+               {
+                  Address address = (Address)i.next();
+                  if (!newView.containsMember(address))
+                  {
+                     // this is where the failover happens, if necessary
+                     groupListener.nodeLeft(address);
+                  }
+               }
+            }
+
+            for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+            {
+               Address address = (Address)i.next();
+               if (oldView == null || !oldView.containsMember(address))
+               {
+                  groupListener.nodeJoined(address);
+               }
+            }
          }
-         catch (InterruptedException e)
+         catch (Throwable e)
          {
-            log.warn("Caught InterruptedException", e);
+            log.error("Caught Exception in MembershipListener", e);
+            IllegalStateException e2 = new IllegalStateException(e.getMessage());
+            e2.setStackTrace(e.getStackTrace());
+            throw e2;
          }
+         
+         if (startedState == WAITING_FOR_FIRST_VIEW)
+   		{
+         	synchronized (waitLock)
+         	{         	
+	   			startedState = WAITING_FOR_STATE;
+	   			
+	   			waitLock.notify();
+         	}
+   		}
       }
 
       public byte[] getState()
@@ -544,27 +524,16 @@
 
          try
          {
-         	lock.readLock().acquire();
-         	
-         	try
-         	{
-         		if (!started)
-         		{
-         			//Ignore messages received when not started
-         			
-         			return;
-         		}
-         		
-	            byte[] bytes = message.getBuffer();
-	
-	            ClusterRequest request = readRequest(bytes);
-	            
-	            request.execute(requestTarget);
-         	}
-         	finally
-         	{
-         		lock.readLock().release();
-         	}
+      		if (startedState != STARTED)
+      		{
+      			throw new IllegalStateException("Received data message but member is not started " + startedState);
+      		}
+      		
+            byte[] bytes = message.getBuffer();
+
+            ClusterRequest request = readRequest(bytes);
+            
+            request.execute(requestTarget);
          }
          catch (Throwable e)
          {
@@ -593,27 +562,16 @@
 
          try
          {
-         	lock.readLock().acquire();
-         	
-         	try
-         	{
-         		if (!started)
-         		{
-         			//Ignore messages received when stopped
-         			
-         			return null;
-         		}
-         		         		
-	            byte[] bytes = message.getBuffer();
-	
-	            ClusterRequest request = readRequest(bytes);
-	
-	            return request.execute(requestTarget);
-         	}
-         	finally
-         	{
-         		lock.readLock().release();
-         	}
+      		if (startedState != STARTED)
+      		{
+      			throw new IllegalStateException("Received control message but member is not started " + startedState);
+      		}
+      		         		
+            byte[] bytes = message.getBuffer();
+
+            ClusterRequest request = readRequest(bytes);
+
+            return request.execute(requestTarget);
          }
          catch (Throwable e)
          {
@@ -625,59 +583,5 @@
       }
    }
    
-   private class HandleViewAcceptedRunnable implements Runnable
-   {
-      private View newView;
 
-      HandleViewAcceptedRunnable(View newView)
-      {
-         this.newView = newView;
-      }
-
-      public void run()
-      {
-         log.debug(this  + " got new view " + newView + ", old view is " + currentView);
-
-         // JGroups will make sure this method is never called by more than one thread concurrently
-
-         View oldView = currentView;
-         
-         currentView = newView;
-
-         try
-         {
-            // Act on membership change, on both cases when an old member left or a new member joined
-
-            if (oldView != null)
-            {
-               for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
-               {
-                  Address address = (Address)i.next();
-                  if (!newView.containsMember(address))
-                  {
-                     // this is where the failover happens, if necessary
-                     groupListener.nodeLeft(address);
-                  }
-               }
-            }
-
-            for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
-            {
-               Address address = (Address)i.next();
-               if (oldView == null || !oldView.containsMember(address))
-               {
-                  groupListener.nodeJoined(address);
-               }
-            }
-         }
-         catch (Throwable e)
-         {
-            log.error("Caught Exception in MembershipListener", e);
-            IllegalStateException e2 = new IllegalStateException(e.getMessage());
-            e2.setStackTrace(e.getStackTrace());
-            throw e2;
-         }
-      }
-   }
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-08-15 08:43:42 UTC (rev 3012)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-08-15 12:08:02 UTC (rev 3013)
@@ -613,7 +613,7 @@
    		
    		PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(thisNodeID));
    		
-   		replyAddress = info.getControlChannelAddress();
+   		replyAddress = info.getDataChannelAddress();
    	}
    	
    	ClusterRequest request = new ReplicateDeliveryMessage(thisNodeID, queueName, sessionID, messageID, deliveryID, replyAddress);
@@ -621,11 +621,11 @@
    	if (trace) { log.trace(this + " sending replicate delivery message " + queueName + " " + sessionID + " " + messageID); }
 			   
    	//TODO could be optimised too
-	   Address address = getFailoverNodeForControlChannelAddress();
+	   Address address = getFailoverNodeDataChannelAddress();
 	   	
 	   if (address != null)
 	   {	   
-	   	groupMember.unicastControl(request, address, false);
+	   	groupMember.unicastData(request, address);
 	   }
    }
 
@@ -636,11 +636,11 @@
 	
 	   ClusterRequest request = new ReplicateAckMessage(thisNodeID, queueName, messageID);		   
    	
-	   Address address = getFailoverNodeForControlChannelAddress();
+	   Address address = getFailoverNodeDataChannelAddress();
 	   	
 	   if (address != null)
 	   {	   
-	   	groupMember.unicastControl(request, address, false);
+	   	groupMember.unicastData(request, address);
 	   }
 	}
 	
@@ -1205,7 +1205,7 @@
 		   		{
 		   			try
 		   			{
-		   				groupMember.unicastControl(request, replyAddress, false);
+		   				groupMember.unicastData(request, replyAddress);
 		   			}
 		   			catch (Exception e)
 		   			{
@@ -1604,19 +1604,19 @@
 						
 						PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(thisNodeID));
 			   		
-			   		Address replyAddress = info.getControlChannelAddress();
+			   		Address replyAddress = info.getDataChannelAddress();
 						
 						ClusterRequest request = new GetReplicatedDeliveriesRequest(queue.getName(), replyAddress);
 						
 					   info = (PostOfficeAddressInfo)nodeIDAddressMap.get(masterNodeID);
 					   			   
-					   Address address = info.getControlChannelAddress();
+					   Address address = info.getDataChannelAddress();
 					   
 					   log.info("Sending the message to node " + masterNodeID + " with address " + address);
 				   	   				   	
 					   if (address != null)
 					   {	   
-					   	groupMember.unicastControl(request, address, false);
+					   	groupMember.unicastData(request, address);
 					   }
 					}
 				}
@@ -1651,7 +1651,7 @@
    	return nodeID;
    }
    
-   private Address getFailoverNodeForControlChannelAddress()
+   private Address getFailoverNodeDataChannelAddress()
    {
    	PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(failoverNodeID));
    	
@@ -1660,7 +1660,7 @@
    		return null;
    	}
    	
-   	Address address = info.getControlChannelAddress();
+   	Address address = info.getDataChannelAddress();
    	
    	return address;
    }
@@ -2763,7 +2763,7 @@
 	   		
 	   		ClusterRequest request = new AckAllReplicatedDeliveriesMessage(thisNodeID);
 	   		
-	   		groupMember.unicastControl(request, info.getControlChannelAddress(), false);
+	   		groupMember.unicastData(request, info.getDataChannelAddress());
 	   		
 	   		if (trace) { log.trace("Sent AckAllReplicatedDeliveriesMessage"); }
 	   	}
@@ -2813,7 +2813,7 @@
 						
 						ClusterRequest request = new AddAllReplicatedDeliveriesMessage(thisNodeID, deliveries);
 						
-						groupMember.unicastControl(request, info.getControlChannelAddress(), false);
+						groupMember.unicastData(request, info.getDataChannelAddress());
 			   		
 			   		if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
 					}
@@ -3008,7 +3008,7 @@
       			{
    	   			ClusterRequest req = new AddAllReplicatedDeliveriesMessage(thisNodeID, dels);
    	   			
-   	   			groupMember.unicastControl(req, address, false);
+   	   			groupMember.unicastData(req, address);
       			}
       			   			
       		}




More information about the jboss-cvs-commits mailing list