[jboss-cvs] JBoss Messaging SVN: r3052 - in trunk: src/main/org/jboss/jms/server and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 24 08:35:34 EDT 2007


Author: timfox
Date: 2007-08-24 08:35:34 -0400 (Fri, 24 Aug 2007)
New Revision: 3052

Modified:
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/postoffice/ClusteredPostOfficeTest.java
Log:
Some logging changes and small fixes


Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-08-24 12:35:34 UTC (rev 3052)
@@ -144,8 +144,8 @@
       <attribute name="ChannelPartitionName">${jboss.partition.name:DefaultPartition}-JMS</attribute>
       -->
       
-      <!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->               
-      
+      <!-- JGroups stack configuration for the data channel - used for sending data across the cluster --> 
+                        
       <attribute name="DataChannelConfig">
          <config>
             <UDP
@@ -189,12 +189,13 @@
                         view_ack_collection_timeout="5000"/>
             <FC max_credits="2000000" down_thread="false" up_thread="false"
                 min_threshold="0.10"/>
-            <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>   
+            <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>         
+            
          </config>
       </attribute>
       
       <!-- JGroups stack configuration to use for the control channel - used for control messages -->         
-       
+              
       <attribute name="ControlChannelConfig">
          <config>
             <UDP
@@ -239,7 +240,8 @@
             <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="true" flush_timeout="3000"/>
             <pbcast.FLUSH down_thread="false" up_thread="false" timeout="20000" auto_flush_conf="false"/>
         </config>
-      </attribute>
+     </attribute>	    
+      
    </mbean>
    
    <!-- Messaging JMS User Manager MBean config

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-08-24 12:35:34 UTC (rev 3052)
@@ -1509,16 +1509,10 @@
             
       //Unbind the destination's queues
       
-      log.info("Destroying destination " + name);
-      
-      log.info("Got queues " + queues.size());
-
       while (iter.hasNext())            
       {
          Queue queue = (Queue)iter.next();
          
-         log.info("Queue is " + queue);
-         
          queue.removeAllReferences();
          
          //Durable subs need to be removed on all nodes

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2007-08-24 12:35:34 UTC (rev 3052)
@@ -44,6 +44,10 @@
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
 /**
  * 
  * This class handles the interface with JGroups
@@ -92,6 +96,12 @@
    
    private volatile int startedState;
    
+   private volatile Thread viewThread;
+   
+   //We need to process view changes on a different thread: if more than one node is running in the same VM,
+   //the thread that sends the leave message ends up executing the view change on the other node.
+   //We probably don't need this if all nodes are in different VMs.
+
    public GroupMember(String groupName, long stateTimeout, long castTimeout,
    		             JChannelFactory jChannelFactory, RequestTarget requestTarget,
    		             GroupListener groupListener)
@@ -116,7 +126,7 @@
       this.dataChannel = jChannelFactory.createDataChannel();
       
       this.startedState = STOPPED;
-
+      
       // We don't want to receive local messages on any of the channels
       controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
 
@@ -172,9 +182,14 @@
    	
       dataChannel.connect(groupName);
    }
-      
+   
    public void stop() throws Exception
    {	
+   	if (startedState == STOPPED)
+   	{
+   		throw new IllegalStateException("Is already stopped");
+   	}
+   	
    	try
    	{
    		dataChannel.close();
@@ -195,11 +210,9 @@
    	
    	controlChannel = null;
    	
-   	dataChannel = null;
+   	dataChannel = null;   	   	
    	
-   	currentView = null;
-   	
-   	startedState = STOPPED;
+   	currentView = null;   	
    }
    
    public Address getSyncAddress()
@@ -317,7 +330,7 @@
    		
    		if (startedState != newState)
    		{
-   			throw new IllegalStateException("Timed out waiting for state to arrive");
+   			throw new IllegalStateException("Timed out waiting for state to change");
    		}
    	}
    }
@@ -406,7 +419,7 @@
          }
       }
    }
-
+   
    /*
     * We use this class so we notice when members leave the group
     */
@@ -422,10 +435,10 @@
          // NOOP
       }
 
