[jboss-cvs] JBoss Messaging SVN: r2809 - in trunk: src/main/org/jboss/jms/server/endpoint and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 27 18:54:43 EDT 2007


Author: timfox
Date: 2007-06-27 18:54:42 -0400 (Wed, 27 Jun 2007)
New Revision: 2809

Modified:
   trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
Log:
a few fixes


Modified: trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java	2007-06-27 20:23:20 UTC (rev 2808)
+++ trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java	2007-06-27 22:54:42 UTC (rev 2809)
@@ -65,16 +65,21 @@
 
       ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)message;
 
-      Object d = getState().getClusteredConnectionFactoryDelegate();
+      ConnectionState state = getState();
+            
+      if (state != null)
+      {      
+      	Object d = state.getClusteredConnectionFactoryDelegate();
+      	
+      	if (d instanceof ClientClusteredConnectionFactoryDelegate)
+         {
+            ClientClusteredConnectionFactoryDelegate clusteredDelegate =
+               (ClientClusteredConnectionFactoryDelegate)d;
 
-      if (d instanceof ClientClusteredConnectionFactoryDelegate)
-      {
-         ClientClusteredConnectionFactoryDelegate clusteredDelegate =
-            (ClientClusteredConnectionFactoryDelegate)d;
-
-         clusteredDelegate.updateFailoverInfo(viewChange.getDelegates(),
-                                              viewChange.getFailoverMap());
-      }
+            clusteredDelegate.updateFailoverInfo(viewChange.getDelegates(),
+                                                 viewChange.getFailoverMap());
+         }
+      }      
    }
 
    public String toString()

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-06-27 20:23:20 UTC (rev 2808)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-06-27 22:54:42 UTC (rev 2809)
@@ -24,6 +24,7 @@
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+import javax.jms.TextMessage;
 
 import org.jboss.jms.delegate.ConsumerEndpoint;
 import org.jboss.jms.destination.JBossDestination;
@@ -49,7 +50,6 @@
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.callback.Callback;
-import org.jboss.remoting.callback.HandleCallbackException;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 /**
@@ -240,6 +240,17 @@
          if (trace) { log.trace(this + " has startStopLock lock, preparing the message for delivery"); }
 
          Message message = ref.getMessage();
+         
+         TextMessage tm = (TextMessage)message;
+         
+         try
+         {
+         	log.info("TRYING TO DELIVER " + tm.getText());
+         }
+         catch (Exception e)
+         {
+         	
+         }
 
          boolean selectorRejected = !this.accept(message);
          
@@ -317,18 +328,24 @@
                this.lastDeliveryID = deliveryId;
             }
          }
-         catch (HandleCallbackException e)
+         catch (Throwable t)
          {
             // it's an oneway callback, so exception could only have happened on the server, while
             // trying to send the callback. This is a good reason to smack the whole connection.
             // I trust remoting to have already done its own cleanup via a CallbackErrorHandler,
             // I need to do my own cleanup at ConnectionManager level.
 
-            log.debug(this + " failed to handle callback", e);
+            log.debug(this + " failed to handle callback", t);
             
-            //We stop the consumer - some time later the lease will expire and the connection will be closed                       
+            //We stop the consumer - some time later the lease will expire and the connection will be closed        
+            //which will remove the consumer
+            
+            started = false;
 
-            return null;
+            //** IMPORTANT NOTE! We must return the delivery NOT null. **
+            //This is because if we return NULL then message will remain in the queue, but later
+            //the connection checker will cleanup and close this consumer which will cancel all the deliveries in it
+            //including this one, so the message will go back on the queue twice!
          }
 
          return delivery;

Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-06-27 20:23:20 UTC (rev 2808)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-06-27 22:54:42 UTC (rev 2809)
@@ -155,7 +155,7 @@
 	 * 
 	 */
 	public synchronized void notify(ClusterNotification notification)
