[jboss-cvs] JBoss Messaging SVN: r2879 - in trunk: tests/src/org/jboss/test/messaging/jms/clustering and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 11 20:09:22 EDT 2007


Author: timfox
Date: 2007-07-11 20:09:22 -0400 (Wed, 11 Jul 2007)
New Revision: 2879

Modified:
   trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
Fixed MultipleFailoverTest


Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-07-11 21:38:09 UTC (rev 2878)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java	2007-07-12 00:09:22 UTC (rev 2879)
@@ -30,18 +30,19 @@
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
+import org.jboss.jms.delegate.Cancel;
 import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.delegate.DefaultCancel;
+import org.jboss.jms.delegate.DeliveryInfo;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.delegate.Cancel;
-import org.jboss.jms.delegate.DefaultCancel;
-import org.jboss.jms.delegate.DeliveryInfo;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.util.Future;
 import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
 import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
 
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**
@@ -176,7 +177,11 @@
       // or add anything to the tx for this session
       if (!isConnectionConsumer)
       {
+      	if (trace) { log.trace("Calling postDeliver"); }
+      	
          sess.postDeliver();
+         
+         if (trace) { log.trace("Called postDeliver"); }
       }   
    }
    
@@ -245,7 +250,7 @@
    }
         
    // Public ---------------------------------------------------------------------------------------
-
+  
    /**
     * Handles a message sent from the server.
     *
@@ -737,6 +742,9 @@
    {
       boolean notified = false;
       
+      if (trace) { log.trace("Receiver thread:" + receiverThread + " listener:" + listener + " listenerRunning:" + listenerRunning + 
+      		" sessionExecutor:" + sessionExecutor); }
+      
       // If we have a thread waiting on receive() we notify it
       if (receiverThread != null)
       {
@@ -754,6 +762,7 @@
             listenerRunning = true;
 
             if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
+            
             this.queueRunner(new ListenerRunner());
          }     
          
@@ -763,6 +772,8 @@
       // Make sure we notify any thread waiting for last delivery
       if (waitingForLastDelivery && !notified)
       {
+      	if (trace) { log.trace("Notifying"); }
+      	
          mainLock.notifyAll();
       }
    }
@@ -895,8 +906,54 @@
 
             mp = (MessageProxy)buffer.removeFirst();
                           
-            if (!buffer.isEmpty())
+//            if (!buffer.isEmpty())
+//            {
+//            	//Queue up the next runner to run
+//            	
+//            	if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
+//            	
+//            	queueRunner(this);
+//            	
+//            	if (trace) { log.trace("Queued next onMessage to run"); }
+//            }
+//            else
+//            {
+//            	if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
+//            	
+//            	listenerRunning  = false;
+//            }               
+         }
+         
+         /*
+          * The bug here is as follows:
+          * The next runner gets scheduled BEFORE onMessage is executed,
+          * so if onMessage fails on acking it will be put on hold
+          * and failover will kick in. This will clear the executor,
+          * so the next queued one disappears and everything grinds to a halt.
+          * 
+          * Solution - don't use a session executor - have a session thread instead, much nicer
+          */
+         
+                        
+         if (mp != null)
+         {
+            try
             {
+               callOnMessage(sessionDelegate, theListener, consumerID, queueName,
+                             false, mp, ackMode, maxDeliveries, null, shouldAck);
+               
+               if (trace) { log.trace("Called callonMessage"); }
+            }
+            catch (Throwable t)
+            {
+               log.error("Failed to deliver message", t);
+            } 
+         }
+         
+         synchronized (mainLock)
+         {
+         	if (!buffer.isEmpty())
+            {
             	//Queue up the next runner to run
             	
             	if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
@@ -910,26 +967,15 @@
             	if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
             	
             	listenerRunning  = false;
-            }               
+            }   
          }
-                        
-         if (mp != null)
-         {
-            try
-            {
-               callOnMessage(sessionDelegate, theListener, consumerID, queueName,
-                             false, mp, ackMode, maxDeliveries, null, shouldAck);
-            }
-            catch (JMSException e)
-            {
-               log.error("Failed to deliver message", e);
-            } 
-         }
                   
          if (handleFlowControl)
          {
          	checkStart();                                                   
          }
