[hornetq-commits] JBoss hornetq SVN: r8540 - in trunk: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 3 15:09:44 EST 2009


Author: timfox
Date: 2009-12-03 15:09:44 -0500 (Thu, 03 Dec 2009)
New Revision: 8540

Modified:
   trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
Log:
mainly added reset of remote bindings on bridge failure + cluster test base speedup

Modified: trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java	2009-12-03 20:09:44 UTC (rev 8540)
@@ -34,5 +34,7 @@
    Bridge getBridge();
 
    void close() throws Exception;
+   
+   void reset() throws Exception;
 
 }

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-12-03 20:09:44 UTC (rev 8540)
@@ -564,6 +564,8 @@
    {
       if (flowRecord != null)
       {
+         flowRecord.reset();
+         
          if (notifConsumer != null)
          {
             try

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-12-03 20:09:44 UTC (rev 8540)
@@ -492,6 +492,11 @@
 
          clearBindings();
       }
+      
+      public void reset() throws Exception
+      {
+         clearBindings();
+      }
 
       public void setBridge(final Bridge bridge)
       {

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-12-03 20:09:44 UTC (rev 8540)
@@ -16,6 +16,7 @@
 import static org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -122,9 +123,13 @@
       final ClientConsumer consumer;
 
       final ClientSession session;
+      
+      final int id;
 
-      ConsumerHolder(final ClientConsumer consumer, final ClientSession session)
+      ConsumerHolder(final int id, final ClientConsumer consumer, final ClientSession session)
       {
+         this.id = id;
+         
          this.consumer = consumer;
 
          this.session = session;
@@ -162,11 +167,8 @@
       {
          messageCount = getMessageCount(po, address);
 
-         // log.info(node + " messageCount " + messageCount);
-
          if (messageCount == count)
-         {
-            // log.info("Waited " + (System.currentTimeMillis() - start));
+         {          
             return;
          }
 
@@ -174,8 +176,6 @@
       }
       while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
 
-      // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
-
       throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
                                       ", expecting = " +
                                       count);
@@ -250,11 +250,9 @@
             }
          }
 
-         // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
 
          if (bindingCount == count && totConsumers == consumerCount)
-         {
-            // log.info("Waited " + (System.currentTimeMillis() - start));
+         {            
             return;
          }
 
@@ -360,7 +358,7 @@
 
          session.start();
 
-         consumers[consumerID] = new ConsumerHolder(consumer, session);
+         consumers[consumerID] = new ConsumerHolder(consumerID, consumer, session);
       }
       catch (Exception e)
       {
@@ -799,71 +797,118 @@
     */
    protected void verifyReceiveRoundRobinInSomeOrder(int numMessages, int... consumerIDs) throws Exception
    {
+      if (numMessages < consumerIDs.length)
+      {
+         throw new IllegalStateException("You must send more messages than consumers specified or the algorithm " +
+                                         "won't work");
+      }
+       
       verifyReceiveRoundRobinInSomeOrder(true, numMessages, consumerIDs);
    }
+   
+   class OrderedConsumerHolder implements Comparable<OrderedConsumerHolder>
+   {
+      ConsumerHolder consumer;
 
+      int order;
+
+      public int compareTo(OrderedConsumerHolder o)
+      {
+         int thisOrder = this.order;
+         int otherOrder = o.order;
+         return (thisOrder < otherOrder ? -1 : (thisOrder == otherOrder ? 0 : 1));
+      }
+   }
+   
+
    protected void verifyReceiveRoundRobinInSomeOrder(boolean ack, int numMessages, int... consumerIDs) throws Exception
    {
-      Map<Integer, Integer> countMap = new HashMap<Integer, Integer>();
+      if (numMessages < consumerIDs.length)
+      {
+         throw new IllegalStateException("not enough messages");
+      }
+      
+      // First get one from each consumer to determine the order, then we sort them in this order
 
-      Set<Integer> counts = new HashSet<Integer>();
-
+      List<OrderedConsumerHolder> sorted = new ArrayList<OrderedConsumerHolder>();
+      
       for (int i = 0; i < consumerIDs.length; i++)
       {
          ConsumerHolder holder = consumers[consumerIDs[i]];
-
-         if (holder == null)
+         
+         ClientMessage msg = holder.consumer.receive(10000);
+         
+         assertNotNull(msg);
+         
+         int count = msg.getIntProperty(COUNT_PROP);
+         
+         OrderedConsumerHolder orderedHolder = new OrderedConsumerHolder();
+         
+         orderedHolder.consumer = holder;
+         orderedHolder.order = count;
+         
+         sorted.add(orderedHolder);
+         
+         if (ack)
          {
-            throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
+            msg.acknowledge();
          }
+      }
+      
+      //Now sort them
+      
+      Collections.sort(sorted);
+      
+      //First verify the first lot received are ok
+      
+      int count = 0;
+      
+      for (OrderedConsumerHolder holder: sorted)
+      {
+         if (holder.order != count)
+         {
+            throw new IllegalStateException("Out of order");
+         }
+         
+         count++;
+      }
+      
+      //Now check the rest are in order too
 
-         ClientMessage message;
-         do
+      outer: while (count < numMessages)
+      {
+         for (OrderedConsumerHolder holder: sorted)
          {
-            message = holder.consumer.receive(1000);
+            ClientMessage msg = holder.consumer.consumer.receive(10000);
             
-            if (message != null)
+            assertNotNull(msg);
+            
+            int p = msg.getIntProperty(COUNT_PROP);
+                        
+            if (p != count)
             {
-               int count = (Integer)message.getObjectProperty(COUNT_PROP);
-
-               Integer prevCount = countMap.get(i);
-
-               if (prevCount != null)
-               {
-                  assertEquals("consumer " + i + " received unround-robined message (previous was " + prevCount + ")",
-                               prevCount + consumerIDs.length,
-                               count);
-               }
-
-               assertFalse(counts.contains(count));
-
-               counts.add(count);
-
-               countMap.put(i, count);
-
-               if (ack)
-               {
-                  message.acknowledge();
-               }
-
-               // log.info("consumer " + consumerIDs[i] + " returns " + count);
+               throw new IllegalStateException("Out of order 2");
             }
-            else
+                       
+            if (ack)
             {
-               // log.info("consumer " + consumerIDs[i] +" returns null");
+               msg.acknowledge();
             }
+            
+            count++;
+            
+            if (count == numMessages)
+            {
+               break outer;
+            }
+            
          }
-         while (message != null);
-      }
-
-      for (int i = 0; i < numMessages; i++)
-      {
-         assertTrue("did not receive message " + i, counts.contains(i));
-      }
+      }           
    }
-
+   
+   
    protected void verifyReceiveRoundRobinInSomeOrderWithCounts(boolean ack, int[] messageCounts, int... consumerIDs) throws Exception
-   {
+   {      
       List<LinkedList<Integer>> receivedCounts = new ArrayList<LinkedList<Integer>>();
 
       Set<Integer> counts = new HashSet<Integer>();
@@ -940,7 +985,6 @@
 
          assertEquals(messageCounts[i], elem);
 
-         // log.info("got elem " + messageCounts[i] + " at pos " + index);
 
          index++;
 
@@ -954,6 +998,12 @@
 
    protected void verifyReceiveRoundRobinInSomeOrderNoAck(int numMessages, int... consumerIDs) throws Exception
    {
+      if (numMessages < consumerIDs.length)
+      {
+         throw new IllegalStateException("You must send more messages than consumers specified or the algorithm " +
+                                         "won't work");
+      }
+      
       verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs);
    }
 

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java	2009-12-03 20:09:44 UTC (rev 8540)
@@ -202,15 +202,22 @@
 
    public void testRoundRobinMultipleQueues() throws Exception
    {
+      log.info("starting");
       setupCluster();
 
+      log.info("setup cluster");
+      
       startServers();
 
+      log.info("started servers");
+      
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());
       setupSessionFactory(2, isNetty());
       setupSessionFactory(3, isNetty());
       setupSessionFactory(4, isNetty());
+      
+      log.info("Set up session factories");
 
       createQueue(0, "queues.testaddress", "queue0", null, false);
       createQueue(1, "queues.testaddress", "queue0", null, false);
@@ -229,6 +236,8 @@
       createQueue(2, "queues.testaddress", "queue2", null, false);
       createQueue(3, "queues.testaddress", "queue2", null, false);
       createQueue(4, "queues.testaddress", "queue2", null, false);
+      
+      log.info("created queues");
 
       addConsumer(0, 0, "queue0", null);
       addConsumer(1, 1, "queue0", null);
@@ -247,6 +256,8 @@
       addConsumer(12, 2, "queue2", null);
       addConsumer(13, 3, "queue2", null);
       addConsumer(14, 4, "queue2", null);
+      
+      log.info("added consumers");
 
       waitForBindings(0, "queues.testaddress", 3, 3, true);
       waitForBindings(1, "queues.testaddress", 3, 3, true);
@@ -259,12 +270,24 @@
       waitForBindings(2, "queues.testaddress", 12, 12, false);
       waitForBindings(3, "queues.testaddress", 12, 12, false);
       waitForBindings(4, "queues.testaddress", 12, 12, false);
+      
+      log.info("waited for bindings");
 
       send(0, "queues.testaddress", 10, false, null);
+      
+      log.info("sent messages");
 
       verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+      
+      log.info("verified 1");
+      
       verifyReceiveRoundRobinInSomeOrder(10, 5, 6, 7, 8, 9);
+      
+      log.info("verified 2");
+      
       verifyReceiveRoundRobinInSomeOrder(10, 10, 11, 12, 13, 14);
+      
+      log.info("verified 3");
    }
 
    public void testMultipleNonLoadBalancedQueues() throws Exception

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java	2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java	2009-12-03 20:09:44 UTC (rev 8540)
@@ -1097,136 +1097,17 @@
     */
    protected void doTestL(final ClientSessionFactory sf) throws Exception
    {
-      final int numSessions = 1000;
+      final int numSessions = 100;
 
       for (int i = 0; i < numSessions; i++)
       {
          ClientSession session = sf.createSession(false, false, false);
          
-         log.info("Created session " + System.identityHashCode(session));
-
          session.close();
-         
-         log.info("closed session");
       }
    }
 
-   // Browsers
-   // FIXME - this test won't work until we use a proper iterator for browsing a queue.
-   // Making a copy of the queue for a browser consumer doesn't work well with replication since
-   // When replicating the create consumer (browser) to the backup, when executed on the backup the
-   // backup may have different messages in its queue since been added on different threads.
-   // So when replicating deliveries they may not be found.
-   // https://jira.jboss.org/jira/browse/JBMESSAGING-1433
-   // protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
-   // {
-   // long start = System.currentTimeMillis();
-   //
-   // ClientSession sessSend = sf.createSession(false, true, true, false);
-   //      
-   // ClientSession sessConsume = sf.createSession(false, true, true, false);
-   //      
-   // sessConsume.createQueue(ADDRESS, new SimpleString(threadNum + "sub"), null, false, false);
-   //
-   // final int numMessages = 100;
-   //
-   // ClientProducer producer = sessSend.createProducer(ADDRESS);
-   //
-   // sendMessages(sessSend, producer, numMessages, threadNum);
-   //      
-   // ClientConsumer browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
-   // null, false, true);
-   //      
-   // Map<Integer, Integer> consumerCounts = new HashMap<Integer, Integer>();
-   //      
-   // for (int i = 0; i < numMessages; i++)
-   // {
-   // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
-   //
-   // assertNotNull(msg);
-   //
-   // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
-   // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-   //
-   // Integer c = consumerCounts.get(tn);
-   // if (c == null)
-   // {
-   // c = new Integer(cnt);
-   // }
-   //
-   // if (cnt != c.intValue())
-   // {
-   // throw new Exception("Invalid count, expected " + c + " got " + cnt);
-   // }
-   //         
-   // c++;
-   //         
-   // //Wrap
-   // if (c == numMessages)
-   // {
-   // c = 0;
-   // }
-   //         
-   // consumerCounts.put(tn, c);
-   //
-   // msg.acknowledge();
-   // }
-   //
-   // sessConsume.close();
-   //      
-   // sessConsume = sf.createSession(false, true, true, false);
-   //      
-   // browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
-   // null, false, true);
-   //      
-   // //Messages should still be there
-   //      
-   // consumerCounts.clear();
-   //      
-   // for (int i = 0; i < numMessages; i++)
-   // {
-   // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
-   //
-   // assertNotNull(msg);
-   //
-   // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
-   // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-   //
-   // Integer c = consumerCounts.get(tn);
-   // if (c == null)
-   // {
-   // c = new Integer(cnt);
-   // }
-   //
-   // if (cnt != c.intValue())
-   // {
-   // throw new Exception("Invalid count, expected " + c + " got " + cnt);
-   // }
-   //         
-   // c++;
-   //         
-   // //Wrap
-   // if (c == numMessages)
-   // {
-   // c = 0;
-   // }
-   //         
-   // consumerCounts.put(tn, c);
-   //
-   // msg.acknowledge();
-   // }
-   //      
-   // sessConsume.close();
-   //      
-   // sessSend.deleteQueue(new SimpleString(threadNum + "sub"));
-   //      
-   // sessSend.close();
-   //
-   // long end = System.currentTimeMillis();
-   //
-   // log.info("duration " + (end - start));
-   // }
-
+  
    protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws Exception
    {
       ClientSession sessCreate = sf.createSession(false, true, true);



More information about the hornetq-commits mailing list