From do-not-reply at jboss.org Wed Oct 14 09:42:12 2009 Content-Type: multipart/mixed; boundary="===============3975140103221713773==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r8110 - branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution. Date: Wed, 14 Oct 2009 09:42:12 -0400 Message-ID: <200910141342.n9EDgCV9016346@svn01.web.mwc.hst.phx2.redhat.com> --===============3975140103221713773== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: ataylor Date: 2009-10-14 09:42:12 -0400 (Wed, 14 Oct 2009) New Revision: 8110 Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluste= r/distribution/ClusterTestBase.java branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluste= r/distribution/ClusteredGroupingTest.java Log: new test Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration= /cluster/distribution/ClusterTestBase.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/clust= er/distribution/ClusterTestBase.java 2009-10-14 12:59:50 UTC (rev 8109) +++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/clust= er/distribution/ClusterTestBase.java 2009-10-14 13:42:12 UTC (rev 8110) @@ -444,7 +444,6 @@ = message.putStringProperty(key, val); message.putIntProperty(COUNT_PROP, i); - System.out.println("i =3D " + i); producer.send(message); } = @@ -511,7 +510,6 @@ int msgEnd, int... consumerIDs) throws= Exception { - boolean outOfOrder =3D false; HashMap groupIdsReceived =3D new HashMap(); for (int i =3D 0; i < consumerIDs.length; i++) { @@ -546,6 +544,7 @@ } = SimpleString id =3D (SimpleString) message.getProperty(Message= Impl.HDR_GROUP_ID); + System.out.println("received " + id + " on consumer " + i); if(groupIdsReceived.get(id) =3D=3D null) { groupIdsReceived.put(id, i); Modified: branches/hornetq_grouping/tests/src/org/hornetq/tests/integration= /cluster/distribution/ClusteredGroupingTest.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/clust= er/distribution/ClusteredGroupingTest.java 2009-10-14 12:59:50 UTC (rev 810= 9) +++ branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/clust= er/distribution/ClusteredGroupingTest.java 2009-10-14 13:42:12 UTC (rev 811= 0) @@ -21,6 +21,9 @@ import org.hornetq.core.management.Notification; import org.hornetq.utils.SimpleString; = +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + /** * @author Andy Taylor */ @@ -128,7 +131,7 @@ = public void onNotification(Notification notification) { - = + } }, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1); @@ -421,7 +424,6 @@ } = = - public void testGroupingRoundRobin() throws Exception { setupServer(0, isFileStorage(), isNetty()); @@ -612,8 +614,8 @@ = addConsumer(1, 1, "queue0", null); waitForBindings(1, "queues.testaddress", 1, 1, true); - = = + verifyReceiveAllInRange(10, 20, 1); = System.out.println("*********************************************= ********************************"); @@ -629,77 +631,77 @@ } = public void testGroupingSendTo3queuesPinnedNodeGoesDownSendBeforeStop()= throws Exception - { - setupServer(0, isFileStorage(), isNetty()); - setupServer(1, isFileStorage(), isNetty()); - setupServer(2, isFileStorage(), isNetty()); + { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + setupServer(2, isFileStorage(), isNetty()); = - setupClusterConnection("cluster0", "queues", false, 1, isNetty(),= 0, 1, 2); + setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0,= 1, 2); = - setupClusterConnection("cluster1", "queues", false, 1, isNetty(),= 1, 0, 2); + setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1,= 0, 2); = - setupClusterConnection("cluster2", "queues", false, 1, isNetty(),= 2, 0, 1); + setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2,= 0, 1); = - startServers(0, 1, 2); + startServers(0, 1, 2); = - try - { - setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); - setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); - setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); + try + { + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); = - setupSessionFactory(0, isNetty()); - setupSessionFactory(1, isNetty()); - setupSessionFactory(2, isNetty()); + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); = - createQueue(0, "queues.testaddress", "queue0", null, true); - createQueue(1, "queues.testaddress", "queue0", null, true); - createQueue(2, "queues.testaddress", "queue0", null, true); + createQueue(0, "queues.testaddress", "queue0", null, true); + createQueue(1, "queues.testaddress", "queue0", null, true); + createQueue(2, "queues.testaddress", "queue0", null, true); = - addConsumer(0, 1, "queue0", null); + addConsumer(0, 1, "queue0", null); = - waitForBindings(0, "queues.testaddress", 1, 0, true); - waitForBindings(1, "queues.testaddress", 1, 1, true); - waitForBindings(2, "queues.testaddress", 1, 0, true); + waitForBindings(0, "queues.testaddress", 1, 0, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + waitForBindings(2, "queues.testaddress", 1, 0, true); = - waitForBindings(0, "queues.testaddress", 2, 1, false); - waitForBindings(1, "queues.testaddress", 2, 0, false); - waitForBindings(2, "queues.testaddress", 2, 1, false); + waitForBindings(0, "queues.testaddress", 2, 1, false); + waitForBindings(1, "queues.testaddress", 2, 0, false); + waitForBindings(2, "queues.testaddress", 2, 1, false); = - sendInRange(1, "queues.testaddress", 0, 10, true, MessageImpl.= HDR_GROUP_ID, new SimpleString("id1")); + sendInRange(1, "queues.testaddress", 0, 10, true, MessageImpl.HDR= _GROUP_ID, new SimpleString("id1")); = - verifyReceiveAllInRange(true, 0, 10, 0); + verifyReceiveAllInRange(true, 0, 10, 0); = - closeAllConsumers(); + closeAllConsumers(); = - sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl= .HDR_GROUP_ID, new SimpleString("id1")); + sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HD= R_GROUP_ID, new SimpleString("id1")); = = - sendInRange(0, "queues.testaddress", 20, 30, true, MessageImpl= .HDR_GROUP_ID, new SimpleString("id1")); + sendInRange(0, "queues.testaddress", 20, 30, true, MessageImpl.HD= R_GROUP_ID, new SimpleString("id1")); = - stopServers(1); + stopServers(1); = - startServers(1); + startServers(1); = - addConsumer(1, 1, "queue0", null); + addConsumer(1, 1, "queue0", null); = - waitForBindings(1, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); = = - verifyReceiveAllInRange(10, 30, 1); + verifyReceiveAllInRange(10, 30, 1); = = - System.out.println("******************************************= ***********************************"); - } - finally - { - //closeAllConsumers(); + System.out.println("*********************************************= ********************************"); + } + finally + { + //closeAllConsumers(); = - closeAllSessionFactories(); + closeAllSessionFactories(); = - stopServers(0, 1, 2); - } + stopServers(0, 1, 2); } + } = = public void testGroupingSendTo3queuesPinnedNodeGoesDownSendAfterRestart= () throws Exception @@ -747,7 +749,6 @@ stopServers(1); = = - startServers(1); = sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.H= DR_GROUP_ID, new SimpleString("id1")); @@ -822,8 +823,6 @@ sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.H= DR_GROUP_ID, new SimpleString("id1")); = = - = - startServers(0); = waitForBindings(0, "queues.testaddress", 1, 0, true); @@ -845,70 +844,137 @@ } = public void testGroupingMultipleQueuesOnAddress() throws Exception + { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + setupServer(2, isFileStorage(), isNetty()); + + setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0,= 1, 2); + + setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1,= 0, 2); + + setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2,= 0, 1); + + startServers(0, 1, 2); + + try { - setupServer(0, isFileStorage(), isNetty()); - setupServer(1, isFileStorage(), isNetty()); - setupServer(2, isFileStorage(), isNetty()); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); = - setupClusterConnection("cluster0", "queues", false, 1, isNetty(),= 0, 1, 2); + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); = - setupClusterConnection("cluster1", "queues", false, 1, isNetty(),= 1, 0, 2); + createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + createQueue(2, "queues.testaddress", "queue0", null, false); = - setupClusterConnection("cluster2", "queues", false, 1, isNetty(),= 2, 0, 1); + createQueue(0, "queues.testaddress", "queue1", null, false); + createQueue(1, "queues.testaddress", "queue1", null, false); + createQueue(2, "queues.testaddress", "queue1", null, false); = - startServers(0, 1, 2); + addConsumer(0, 0, "queue0", null); + addConsumer(1, 1, "queue0", null); + addConsumer(2, 2, "queue0", null); = - try - { - setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); - setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); - setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); + addConsumer(3, 0, "queue0", null); + addConsumer(4, 1, "queue0", null); + addConsumer(5, 2, "queue0", null); = - setupSessionFactory(0, isNetty()); - setupSessionFactory(1, isNetty()); - setupSessionFactory(2, isNetty()); + waitForBindings(0, "queues.testaddress", 2, 2, true); + waitForBindings(1, "queues.testaddress", 2, 2, true); + waitForBindings(2, "queues.testaddress", 2, 2, true); = - createQueue(0, "queues.testaddress", "queue0", null, false); - createQueue(1, "queues.testaddress", "queue0", null, false); - createQueue(2, "queues.testaddress", "queue0", null, false); + waitForBindings(0, "queues.testaddress", 4, 4, false); + waitForBindings(1, "queues.testaddress", 4, 4, false); + waitForBindings(2, "queues.testaddress", 4, 4, false); = - createQueue(0, "queues.testaddress", "queue1", null, false); - createQueue(1, "queues.testaddress", "queue1", null, false); - createQueue(2, "queues.testaddress", "queue1", null, false); = - addConsumer(0, 0, "queue0", null); - addConsumer(1, 1, "queue0", null); - addConsumer(2, 2, "queue0", null); - = - addConsumer(3, 0, "queue0", null); - addConsumer(4, 1, "queue0", null); - addConsumer(5, 2, "queue0", null); + sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.= HDR_GROUP_ID, new SimpleString("id1")); = - waitForBindings(0, "queues.testaddress", 2, 2, true); - waitForBindings(1, "queues.testaddress", 2, 2, true); - waitForBindings(2, "queues.testaddress", 2, 2, true); + verifyReceiveAll(10, 0); = - waitForBindings(0, "queues.testaddress", 4, 4, false); - waitForBindings(1, "queues.testaddress", 4, 4, false); - waitForBindings(2, "queues.testaddress", 4, 4, false); + System.out.println("*********************************************= ********************************"); + } + finally + { + closeAllConsumers(); = + closeAllSessionFactories(); = - sendWithProperty(0, "queues.testaddress", 10, false, MessageIm= pl.HDR_GROUP_ID, new SimpleString("id1")); + stopServers(0, 1, 2); + } + } = - verifyReceiveAll(10, 0); + public void testGroupingMultipleSending() throws Exception + { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + setupServer(2, isFileStorage(), isNetty()); = - System.out.println("******************************************= ***********************************"); + setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0,= 1, 2); + + setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1,= 0, 2); + + setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2,= 0, 1); + + startServers(0, 1, 2); + + try + { + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); + setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + createQueue(2, "queues.testaddress", "queue0", null, false); + + addConsumer(0, 0, "queue0", null); + addConsumer(1, 1, "queue0", null); + addConsumer(2, 2, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + waitForBindings(2, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 2, 2, false); + waitForBindings(1, "queues.testaddress", 2, 2, false); + waitForBindings(2, "queues.testaddress", 2, 2, false); + + CountDownLatch latch =3D new CountDownLatch(1); + Thread[] threads =3D new Thread[9]; + int range =3D 0; + for(int i =3D 0 ; i < 9; i++,range+=3D10) + { + threads[i] =3D new Thread(new ThreadSender(range, range+10, 1,= new SimpleString("id" + i), latch, i < 8)); } - finally + for (Thread thread : threads) { - closeAllConsumers(); + thread.start(); + } = - closeAllSessionFactories(); + verifyReceiveAllWithGroupIDRoundRobin(0, 30, 0, 1, 2); = - stopServers(0, 1, 2); - } + System.out.println("*********************************************= ********************************"); } + finally + { + closeAllConsumers(); = + closeAllSessionFactories(); + + stopServers(0, 1, 2); + } + } + + public boolean isNetty() { return true; @@ -918,5 +984,52 @@ { return true; } + + class ThreadSender implements Runnable + { + private int msgStart; + private int msgEnd; + private SimpleString id; + private CountDownLatch latch; + private boolean wait; + private int node; + + public ThreadSender(int msgStart, int msgEnd, int node, SimpleString= id, CountDownLatch latch, boolean wait) + { + this.msgStart =3D msgStart; + this.msgEnd =3D msgEnd; + this.node =3D node; + this.id =3D id; + this.latch =3D latch; + this.wait =3D wait; + } + + public void run() + { + if (wait) + { + try + { + latch.await(5, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + e.printStackTrace(); //To change body of catch statement u= se File | Settings | File Templates. + } + } + else + { + latch.countDown(); + } + try + { + sendInRange(node, "queues.testaddress", msgStart, msgEnd, fals= e, MessageImpl.HDR_GROUP_ID, id); + } + catch (Exception e) + { + e.printStackTrace(); //To change body of catch statement use = File | Settings | File Templates. + } + } + } } = --===============3975140103221713773==--