-      public void viewAccepted(View newView)
-      {
+      public void viewAccepted(final View newView)
+      {     	
       	log.debug(this  + " got new view " + newView + ", old view is " + currentView);
-      	
+		      
       	if (currentView == null)
       	{
       		//The first view is arriving
@@ -435,61 +448,80 @@
       			throw new IllegalStateException("Got first view but started state is " + startedState);
       		}
       	}
-
-         // JGroups will make sure this method is never called by more than one thread concurrently
-
-         View oldView = currentView;
-         
-         currentView = newView;
-
-         try
-         {
-            // Act on membership change, on both cases when an old member left or a new member joined
-
-            if (oldView != null)
-            {
-            	List leftNodes = new ArrayList();
-               for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
-               {
-                  Address address = (Address)i.next();
-                  if (!newView.containsMember(address))
-                  {
-                  	leftNodes.add(address);
-                  }
-               }
-               if (!leftNodes.isEmpty())
-               {
-               	groupListener.nodesLeft(leftNodes);
-               }
-            }
-
-            for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
-            {
-               Address address = (Address)i.next();
-               if (oldView == null || !oldView.containsMember(address))
-               {
-                  groupListener.nodeJoined(address);
-               }
-            }
-         }
-         catch (Throwable e)
-         {
-            log.error("Caught Exception in MembershipListener", e);
-            IllegalStateException e2 = new IllegalStateException(e.getMessage());
-            e2.setStackTrace(e.getStackTrace());
-            throw e2;
-         }
-         
-         if (startedState == WAITING_FOR_FIRST_VIEW)
-   		{
-         	synchronized (waitLock)
-         	{         	
-	   			startedState = WAITING_FOR_STATE;
-	   			
-	   			waitLock.notify();
-         	}
-   		}
+      	else
+      	{
+      		if (startedState != STARTED)
+      		{
+      			return;
+      		}
+      	}
+      	
+      	class ViewChangeRunnable implements Runnable
+      	{	
+      		public void run()
+      		{      		
+   	         // JGroups will make sure this method is never called by more than one thread concurrently
+   	
+   	         View oldView = currentView;
+   	         
+   	         currentView = newView;
+   	
+   	         try
+   	         {
+   	            // Act on membership change, on both cases when an old member left or a new member joined
+   	
+   	            if (oldView != null)
+   	            {
+   	            	List leftNodes = new ArrayList();
+   	               for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+   	               {
+   	                  Address address = (Address)i.next();
+   	                  if (!newView.containsMember(address))
+   	                  {
+   	                  	leftNodes.add(address);
+   	                  }
+   	               }
+   	               if (!leftNodes.isEmpty())
+   	               {
+   	               	groupListener.nodesLeft(leftNodes);
+   	               }
+   	            }
+   	
+   	            for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+   	            {
+   	               Address address = (Address)i.next();
+   	               if (oldView == null || !oldView.containsMember(address))
+   	               {
+   	                  groupListener.nodeJoined(address);
+   	               }
+   	            }
+   	         }
+   	         catch (Throwable e)
+   	         {
+   	            log.error("Caught Exception in MembershipListener", e);
+   	            IllegalStateException e2 = new IllegalStateException(e.getMessage());
+   	            e2.setStackTrace(e.getStackTrace());
+   	            throw e2;
+   	         }
+   	         
+   	         if (startedState == WAITING_FOR_FIRST_VIEW)
+   	   		{
+   	         	synchronized (waitLock)
+   	         	{         	
+   		   			startedState = WAITING_FOR_STATE;
+   		   			
+   		   			waitLock.notify();
+   	         	}
+   	   		}
+      		}
+      	}
+      	
+      	//Needs to be executed on different thread to avoid deadlock when running invm
+      	viewThread = new Thread(new ViewChangeRunnable());
+	      	
+	      viewThread.start();      	
       }