-	{
+	{	
 		if (replicator == null)
 		{
 			//Non clustered

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	2007-06-27 20:23:20 UTC (rev 2808)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java	2007-06-27 22:54:42 UTC (rev 2809)
@@ -6,6 +6,9 @@
  */
 package org.jboss.test.messaging.jms.clustering;
 
+import java.util.HashSet;
+import java.util.Set;
+
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.MessageConsumer;
@@ -201,7 +204,13 @@
          log.info("Sent next 15 on node 1");
 
          // creates another consumer... before killing the server
+         
+         // This will actually end up sucking messages from node 0
          MessageConsumer consumer1 = session1.createConsumer(queue[1]);
+         
+         //Give it enough time to suck
+         
+         Thread.sleep(5000);
 
          log.info("Killing node1");
          ServerManagement.killAndWait(1);
@@ -217,13 +226,28 @@
          
          log.info("creating new consumer");
          
+         //We should now be able to consume the messages 5 to 19.
+         //Note that they will be in a different order since 10 to 10 were sucked to node 0 before crashing
+         //Also there is the possibility that after crashing the queue attempted to deliver to one or more of the remote consumers
+         //for the node that crashed, (YES it is possible to send more than one message on a failed connection before getting
+         //an exception), so this won't be cancelled until the connection checker kicks in and closes the consumer         
+         
+         Set msgs = new HashSet();
+         
          for (int i = 5; i < 20; i++)
          {
-            msg = (TextMessage)consumer0.receive(5000);
+            msg = (TextMessage)consumer0.receive(60000);
             assertNotNull(msg);
-            log.info("msg = " + msg.getText());
-            assertEquals("message " + i, msg.getText());
+            
+            log.info("Got message " + msg.getText());
+            
+            msgs.add(msg.getText());
          }
+         
+         for (int i = 5; i < 20; i++)
+         {
+            assertTrue(msgs.contains("message " + i));
+         }
 
          assertNull(consumer0.receive(5000));
 
@@ -289,75 +313,29 @@
          session1.commit();
          
          
-         //Make sure messages exist
+         //Don't consume them or they will be pulled from one node to another
          
-         MessageConsumer cons0 = session0.createConsumer(queue[0]);
          
-         conn0.start();
-         
-         TextMessage tm;
-         
-         for (int i = 0; i < 10; i++)
-         {
-            tm = (TextMessage)cons0.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message " + i, tm.getText());
-         }
-         
-         tm = (TextMessage)cons0.receive(1000);
-         
-         assertNull(tm);
-         
-         session0.rollback();
-         
-         cons0.close();
-         
-         cons0 = null;
-         
-         
-         MessageConsumer cons1 = session1.createConsumer(queue[0]);
-         
-         conn1.start();
-         
-         for (int i = 10; i < 20; i++)
-         {
-            tm = (TextMessage)cons1.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message " + i, tm.getText());
-         }
-         
-         tm = (TextMessage)cons1.receive(1000);
-         
-         assertNull(tm);
-         
-         session1.rollback();
-         
-         cons1.close();
-         
-         cons1 = null;
-         
-         
          //Now kill the server
          waitForFailoverComplete(1, conn1);
 
          //Messages should all be available on node 0
          
-         cons0 = session0.createConsumer(queue[0]);
+         MessageConsumer cons0 = session0.createConsumer(queue[0]);
          
+         TextMessage tm;
+         
+         conn0.start();
+         
          for (int i = 0; i < 20; i++)
          {
-            tm = (TextMessage)cons0.receive(1000);
+            tm = (TextMessage)cons0.receive(60000);
             
             assertNotNull(tm);
             
             log.info("received message " + tm.getText());
-            
-            
-            //assertEquals("message " + i, tm.getText());
+                        
+            assertEquals("message " + i, tm.getText());
          }
          
          tm = (TextMessage)cons0.receive(1000);
@@ -516,6 +494,8 @@
          for (int i = 0; i < messages0; i++)
          {
             producer0.send(session0.createTextMessage("message " + i));
+            
+            log.info("Sent message: message " + i);
          }
          
          session0.commit();
@@ -530,62 +510,14 @@
          for (int i = messages0; i < messages0 + messages1; i++)
          {
             producer1.send(session1.createTextMessage("message " + i));
+            
+            log.info("Sent message: message " + i);
          }
          
          session1.commit();
          
+         MessageConsumer cons0 = null;
          
-         //Make sure messages exist
-         
-         MessageConsumer cons0 = session0.createConsumer(queue0);
-         
-         conn0.start();
-         
-         TextMessage tm;
-         
-         for (int i = 0; i < messages0; i++)
-         {
-            tm = (TextMessage)cons0.receive(1000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message " + i, tm.getText());
-         }
-         
-         tm = (TextMessage)cons0.receive(2000);
-         
-         assertNull(tm);
-         
-         session0.rollback();
-         
-         cons0.close();
-         
-         cons0 = null;
-         
-         
-         MessageConsumer cons1 = session1.createConsumer(queue1);
-         
-         conn1.start();
-         
-         for (int i = messages0; i < messages0 + messages1; i++)
-         {
-            tm = (TextMessage)cons1.receive(2000);
-            
-            assertNotNull(tm);
-            
-            assertEquals("message " + i, tm.getText());
-         }
-         
-         tm = (TextMessage)cons1.receive(2000);
-         
-         assertNull(tm);
-         
-         session1.rollback();
-         
-         cons1.close();
-         
-         cons1 = null;
-         
          if (fillConsumer)
          {
             //Creating the consumer immediately after kill should ensure that all the messages are in the consumer and
@@ -604,22 +536,38 @@
 
          //Messages should all be available on node 0
          
+         //Note they may be in a different order due to being pulled in to the consumer before killing the server
+         //And also because delivery may have been attempted to a remote consumer belonging to the
+         //failed node, in which case that delivery (or a later one) may fail, so those messages may not get cancelled
+         //back until the connection checker kicks in and closes the consumer
+         
          conn0.start();                 
-                                    
-         log.info("now consuming");
+                 
+         
+         Set msgs = new HashSet();
+         
+         TextMessage tm;
+         
          for (int i = 0; i < messages0 + messages1; i++)
          {
-            tm = (TextMessage)cons0.receive(5000);
+         	//Need a long timeout to allow for connection checker to kick in and close consumer
+         	tm = (TextMessage)cons0.receive(60000);
             
             assertNotNull(tm);
             
-            log.info("received message " + tm.getText());
+            log.info("Got message " + tm.getText());
+            
+            msgs.add(tm.getText());
          }
          
+         for (int i = 0; i < messages0 + messages1; i++)
+         {
+            assertTrue(msgs.contains("message " + i));
+         }
+                  
          tm = (TextMessage)cons0.receive(2000);
          
-         assertNull(tm);
-         
+         assertNull(tm);         
       }
       finally
       {




More information about the jboss-cvs-commits mailing list