+         
+         if (trace) { log.trace("Exiting run()"); }
       }
    }   
 }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-07-11 21:38:09 UTC (rev 2878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-07-12 00:09:22 UTC (rev 2879)
@@ -34,6 +34,7 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.jboss.jms.client.JBossConnection;
 import org.jboss.test.messaging.tools.ServerManagement;
 
 /**
@@ -197,17 +198,17 @@
    
    public void testFailoverFloodTwoServers() throws Exception
    {
-      Connection conn = null;
+      JBossConnection conn = null;
 
       try
       {
-         conn = this.createConnectionOnServer(cf, 1);
+         conn = (JBossConnection)this.createConnectionOnServer(cf, 1);
 
          Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
          Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-         MessageConsumer cons = sessCons.createConsumer(queue[0]);
+         MessageConsumer cons = sessCons.createConsumer(queue[1]);
 
          MyListener list = new MyListener();
 
@@ -215,7 +216,7 @@
 
          conn.start();
 
-         MessageProducer prod = sessSend.createProducer(queue[0]);
+         MessageProducer prod = sessSend.createProducer(queue[1]);
 
          prod.setDeliveryMode(DeliveryMode.PERSISTENT);
 
@@ -236,13 +237,19 @@
             
             if (count % 100 == 0)
             {
-               log.info("sent " + count);
+               log.info("sent " + count + " server id " + conn.getServerID());
             }
-
+            
             count++;
+            
+            Thread.sleep(5);
          }
-              
+         
+         log.info("done send");
+         
+         log.info("Waiting to join thread");
          t.join(5 * 60 * 60 * 1000);
+         log.info("joined");
          
          if (killer.failed)
          {
@@ -252,13 +259,14 @@
          //We check that we received all the message
          //we allow for duplicates, see http://jira.jboss.org/jira/browse/JBMESSAGING-604
          
+         if (!list.waitFor(count - 1))
+         {
+         	fail("Timed out waiting for message");
+         }
+         
          conn.close();
          conn = null;
          
-         if (!list.waitFor(count))
-         {
-         	fail("Timed out waiting for message");
-         }
                   
          count = 0;
          Iterator iter = list.msgs.iterator();
@@ -337,10 +345,11 @@
       {
          try
          {                                     
-            Thread.sleep(10000);
+            Thread.sleep(5000);
                
             log.info("Killing server 1");
             ServerManagement.kill(1);
+            log.info("Killed server 1");
             
             Thread.sleep(5000);
             
@@ -392,10 +401,13 @@
             ServerManagement.start(1, "all", false);
             ServerManagement.deployQueue("testDistributedQueue", 1);
             
+            Thread.sleep(10000);
+            
             log.info("killer DONE");
          }
          catch (Exception e)
          {               
+         	log.error("Killer failed", e);
             failed = true;
          }
          
@@ -406,8 +418,6 @@
    
    class MyListener implements MessageListener
    {
-      int count = 0;
-          
       volatile boolean failed;
       
       Set msgs = new TreeSet();
@@ -418,40 +428,41 @@
       
       boolean waitFor(int i)
       {
+      	log.info("Waiting for message " + i);
       	synchronized (obj)
       	{
+      		log.info("here");
       		long toWait = 30000;
       		while (maxcnt < i && toWait > 0)
       		{
       			long start = System.currentTimeMillis();
       			try
-      			{      				
-      				obj.wait(30000);
+      			{      		
+      				obj.wait(60000);
       			}
       			catch (InterruptedException e)
       			{}
-      			if (i <= maxcnt)
+      			if (maxcnt < i)
       			{
       				toWait -= System.currentTimeMillis() - start;
       			}            			      
       		}
-      		return maxcnt < i;
+      		return maxcnt == i;
       	}
+      	
       }
    
       public void onMessage(Message msg)
       {
          try
          {
-            TextMessage tm = (TextMessage)msg;
-            
-            if (count % 100 == 0)
+            int cnt = msg.getIntProperty("cnt");
+                        
+            if (cnt % 100 == 0)
             {
-               log.info("Received message " + tm.getText() + " (" + tm + ")");
+               log.info(this + " Received message " + cnt);
             }
             
-            count++;
-            
             /*
             
             IMPORTANT NOTE 
@@ -474,16 +485,15 @@
             
             Therefore we only count that the total messages were received
             */      
-            
-            int cnt = msg.getIntProperty("cnt");
-            
+              
             msgs.add(new Integer(cnt));
             
             maxcnt = Math.max(maxcnt, cnt);
+            
             synchronized (obj)
             {
             	obj.notify();
-            }                        
+            }         
          }
          catch (Exception e)
          {

Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-07-11 21:38:09 UTC (rev 2878)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-07-12 00:09:22 UTC (rev 2879)
@@ -145,7 +145,7 @@
          }
          else
          {
-            Server s = acquireRemote(2, i, true);
+            Server s = acquireRemote(10, i, true);
 
             if (s != null)
             {
@@ -543,7 +543,7 @@
 
       log.info("spawned server " + i + ", waiting for it to come online");
 
-      while(System.currentTimeMillis() - startTime < maxWaitTime * 1000)
+      while (System.currentTimeMillis() - startTime < maxWaitTime * 1000)
       {
          s = acquireRemote(1, i, true);
          if (s != null)




More information about the jboss-cvs-commits mailing list