+      	
 
       public byte[] getState()
       {
@@ -498,6 +530,11 @@
       }
    }
    
+      
+      
+   	
+   	
+      
    /*
     * This class is used to listen for messages on the async channel
     */

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/tests/build.xml	2007-08-24 12:35:34 UTC (rev 3052)
@@ -343,7 +343,6 @@
 	       <include name="**/messaging/core/**/${test-mask}.class"/> 
 	       <include name="**/jms/**/${test-mask}.class"/>
                <include name="**/messaging/util/**/${test-mask}.class"/>
-
                <exclude name="**/jms/MemLeakTest.class"/>
                <exclude name="**/jms/RemotingConnectionConfigurationTest.class"/>
                <exclude name="**/jms/XAResourceRecoveryTest.class"/>

Modified: trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2007-08-24 12:35:34 UTC (rev 3052)
@@ -267,17 +267,20 @@
 
    protected void tearDown() throws Exception
    {
+   	Thread.sleep(2000);
+   	
       pm.reapUnreferencedMessages();
       
       if (this.checkNoMessageData())
       {
       	fail("Message data still exists");
-      }
+      }      
       
       if (this.checkNoBindingData())
       {
       	fail("Binding data still exists");
       }
+      
       sc.stop();
       sc = null;
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/postoffice/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/postoffice/ClusteredPostOfficeTest.java	2007-08-24 06:54:33 UTC (rev 3051)
+++ trunk/tests/src/org/jboss/test/messaging/core/postoffice/ClusteredPostOfficeTest.java	2007-08-24 12:35:34 UTC (rev 3052)
@@ -410,6 +410,8 @@
          // Stop office 2
          office2.stop();
          
+         Thread.sleep(1000);
+         
          queues = office3.getQueuesForCondition(condition1, false);
          assertNotNull(queues);
          assertEquals(1, queues.size());
@@ -468,8 +470,9 @@
          
          //Unbind it
          
+         log.info("Removing queue6 binding");
          removed = office1.removeBinding(queue6.getName(), false);
-         assertNotNull(removed);
+         assertNotNull(removed);         
          
          queues = office1.getQueuesForCondition(condition1, false);
          assertNotNull(queues);
@@ -558,7 +561,10 @@
          assertNotNull(queues);
          assertEquals(2, queues.size());
          assertTrue(queues.contains(queue8));
-         assertTrue(queues.contains(queue9));                   
+         assertTrue(queues.contains(queue9));        
+         
+         log.info("at end");    
+         //Thread.sleep(10000000);
       }
       catch (Throwable e)
       {
@@ -569,23 +575,37 @@
       {
          if (office1 != null)
          {
-            office1.stop();
+         	try
+         	{
+         		office1.stop();
+         	}
+         	catch (Exception ignore)
+         	{         		
+         	}
          }
          
          if (office2 != null)
          {
-            office2.stop();
+         	try
+         	{
+         		office2.stop();
+         	}
+         	catch (Exception ignore)
+         	{         		
+         	}
          }
          
          if (office3 != null)
          {
-            office3.stop();
+         	try
+         	{
+         		office3.stop();
+         	}
+         	catch (Exception ignore)
+         	{         		
+         	}
          }
          
-         if (checkNoBindingData())
-         {
-            fail("data still in database");
-         }
       }
    }
    
@@ -712,12 +732,7 @@
          assertTrue(bindings.isEmpty());
          
          bindings = office3.getAllBindings();         
