[jboss-cvs] JBoss Messaging SVN: r6575 - in trunk: src/main/org/jboss/messaging/core/postoffice and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Apr 27 08:29:17 EDT 2009


Author: timfox
Date: 2009-04-27 08:29:17 -0400 (Mon, 27 Apr 2009)
New Revision: 6575

Modified:
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
some fixes

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-04-27 12:29:17 UTC (rev 6575)
@@ -586,53 +586,60 @@
          // This needs to be synchronized since we need to ensure notifications are processed in strict sequence
          synchronized (this)
          {
-            // First send to any local listeners
-            for (NotificationListener listener : listeners)
+            //We also need to synchronize on the post office notification lock
+            //otherwise we can get notifications arriving in wrong order / missing
+            //if a notification occurs at same time as sendQueueInfoToQueue is processed
+            synchronized (postOffice.getNotificationLock())
             {
-               try
+               
+               // First send to any local listeners
+               for (NotificationListener listener : listeners)
                {
-                  listener.onNotification(notification);
+                  try
+                  {
+                     listener.onNotification(notification);
+                  }
+                  catch (Exception e)
+                  {
+                     // Exception thrown from one listener should not stop execution of others
+                     log.error("Failed to call listener", e);
+                  }
                }
-               catch (Exception e)
+   
+               // Now send message
+   
+               ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
+   
+               notificationMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
+               // Notification messages are always durable so the user can choose whether to add a durable queue to consume
+               // them in
+               notificationMessage.setDurable(true);
+               notificationMessage.setDestination(managementNotificationAddress);
+   
+               TypedProperties notifProps;
+               if (notification.getProperties() != null)
                {
-                  // Exception thrown from one listener should not stop execution of others
-                  log.error("Failed to call listener", e);
+                  notifProps = new TypedProperties(notification.getProperties());
                }
+               else
+               {
+                  notifProps = new TypedProperties();
+               }
+   
+               notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
+                                            new SimpleString(notification.getType().toString()));
+   
+               notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+   
+               if (notification.getUID() != null)
+               {
+                  notifProps.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));
+               }
+   
+               notificationMessage.putTypedProperties(notifProps);
+   
+               postOffice.route(notificationMessage, null);
             }
-
-            // Now send message
-
-            ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
-
-            notificationMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
-            // Notification messages are always durable so the user can choose whether to add a durable queue to consume
-            // them in
-            notificationMessage.setDurable(true);
-            notificationMessage.setDestination(managementNotificationAddress);
-
-            TypedProperties notifProps;
-            if (notification.getProperties() != null)
-            {
-               notifProps = new TypedProperties(notification.getProperties());
-            }
-            else
-            {
-               notifProps = new TypedProperties();
-            }
-
-            notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
-                                         new SimpleString(notification.getType().toString()));
-
-            notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-
-            if (notification.getUID() != null)
-            {
-               notifProps.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));
-            }
-
-            notificationMessage.putTypedProperties(notifProps);
-
-            postOffice.route(notificationMessage, null);
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-04-27 12:29:17 UTC (rev 6575)
@@ -74,4 +74,6 @@
    DuplicateIDCache getDuplicateIDCache(SimpleString address);
    
    void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
+   
+   Object getNotificationLock();     
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-04-27 12:29:17 UTC (rev 6575)
@@ -161,7 +161,7 @@
    }
    
    public void willRoute(final ServerMessage message)
-   {                 
+   {                     
    }
      
    public boolean isQueueBinding()

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-27 12:29:17 UTC (rev 6575)
@@ -736,6 +736,11 @@
 
       return cache;
    }
+   
+   public Object getNotificationLock()
+   {
+      return notificationLock;
+   }
 
    public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
    {

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-04-27 12:29:17 UTC (rev 6575)
@@ -195,7 +195,7 @@
    }
    
    public void willRoute(final ServerMessage message)
-   {          
+   {                      
       //We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
       
       //TODO - this can be optimised

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-04-27 12:29:17 UTC (rev 6575)
@@ -149,7 +149,7 @@
       }
       while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
 
-      System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
+      //System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
 
       throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
                                       ", expecting = " +
@@ -162,7 +162,7 @@
                                   final int consumerCount,
                                   final boolean local) throws Exception
    {
-//       log.info("waiting for bindings on node " + node +
+//      log.info("waiting for bindings on node " + node +
 //               " address " +
 //               address +
 //               " count " +
@@ -206,7 +206,7 @@
             }
          }
 
-        // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+         //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
 
          if (bindingCount == count && totConsumers == consumerCount)
          {
@@ -218,11 +218,15 @@
       }
       while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
 
-      System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
+      // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
 
-      throw new IllegalStateException("Timed out waiting for bindings (bindingCount = " + bindingCount +
-                                      ", totConsumers = " +
-                                      totConsumers);
+      String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount +
+                   ", totConsumers = " +
+                   totConsumers;
+
+      log.error(msg);
+
+      throw new IllegalStateException(msg);
    }
 
    protected void createQueue(int node, String address, String queueName, String filterVal, boolean durable) throws Exception
@@ -399,18 +403,22 @@
    {
       sendInRange(node, address, 0, numMessages, durable, filterVal);
    }
-   
+
    protected void verifyReceiveAllInRange(boolean ack, int msgStart, int msgEnd, int... consumerIDs) throws Exception
    {
       verifyReceiveAllInRangeNotBefore(ack, -1, msgStart, msgEnd, consumerIDs);
    }
