[hornetq-commits] JBoss hornetq SVN: r8540 - in trunk: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 3 15:09:44 EST 2009
Author: timfox
Date: 2009-12-03 15:09:44 -0500 (Thu, 03 Dec 2009)
New Revision: 8540
Modified:
trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
Log:
mainly added reset of remote bindings on bridge failure + cluster test base speedup
Modified: trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -34,5 +34,7 @@
Bridge getBridge();
void close() throws Exception;
+
+ void reset() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -564,6 +564,8 @@
{
if (flowRecord != null)
{
+ flowRecord.reset();
+
if (notifConsumer != null)
{
try
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -492,6 +492,11 @@
clearBindings();
}
+
+ public void reset() throws Exception
+ {
+ clearBindings();
+ }
public void setBridge(final Bridge bridge)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -16,6 +16,7 @@
import static org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -122,9 +123,13 @@
final ClientConsumer consumer;
final ClientSession session;
+
+ final int id;
- ConsumerHolder(final ClientConsumer consumer, final ClientSession session)
+ ConsumerHolder(final int id, final ClientConsumer consumer, final ClientSession session)
{
+ this.id = id;
+
this.consumer = consumer;
this.session = session;
@@ -162,11 +167,8 @@
{
messageCount = getMessageCount(po, address);
- // log.info(node + " messageCount " + messageCount);
-
if (messageCount == count)
- {
- // log.info("Waited " + (System.currentTimeMillis() - start));
+ {
return;
}
@@ -174,8 +176,6 @@
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
- // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
-
throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
", expecting = " +
count);
@@ -250,11 +250,9 @@
}
}
- // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
if (bindingCount == count && totConsumers == consumerCount)
- {
- // log.info("Waited " + (System.currentTimeMillis() - start));
+ {
return;
}
@@ -360,7 +358,7 @@
session.start();
- consumers[consumerID] = new ConsumerHolder(consumer, session);
+ consumers[consumerID] = new ConsumerHolder(consumerID, consumer, session);
}
catch (Exception e)
{
@@ -799,71 +797,118 @@
*/
protected void verifyReceiveRoundRobinInSomeOrder(int numMessages, int... consumerIDs) throws Exception
{
+ if (numMessages < consumerIDs.length)
+ {
+ throw new IllegalStateException("You must send more messages than consumers specified or the algorithm " +
+ "won't work");
+ }
+
verifyReceiveRoundRobinInSomeOrder(true, numMessages, consumerIDs);
}
+
+ class OrderedConsumerHolder implements Comparable<OrderedConsumerHolder>
+ {
+ ConsumerHolder consumer;
+ int order;
+
+ public int compareTo(OrderedConsumerHolder o)
+ {
+ int thisOrder = this.order;
+ int otherOrder = o.order;
+ return (thisOrder < otherOrder ? -1 : (thisOrder == otherOrder ? 0 : 1));
+ }
+ }
+
+
protected void verifyReceiveRoundRobinInSomeOrder(boolean ack, int numMessages, int... consumerIDs) throws Exception
{
- Map<Integer, Integer> countMap = new HashMap<Integer, Integer>();
+ if (numMessages < consumerIDs.length)
+ {
+ throw new IllegalStateException("not enough messages");
+ }
+
+ // First get one from each consumer to determine the order, then we sort them in this order
- Set<Integer> counts = new HashSet<Integer>();
-
+ List<OrderedConsumerHolder> sorted = new ArrayList<OrderedConsumerHolder>();
+
for (int i = 0; i < consumerIDs.length; i++)
{
ConsumerHolder holder = consumers[consumerIDs[i]];
-
- if (holder == null)
+
+ ClientMessage msg = holder.consumer.receive(10000);
+
+ assertNotNull(msg);
+
+ int count = msg.getIntProperty(COUNT_PROP);
+
+ OrderedConsumerHolder orderedHolder = new OrderedConsumerHolder();
+
+ orderedHolder.consumer = holder;
+ orderedHolder.order = count;
+
+ sorted.add(orderedHolder);
+
+ if (ack)
{
- throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
+ msg.acknowledge();
}
+ }
+
+ //Now sort them
+
+ Collections.sort(sorted);
+
+ //First verify the first lot received are ok
+
+ int count = 0;
+
+ for (OrderedConsumerHolder holder: sorted)
+ {
+ if (holder.order != count)
+ {
+ throw new IllegalStateException("Out of order");
+ }
+
+ count++;
+ }
+
+ //Now check the rest are in order too
- ClientMessage message;
- do
+ outer: while (count < numMessages)
+ {
+ for (OrderedConsumerHolder holder: sorted)
{
- message = holder.consumer.receive(1000);
+ ClientMessage msg = holder.consumer.consumer.receive(10000);
- if (message != null)
+ assertNotNull(msg);
+
+ int p = msg.getIntProperty(COUNT_PROP);
+
+ if (p != count)
{
- int count = (Integer)message.getObjectProperty(COUNT_PROP);
-
- Integer prevCount = countMap.get(i);
-
- if (prevCount != null)
- {
- assertEquals("consumer " + i + " received unround-robined message (previous was " + prevCount + ")",
- prevCount + consumerIDs.length,
- count);
- }
-
- assertFalse(counts.contains(count));
-
- counts.add(count);
-
- countMap.put(i, count);
-
- if (ack)
- {
- message.acknowledge();
- }
-
- // log.info("consumer " + consumerIDs[i] + " returns " + count);
+ throw new IllegalStateException("Out of order 2");
}
- else
+
+ if (ack)
{
- // log.info("consumer " + consumerIDs[i] +" returns null");
+ msg.acknowledge();
}
+
+ count++;
+
+ if (count == numMessages)
+ {
+ break outer;
+ }
+
}
- while (message != null);
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- assertTrue("did not receive message " + i, counts.contains(i));
- }
+ }
}
-
+
+
protected void verifyReceiveRoundRobinInSomeOrderWithCounts(boolean ack, int[] messageCounts, int... consumerIDs) throws Exception
- {
+ {
List<LinkedList<Integer>> receivedCounts = new ArrayList<LinkedList<Integer>>();
Set<Integer> counts = new HashSet<Integer>();
@@ -940,7 +985,6 @@
assertEquals(messageCounts[i], elem);
- // log.info("got elem " + messageCounts[i] + " at pos " + index);
index++;
@@ -954,6 +998,12 @@
protected void verifyReceiveRoundRobinInSomeOrderNoAck(int numMessages, int... consumerIDs) throws Exception
{
+ if (numMessages < consumerIDs.length)
+ {
+ throw new IllegalStateException("You must send more messages than consumers specified or the algorithm " +
+ "won't work");
+ }
+
verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -202,15 +202,22 @@
public void testRoundRobinMultipleQueues() throws Exception
{
+ log.info("starting");
setupCluster();
+ log.info("setup cluster");
+
startServers();
+ log.info("started servers");
+
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
setupSessionFactory(3, isNetty());
setupSessionFactory(4, isNetty());
+
+ log.info("Set up session factories");
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
@@ -229,6 +236,8 @@
createQueue(2, "queues.testaddress", "queue2", null, false);
createQueue(3, "queues.testaddress", "queue2", null, false);
createQueue(4, "queues.testaddress", "queue2", null, false);
+
+ log.info("created queues");
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
@@ -247,6 +256,8 @@
addConsumer(12, 2, "queue2", null);
addConsumer(13, 3, "queue2", null);
addConsumer(14, 4, "queue2", null);
+
+ log.info("added consumers");
waitForBindings(0, "queues.testaddress", 3, 3, true);
waitForBindings(1, "queues.testaddress", 3, 3, true);
@@ -259,12 +270,24 @@
waitForBindings(2, "queues.testaddress", 12, 12, false);
waitForBindings(3, "queues.testaddress", 12, 12, false);
waitForBindings(4, "queues.testaddress", 12, 12, false);
+
+ log.info("waited for bindings");
send(0, "queues.testaddress", 10, false, null);
+
+ log.info("sent messages");
verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ log.info("verified 1");
+
verifyReceiveRoundRobinInSomeOrder(10, 5, 6, 7, 8, 9);
+
+ log.info("verified 2");
+
verifyReceiveRoundRobinInSomeOrder(10, 10, 11, 12, 13, 14);
+
+ log.info("verified 3");
}
public void testMultipleNonLoadBalancedQueues() throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -1097,136 +1097,17 @@
*/
protected void doTestL(final ClientSessionFactory sf) throws Exception
{
- final int numSessions = 1000;
+ final int numSessions = 100;
for (int i = 0; i < numSessions; i++)
{
ClientSession session = sf.createSession(false, false, false);
- log.info("Created session " + System.identityHashCode(session));
-
session.close();
-
- log.info("closed session");
}
}
- // Browsers
- // FIXME - this test won't work until we use a proper iterator for browsing a queue.
- // Making a copy of the queue for a browser consumer doesn't work well with replication since
- // When replicating the create consumer (browser) to the backup, when executed on the backup the
- // backup may have different messages in its queue since been added on different threads.
- // So when replicating deliveries they may not be found.
- // https://jira.jboss.org/jira/browse/JBMESSAGING-1433
- // protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
- // {
- // long start = System.currentTimeMillis();
- //
- // ClientSession sessSend = sf.createSession(false, true, true, false);
- //
- // ClientSession sessConsume = sf.createSession(false, true, true, false);
- //
- // sessConsume.createQueue(ADDRESS, new SimpleString(threadNum + "sub"), null, false, false);
- //
- // final int numMessages = 100;
- //
- // ClientProducer producer = sessSend.createProducer(ADDRESS);
- //
- // sendMessages(sessSend, producer, numMessages, threadNum);
- //
- // ClientConsumer browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
- // null, false, true);
- //
- // Map<Integer, Integer> consumerCounts = new HashMap<Integer, Integer>();
- //
- // for (int i = 0; i < numMessages; i++)
- // {
- // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
- //
- // assertNotNull(msg);
- //
- // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
- // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
- //
- // Integer c = consumerCounts.get(tn);
- // if (c == null)
- // {
- // c = new Integer(cnt);
- // }
- //
- // if (cnt != c.intValue())
- // {
- // throw new Exception("Invalid count, expected " + c + " got " + cnt);
- // }
- //
- // c++;
- //
- // //Wrap
- // if (c == numMessages)
- // {
- // c = 0;
- // }
- //
- // consumerCounts.put(tn, c);
- //
- // msg.acknowledge();
- // }
- //
- // sessConsume.close();
- //
- // sessConsume = sf.createSession(false, true, true, false);
- //
- // browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
- // null, false, true);
- //
- // //Messages should still be there
- //
- // consumerCounts.clear();
- //
- // for (int i = 0; i < numMessages; i++)
- // {
- // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
- //
- // assertNotNull(msg);
- //
- // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
- // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
- //
- // Integer c = consumerCounts.get(tn);
- // if (c == null)
- // {
- // c = new Integer(cnt);
- // }
- //
- // if (cnt != c.intValue())
- // {
- // throw new Exception("Invalid count, expected " + c + " got " + cnt);
- // }
- //
- // c++;
- //
- // //Wrap
- // if (c == numMessages)
- // {
- // c = 0;
- // }
- //
- // consumerCounts.put(tn, c);
- //
- // msg.acknowledge();
- // }
- //
- // sessConsume.close();
- //
- // sessSend.deleteQueue(new SimpleString(threadNum + "sub"));
- //
- // sessSend.close();
- //
- // long end = System.currentTimeMillis();
- //
- // log.info("duration " + (end - start));
- // }
-
+
protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws Exception
{
ClientSession sessCreate = sf.createSession(false, true, true);
More information about the hornetq-commits
mailing list