-         assertTrue(bindings.isEmpty());
-         
-         if (checkNoBindingData())
-         {
-            fail("data still in database");
-         }                                  
+         assertTrue(bindings.isEmpty());                                        
       }
       finally
       {
@@ -827,12 +842,7 @@
          assertTrue(bindings.isEmpty());
          
          bindings = office3.getAllBindings();         
-         assertTrue(bindings.isEmpty());
-                  
-         if (checkNoBindingData())
-         {
-            fail("data still in database");
-         }                                  
+         assertTrue(bindings.isEmpty());                                              
       }
       finally
       {
@@ -928,12 +938,7 @@
          assertTrue(bindings.isEmpty());
          
          bindings = office3.getAllBindings();         
-         assertTrue(bindings.isEmpty());
-         
-         if (checkNoBindingData())
-         {
-            fail("data still in database");
-         }                                  
+         assertTrue(bindings.isEmpty());                                    
       }
       finally
       {
@@ -1034,13 +1039,7 @@
          assertTrue(bindings.isEmpty());
          
          bindings = office3.getAllBindings();         
-         assertTrue(bindings.isEmpty());
-         
-         
-         if (checkNoBindingData())
-         {
-            fail("data still in database");
-         }                                  
+         assertTrue(bindings.isEmpty());                  
       }
       finally
       {
@@ -1149,12 +1148,7 @@
          assertTrue(bindings.isEmpty());
          
          bindings = office3.getAllBindings();         
-         assertTrue(bindings.isEmpty());
-         
-         if (checkNoBindingData())
-         {
-            fail("data still in database");
-         }                                  
+         assertTrue(bindings.isEmpty());                       
       }
       finally
       {
@@ -1290,12 +1284,7 @@
          assertGotAll(2, bindings, queue1.getName());
          
          bindings = office3.getAllBindings();         
-         assertGotAll(3, bindings, queue1.getName());         
-                  
-         if (checkNoBindingData())
-         {
-            fail("data still in database");
-         }                                  
+         assertGotAll(3, bindings, queue1.getName());                                                         
       }
       finally
       {
@@ -1518,12 +1507,7 @@
    		receiver4.acknowledge(msgRec3, null);
    		msgs = queue4.browse(null);
    		assertNotNull(msgs);
-   		assertTrue(msgs.isEmpty()); 
-   		
-   		if (checkNoMessageData())
-   		{
-   			fail("Message data still in database");
-   		}
+   		assertTrue(msgs.isEmpty());    		
    	}
    	finally
    	{
@@ -1920,13 +1904,7 @@
             msgs = queues[i].browse(null);
             assertNotNull(msgs);
             assertTrue(msgs.isEmpty());
-         }
-         
-        
-         if (checkNoMessageData())
-         {
-            fail("Message data still in database");
-         }
+         }         
       }
       finally
       {
@@ -2056,11 +2034,6 @@
    		msgs = queue3.browse(null);
    		assertNotNull(msgs);
    		assertTrue(msgs.isEmpty()); 
-
-   		if (checkNoMessageData())
-   		{
-   			fail("Message data still in database");
-   		}
    	}
    	finally
    	{
@@ -2210,11 +2183,6 @@
    		msgs = queue4.browse(null);
    		assertNotNull(msgs);
    		assertTrue(msgs.isEmpty()); 
-   		
-   		if (checkNoMessageData())
-   		{
-   			fail("Message data still in database");
-   		}
    	}
    	finally
    	{
@@ -2410,11 +2378,6 @@
    			}
 
    		}
-
-   		if (checkNoMessageData())
-   		{
-   			fail("Message data still in database");
-   		}
    	}
    	finally
    	{
@@ -2609,11 +2572,6 @@
    			}
 
    		}
-
-   		if (checkNoMessageData())
-   		{
-   			fail("Message data still in database");
-   		}
    	}
    	finally
    	{
@@ -2786,13 +2744,6 @@
    		checkNotGetsMessage(queue0, receiver0);
 
    		checkNotGetsMessage(queue2, receiver2);
-
-
-
-   		if (checkNoMessageData())
-   		{
-   			fail("Message data still in database");
-   		}
    	}
    	finally
    	{
@@ -2936,13 +2887,6 @@
    		checkNotGetsMessage(queue0, receiver0);
 
    		checkNotGetsMessage(queue2, receiver2);
-
-
-
-   		if (checkNoMessageData())
-   		{
-   			fail("Message data still in database");
-   		}
    	}
    	finally
    	{




More information about the jboss-cvs-commits mailing list