-   
+
    protected void verifyReceiveAllInRange(int msgStart, int msgEnd, int... consumerIDs) throws Exception
    {
       verifyReceiveAllInRangeNotBefore(false, -1, msgStart, msgEnd, consumerIDs);
    }
 
-   protected void verifyReceiveAllInRangeNotBefore(boolean ack, long firstReceiveTime, int msgStart, int msgEnd, int... consumerIDs) throws Exception
+   protected void verifyReceiveAllInRangeNotBefore(boolean ack,
+                                                   long firstReceiveTime,
+                                                   int msgStart,
+                                                   int msgEnd,
+                                                   int... consumerIDs) throws Exception
    {
       boolean outOfOrder = false;
       for (int i = 0; i < consumerIDs.length; i++)
@@ -426,13 +434,20 @@
          {
             ClientMessage message = holder.consumer.receive(2000);
 
-            assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
+            if (message == null)
+            {
+               log.info("*** dumping consumers:");
 
+               dumpConsumers();
+
+               assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
+            }
+           
             if (ack)
             {
                message.acknowledge();
             }
-            
+
             if (firstReceiveTime != -1)
             {
                assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
@@ -448,7 +463,20 @@
 
       assertFalse("Messages were consumed out of order, look at System.out for more information", outOfOrder);
    }
-   
+
+   private void dumpConsumers() throws Exception
+   {
+      for (int i = 0; i < consumers.length; i++)
+      {
+         if (consumers[i] != null)
+         {
+            log.info("Dumping consumer " + i);
+
+            checkReceive(i);
+         }
+      }
+   }
+
    protected void verifyReceiveAll(boolean ack, int numMessages, int... consumerIDs) throws Exception
    {
       verifyReceiveAllInRange(ack, 0, numMessages, consumerIDs);
@@ -574,12 +602,12 @@
                {
                   message.acknowledge();
                }
-               
-               //log.info("consumer " + consumerIDs[i] +" returns " + count);
+
+               // log.info("consumer " + consumerIDs[i] +" returns " + count);
             }
             else
             {
-              // log.info("consumer " + consumerIDs[i] +" returns null");
+               // log.info("consumer " + consumerIDs[i] +" returns null");
             }
          }
          while (message != null);
@@ -954,7 +982,7 @@
 
       TransportConfiguration nettyBackuptc = null;
       TransportConfiguration invmBackuptc = null;
-      
+
       if (backupNode != -1)
       {
          Map<String, Object> backupParams = generateParams(backupNode, netty);
@@ -1000,11 +1028,13 @@
          TransportConfiguration nettytc_c = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params);
          configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
 
-         connectorPairs.add(new Pair<String, String>(nettytc_c.getName(), nettyBackuptc == null ? null : nettyBackuptc.getName()));
+         connectorPairs.add(new Pair<String, String>(nettytc_c.getName(),
+                                                     nettyBackuptc == null ? null : nettyBackuptc.getName()));
       }
       else
       {
-         connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null : invmBackuptc.getName()));
+         connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null
+                                                                                             : invmBackuptc.getName()));
       }
 
       BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
@@ -1059,7 +1089,7 @@
          servers[nodes[i]] = null;
       }
    }
-   
+
    protected void clearAllServers()
    {
       for (int i = 0; i < servers.length; i++)
@@ -1117,7 +1147,6 @@
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
 
-   
    protected void setupClusterConnection(String name,
                                          String address,
                                          boolean forwardWhenNoConsumers,

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-04-27 12:29:17 UTC (rev 6575)
@@ -1245,6 +1245,7 @@
       waitForBindings(3, "queues.testaddress", 1, 1, true);
       waitForBindings(4, "queues.testaddress", 1, 1, true);
 
+      waitForBindings(0, "queues.testaddress", 4, 4, false);
       waitForBindings(1, "queues.testaddress", 3, 3, false);
       waitForBindings(2, "queues.testaddress", 3, 3, false);
       waitForBindings(3, "queues.testaddress", 3, 3, false);
@@ -1252,7 +1253,7 @@
 
       send(0, "queues.testaddress", 10, false, null);
 
-      verifyReceiveAll(10, 1, 2, 3, 4);
+      verifyReceiveAll(10, 1, 2, 3, 4);     
    }
 
    public void testNoLocalQueueLoadBalancedQueues() throws Exception
@@ -1282,6 +1283,7 @@
       waitForBindings(3, "queues.testaddress", 1, 1, true);
       waitForBindings(4, "queues.testaddress", 1, 1, true);
 
+      waitForBindings(0, "queues.testaddress", 4, 4, false);
       waitForBindings(1, "queues.testaddress", 3, 3, false);
       waitForBindings(2, "queues.testaddress", 3, 3, false);
       waitForBindings(3, "queues.testaddress", 3, 3, false);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java	2009-04-27 12:29:17 UTC (rev 6575)
@@ -251,6 +251,8 @@
       setupCluster();
 
       startServers();
+      
+      log.info("*** started servers");
 
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-04-27 08:01:50 UTC (rev 6574)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-04-27 12:29:17 UTC (rev 6575)
@@ -1175,6 +1175,12 @@
    class FakePostOffice implements PostOffice
    {
 
+      public Object getNotificationLock()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
       public Bindings getMatchingBindings(SimpleString address)
       {
          // TODO Auto-generated method stub




More information about the jboss-cvs-commits mailing list