JBoss hornetq SVN: r8421 - branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-26 12:39:30 -0500 (Thu, 26 Nov 2009)
New Revision: 8421
Added:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
Log:
Adding test that fail with paging
Added: branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java (rev 0)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2009-11-26 17:39:30 UTC (rev 8421)
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.paging;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A SendTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PagingSendTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private HornetQServer newHornetQServer()
+ {
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true, config);
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(10 * 1024);
+ defaultSetting.setMaxSizeBytes(100 * 1024);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ public void testSameMessageOverAndOverBlocking() throws Exception
+ {
+ dotestSameMessageOverAndOver(true);
+ }
+
+ public void testSameMessageOverAndOverNonBlocking() throws Exception
+ {
+ dotestSameMessageOverAndOver(false);
+ }
+
+ public void dotestSameMessageOverAndOver(final boolean blocking) throws Exception
+ {
+ HornetQServer server = newHornetQServer();
+
+ server.start();
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ // Making it synchronous, just because we want to stop sending messages as soon as the page-store becomes in
+ // page mode
+ // and we could only guarantee that by setting it to synchronous
+ sf.setBlockOnNonPersistentSend(blocking);
+ sf.setBlockOnPersistentSend(blocking);
+ sf.setBlockOnAcknowledge(blocking);
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage message = null;
+
+ message = session.createClientMessage(true);
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+
+ for (int i = 0; i < 200; i++)
+ {
+ System.out.println("Sent " + i);
+ producer.send(message);
+ }
+
+ session.close();
+
+ assertTrue(server.getPostOffice().getPagingManager().getTotalMemory() > 0);
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 200; i++)
+ {
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
+
+ if (i==100)
+ {
+ session.commit();
+ }
+
+ message2.acknowledge();
+ }
+
+ consumer.close();
+
+ session.close();
+
+ assertEquals(0, server.getPostOffice().getPagingManager().getTotalMemory());
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 1 month
JBoss hornetq SVN: r8420 - trunk.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-26 12:26:51 -0500 (Thu, 26 Nov 2009)
New Revision: 8420
Modified:
trunk/build-thirdparty.xml
Log:
removed maven --debug flag
Modified: trunk/build-thirdparty.xml
===================================================================
--- trunk/build-thirdparty.xml 2009-11-26 16:40:19 UTC (rev 8419)
+++ trunk/build-thirdparty.xml 2009-11-26 17:26:51 UTC (rev 8420)
@@ -93,7 +93,7 @@
<target name="maven-install" description="Run the install goal against the maven build"
depends="maven-init">
- <property name="maven.opts" value="--debug"/>
+ <property name="maven.opts" value=""/>
<property name="maven.install.opts" value="-Dintegrated-build ${maven.opts}"/>
<maven basedir="${basedir}"
15 years, 1 month
JBoss hornetq SVN: r8419 - trunk/tests/config.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-26 11:40:19 -0500 (Thu, 26 Nov 2009)
New Revision: 8419
Modified:
trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
trunk/tests/config/server-start-stop-jms-config1.xml
trunk/tests/config/server-start-stop-live-jms-config1.xml
Log:
fixed schema for connectors in test configs
Modified: trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml
===================================================================
--- trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-11-26 16:12:12 UTC (rev 8418)
+++ trunk/tests/config/hornetq-jms-for-JMSServerDeployerTest.xml 2009-11-26 16:40:19 UTC (rev 8419)
@@ -9,7 +9,9 @@
<entry name="java:/xyz/tfullConfigurationConnectionFactory"/>
<entry name="java:/connectionfactories/acme/fullConfigurationConnectionFactory"/>
</entries>
- <connector-ref connector-name="netty"/>
+ <connectors>
+ <connector-ref connector-name="netty"/>
+ </connectors>
<client-failure-check-period>1234</client-failure-check-period>
<call-timeout>5678</call-timeout>
<consumer-window-size>12345</consumer-window-size>
Modified: trunk/tests/config/server-start-stop-jms-config1.xml
===================================================================
--- trunk/tests/config/server-start-stop-jms-config1.xml 2009-11-26 16:12:12 UTC (rev 8418)
+++ trunk/tests/config/server-start-stop-jms-config1.xml 2009-11-26 16:40:19 UTC (rev 8419)
@@ -3,7 +3,9 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
- <connector-ref connector-name="netty-connector"/>
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
Modified: trunk/tests/config/server-start-stop-live-jms-config1.xml
===================================================================
--- trunk/tests/config/server-start-stop-live-jms-config1.xml 2009-11-26 16:12:12 UTC (rev 8418)
+++ trunk/tests/config/server-start-stop-live-jms-config1.xml 2009-11-26 16:40:19 UTC (rev 8419)
@@ -3,7 +3,9 @@
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
- <connector-ref connector-name="netty-connector"/>
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
15 years, 1 month
JBoss hornetq SVN: r8418 - branches/20-optimisation/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-26 11:12:12 -0500 (Thu, 26 Nov 2009)
New Revision: 8418
Added:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java
Log:
added test
Added: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java (rev 0)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/SimpleSendMultipleQueues.java 2009-11-26 16:12:12 UTC (rev 8418)
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ *
+ * A SimpleSendMultipleQueues
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class SimpleSendMultipleQueues extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(SimpleSendMultipleQueues.class);
+
+ public static final String address = "testaddress";
+
+ public static final String queueName = "testqueue";
+
+ private HornetQServer server;
+
+ private ClientSession session;
+
+ private ClientProducer producer;
+
+ private ClientConsumer consumer1;
+
+ private ClientConsumer consumer2;
+
+ private ClientConsumer consumer3;
+
+ public void test() throws Exception
+ {
+ for (int i = 0; i < 1000; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ final String body = RandomUtil.randomString();
+
+ message.getBodyBuffer().writeString(body);
+
+ producer.send(message);
+
+ ClientMessage received1 = consumer1.receive(1000);
+ assertNotNull(received1);
+ assertEquals(body, received1.getBodyBuffer().readString());
+
+ ClientMessage received2 = consumer2.receive(1000);
+ assertNotNull(received2);
+ assertEquals(body, received2.getBodyBuffer().readString());
+
+ ClientMessage received3 = consumer3.receive(1000);
+ assertNotNull(received3);
+ assertEquals(body, received3.getBodyBuffer().readString());
+ }
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer(false, true);
+
+ server.start();
+
+ ClientSessionFactory cf = createFactory();
+
+ session = cf.createSession();
+
+ session.createQueue(address, "queue1");
+ session.createQueue(address, "queue2");
+ session.createQueue(address, "queue3");
+
+ producer = session.createProducer(address);
+
+ consumer1 = session.createConsumer("queue1");
+
+ consumer2 = session.createConsumer("queue2");
+
+ consumer3 = session.createConsumer("queue3");
+
+ session.start();
+ }
+
+ protected ClientSessionFactory createFactory()
+ {
+ return this.createNettyFactory();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (session != null)
+ {
+ consumer1.close();
+
+ consumer2.close();
+
+ consumer3.close();
+
+ session.deleteQueue("queue1");
+ session.deleteQueue("queue2");
+ session.deleteQueue("queue3");
+
+ session.close();
+ }
+
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+
+ super.tearDown();
+ }
+
+ private ClientMessage sendAndReceive(final ClientMessage message) throws Exception
+ {
+ producer.send(message);
+
+ ClientMessage received = consumer1.receive(10000);
+
+ return received;
+ }
+
+}
15 years, 1 month
JBoss hornetq SVN: r8417 - branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-26 11:08:54 -0500 (Thu, 26 Nov 2009)
New Revision: 8417
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
Log:
tweak
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-11-26 14:50:41 UTC (rev 8416)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-11-26 16:08:54 UTC (rev 8417)
@@ -148,11 +148,9 @@
ClientProducer producer = session.createProducer(ADDRESS);
- ClientMessage message = null;
-
- message = session.createClientMessage(true);
+ ClientMessage message = session.createClientMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
-
+
PagingStore store = server.getPostOffice().getPagingManager().getPageStore(ADDRESS);
int messages = 0;
15 years, 1 month
JBoss hornetq SVN: r8416 - branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-26 09:50:41 -0500 (Thu, 26 Nov 2009)
New Revision: 8416
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
Log:
Fixing LargeMessageTest
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-11-26 14:31:48 UTC (rev 8415)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-11-26 14:50:41 UTC (rev 8416)
@@ -231,7 +231,7 @@
final LargeMessageBufferImpl buffer = new LargeMessageBufferImpl(new FakeConsumerInternal(), 10, 10);
- buffer.addPacket(new SessionReceiveContinuationMessage(-1, new byte[] { 0, 1, 2, 3, 4 }, true, true));
+ buffer.addPacket(new FakePacket(-1, new byte[] { 0, 1, 2, 3, 4 }, true, true));
byte bytes[] = new byte[30];
buffer.readBytes(bytes, 0, 5);
@@ -355,7 +355,7 @@
for (int i = 0; i < 3; i++)
{
- outBuffer.addPacket(new SessionReceiveContinuationMessage(-1, new byte[1024], true, false));
+ outBuffer.addPacket(new FakePacket(-1, new byte[1024], true, false));
}
outBuffer.setOutputStream(output);
@@ -389,12 +389,12 @@
for (int i = 0; i < 8; i++)
{
- outBuffer.addPacket(new SessionReceiveContinuationMessage(-1, new byte[1024], true, false));
+ outBuffer.addPacket(new FakePacket(-1, new byte[1024], true, false));
}
assertEquals(1, waiting.getCount());
- outBuffer.addPacket(new SessionReceiveContinuationMessage(-1, new byte[123], false, false));
+ outBuffer.addPacket(new FakePacket(-1, new byte[123], false, false));
assertTrue(done2.await(10, TimeUnit.SECONDS));
@@ -430,7 +430,7 @@
long start = System.currentTimeMillis();
final LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(), 5, 30);
- outBuffer.addPacket(new SessionReceiveContinuationMessage(-1, new byte[] { 0, 1, 2, 3, 4 }, true, false));
+ outBuffer.addPacket(new FakePacket(-1, new byte[] { 0, 1, 2, 3, 4 }, true, false));
final CountDownLatch latchBytesWritten1 = new CountDownLatch(5);
final CountDownLatch latchBytesWritten2 = new CountDownLatch(10);
@@ -513,23 +513,39 @@
{
break;
}
+
+ SessionReceiveContinuationMessage packet = null;
if (size < splitFactor)
{
byte[] newSplit = new byte[size];
System.arraycopy(splitElement, 0, newSplit, 0, size);
- outBuffer.addPacket(new SessionReceiveContinuationMessage(1, newSplit, input.available() > 0, false));
+ packet = new FakePacket(1, newSplit, input.available() > 0, false);
}
else
{
- outBuffer.addPacket(new SessionReceiveContinuationMessage(1, splitElement, input.available() > 0, false));
+ packet = new FakePacket(1, splitElement, input.available() > 0, false);
}
+
+ outBuffer.addPacket(packet);
}
return outBuffer;
}
+
+ private class FakePacket extends SessionReceiveContinuationMessage
+ {
+ public FakePacket(final long consumerID,
+ final byte[] body,
+ final boolean continues,
+ final boolean requiresResponse)
+ {
+ super(consumerID, body, continues, requiresResponse);
+ this.size = 1;
+ }
+ }
/**
* @param bytes
15 years, 1 month
JBoss hornetq SVN: r8415 - in trunk: src/config/common/schema and 2 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-26 09:31:48 -0500 (Thu, 26 Nov 2009)
New Revision: 8415
Modified:
trunk/examples/jms/reconnect-same-node/server0/hornetq-jms.xml
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/config/jboss-as/clustered/hornetq-jms.xml
trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml
Log:
fixed schema for connectors and example configs
Modified: trunk/examples/jms/reconnect-same-node/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/reconnect-same-node/server0/hornetq-jms.xml 2009-11-26 14:01:04 UTC (rev 8414)
+++ trunk/examples/jms/reconnect-same-node/server0/hornetq-jms.xml 2009-11-26 14:31:48 UTC (rev 8415)
@@ -31,7 +31,9 @@
<!-- This is used by the example to send the management operations, it's not central to the example -->
<connection-factory name="ConnectionFactory2">
- <connector-ref connector-name="netty-connector2"/>
+ <connectors>
+ <connector-ref connector-name="netty-connector2"/>
+ </connectors>
<entries>
<entry name="ConnectionFactory2"/>
</entries>
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2009-11-26 14:01:04 UTC (rev 8414)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2009-11-26 14:31:48 UTC (rev 8415)
@@ -26,7 +26,6 @@
<xsd:element name="connection-factory">
<xsd:complexType>
<xsd:all>
- <xsd:element name="connector-ref" type="connector-refType" maxOccurs="1" minOccurs="0"></xsd:element>
<xsd:element name="discovery-group-ref" type="discovery-group-refType" maxOccurs="1" minOccurs="0"></xsd:element>
<xsd:element name="discovery-initial-wait-timeout" type="xsd:long" maxOccurs="1" minOccurs="0"></xsd:element>
Modified: trunk/src/config/jboss-as/clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/hornetq-jms.xml 2009-11-26 14:01:04 UTC (rev 8414)
+++ trunk/src/config/jboss-as/clustered/hornetq-jms.xml 2009-11-26 14:31:48 UTC (rev 8415)
@@ -13,7 +13,9 @@
</connection-factory>
<connection-factory name="InVMConnectionFactory">
- <connector-ref connector-name="in-vm"/>
+ <connectors>
+ <connector-ref connector-name="in-vm"/>
+ </connectors>
<entries>
<entry name="java:/ConnectionFactory"/>
<entry name="java:/XAConnectionFactory"/>
Modified: trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml 2009-11-26 14:01:04 UTC (rev 8414)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-jms.xml 2009-11-26 14:31:48 UTC (rev 8415)
@@ -13,7 +13,9 @@
</connection-factory>
<connection-factory name="InVMConnectionFactory">
- <connector-ref connector-name="in-vm"/>
+ <connectors>
+ <connector-ref connector-name="in-vm"/>
+ </connectors>
<entries>
<entry name="java:/ConnectionFactory"/>
<entry name="java:/XAConnectionFactory"/>
15 years, 1 month
JBoss hornetq SVN: r8414 - in branches/20-optimisation: tests/src/org/hornetq/tests/opt and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-26 09:01:04 -0500 (Thu, 26 Nov 2009)
New Revision: 8414
Modified:
branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
Log:
fixed timedbuffertest
Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-26 13:30:41 UTC (rev 8413)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-26 14:01:04 UTC (rev 8414)
@@ -96,7 +96,7 @@
public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
{
- bufferSize = 490 * 1024;
+ this.bufferSize = size;
this.logRates = logRates;
if (logRates)
{
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-26 13:30:41 UTC (rev 8413)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-26 14:01:04 UTC (rev 8414)
@@ -60,7 +60,7 @@
{
try
{
- new SendTest().runConsume();
+ new SendTest().runTextMessage();
}
catch (Exception e)
{
@@ -74,9 +74,9 @@
{
log.info("*** Starting server");
- //System.setProperty("org.hornetq.opt.dontadd", "true");
+ System.setProperty("org.hornetq.opt.dontadd", "true");
// System.setProperty("org.hornetq.opt.routeblast", "true");
- System.setProperty("org.hornetq.opt.generatemessages", "true");
+ //System.setProperty("org.hornetq.opt.generatemessages", "true");
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
@@ -285,25 +285,20 @@
prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- byte[] bytes1 = new byte[] { (byte)'A', (byte)'B',(byte)'C',(byte)'D'};
-
- String s = new String(bytes1);
-
- System.out.println("Str is " + s);
-
- byte[] bytes = RandomUtil.randomBytes(512);
+ byte[] bytes = RandomUtil.randomBytes(1024);
String str = new String(bytes);
log.info("Warming up");
- TextMessage tm = sess.createTextMessage();
-
- tm.setText(str);
-
+
for (int i = 0; i < warmup; i++)
{
+ TextMessage tm = sess.createTextMessage();
+
+ tm.setText(str);
+
prod.send(tm);
if (i % 10000 == 0)
@@ -314,14 +309,15 @@
log.info("** WARMUP DONE");
- tm = sess.createTextMessage();
-
- tm.setText(str);
+
long start = System.currentTimeMillis();
for (int i = 0; i < numMessages; i++)
{
+ TextMessage tm = sess.createTextMessage();
+
+ tm.setText(str);
prod.send(tm);
if (i % 10000 == 0)
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-26 13:30:41 UTC (rev 8413)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-26 14:01:04 UTC (rev 8414)
@@ -38,6 +38,7 @@
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
15 years, 1 month
JBoss hornetq SVN: r8413 - in branches/20-optimisation: src/main/org/hornetq/core/client/impl and 8 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-26 08:30:41 -0500 (Thu, 26 Nov 2009)
New Revision: 8413
Modified:
branches/20-optimisation/build-hornetq.xml
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
fixed some tests and re-instated new HornetQDecoder
Modified: branches/20-optimisation/build-hornetq.xml
===================================================================
--- branches/20-optimisation/build-hornetq.xml 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/build-hornetq.xml 2009-11-26 13:30:41 UTC (rev 8413)
@@ -1223,9 +1223,9 @@
haltonerror="${junit.batchtest.haltonerror}"
failureproperty="tests.failed">
<formatter type="plain" usefile="${junit.formatter.usefile}"/>
- <fileset dir="${test.classes.dir}">
- <!-- <exclude name="**/integration/http/*" /> -->
+ <fileset dir="${test.classes.dir}">
<include name="${tests.param}"/>
+ <exclude name="**/integration/cluster/reattach/Netty*" />
</fileset>
</batchtest>
</junit>
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -697,7 +697,6 @@
log.trace("Setting up flowControlSize to " + message.getPacketSize() + " on message = " + clMessage);
}
- // log.info("setting flow control size as " + message.getPacketSize());
clMessage.setFlowControlSize(message.getPacketSize());
consumer.handleMessage(clMessage);
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -259,9 +259,12 @@
public void setDestination(final SimpleString destination)
{
- this.destination = destination;
+ if (this.destination != destination)
+ {
+ this.destination = destination;
- bufferValid = false;
+ bufferValid = false;
+ }
}
public byte getType()
@@ -276,9 +279,12 @@
public void setDurable(final boolean durable)
{
- this.durable = durable;
+ if (this.durable != durable)
+ {
+ this.durable = durable;
- bufferValid = false;
+ bufferValid = false;
+ }
}
public long getExpiration()
@@ -288,9 +294,12 @@
public void setExpiration(final long expiration)
{
+ if (this.expiration != expiration)
+ {
this.expiration = expiration;
bufferValid = false;
+ }
}
public long getTimestamp()
@@ -300,9 +309,12 @@
public void setTimestamp(final long timestamp)
{
+ if (this.timestamp != timestamp)
+ {
this.timestamp = timestamp;
bufferValid = false;
+ }
}
public byte getPriority()
@@ -312,9 +324,12 @@
public void setPriority(final byte priority)
{
- this.priority = priority;
+ if (this.priority != priority)
+ {
+ this.priority = priority;
- bufferValid = false;
+ bufferValid = false;
+ }
}
public boolean isExpired()
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -387,7 +387,7 @@
{
lastReceivedCommandID++;
- receivedBytes += packet.getPacketSize();
+ receivedBytes += packet.getPacketSize();
if (receivedBytes >= confWindowSize)
{
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -111,6 +111,7 @@
requiresResponse = buffer.readBoolean();
buffer.readerIndex(ri);
+
}
// Private -------------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -81,9 +81,7 @@
// We must make a copy of the message, otherwise things like returning credits to the page won't work
// properly on ack, since the original destination will be overwritten
- // TODO we can optimise this so it doesn't copy if it's not routed anywhere else
-
- log.info("making copy for divert");
+ // TODO we can optimise this so it doesn't copy if it's not routed anywhere else
ServerMessage copy = message.copy();
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -185,10 +185,8 @@
public HandleStatus handle(final MessageReference ref) throws Exception
{
- //log.info("handling message");
if (availableCredits != null && availableCredits.get() <= 0)
{
- // log.info("busy");
return HandleStatus.BUSY;
}
@@ -418,7 +416,6 @@
public void receiveCredits(final int credits) throws Exception
{
- // log.info("Receiving credits " + credits);
if (credits == -1)
{
// No flow control
@@ -592,12 +589,9 @@
if (availableCredits != null)
{
- //log.info("Subtracting credits " + packet.getPacketSize());
availableCredits.addAndGet(-packet.getPacketSize());
}
- // log.info("delivered message");
-
}
// Inner classes
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -83,11 +83,6 @@
messageID = id;
}
- public void setType(final byte type)
- {
- this.type = type;
- }
-
public MessageReference createReference(final Queue queue)
{
MessageReference ref = new MessageReferenceImpl(this, queue);
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -1574,8 +1574,6 @@
final CreditManagerHolder holder = this.getCreditManagerHolder(address);
int credits = packet.getCredits();
-
- //log.info("requesting credits " + credits);
int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
{
@@ -1597,8 +1595,6 @@
}
});
- //log.info("got credits " + gotCredits);
-
if (gotCredits > 0)
{
sendProducerCredits(holder, gotCredits, address);
Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -47,7 +47,7 @@
public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
{
assert pipeline != null;
- pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+ pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/management/ManagementHelperTest.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -60,7 +60,7 @@
String operationName = randomString();
String param = randomString();
String[] params = new String[] { randomString(), randomString(), randomString() };
- Message msg = new ClientMessageImpl();
+ Message msg = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1000);
ManagementHelper.putOperationInvocation(msg, resource, operationName, param, params);
Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
@@ -148,7 +148,7 @@
Object[] params = new Object[] { i, s, d, b, l, map, strArray, maps };
- Message msg = new ClientMessageImpl();
+ Message msg = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1000);
ManagementHelper.putOperationInvocation(msg, resource, operationName, params);
Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
@@ -214,7 +214,7 @@
Object[] params = new Object[] { "hello", map };
- Message msg = new ClientMessageImpl();
+ Message msg = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1000);
ManagementHelper.putOperationInvocation(msg, resource, operationName, params);
Object[] parameters = ManagementHelper.retrieveOperationParameters(msg);
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -15,11 +15,14 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -38,6 +41,7 @@
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.jms.HornetQQueue;
import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.HornetQSession;
import org.hornetq.tests.util.RandomUtil;
@@ -56,7 +60,7 @@
{
try
{
- new SendTest().runTextMessage();
+ new SendTest().runConsume();
}
catch (Exception e)
{
@@ -70,8 +74,9 @@
{
log.info("*** Starting server");
- System.setProperty("org.hornetq.opt.dontadd", "true");
+ //System.setProperty("org.hornetq.opt.dontadd", "true");
// System.setProperty("org.hornetq.opt.routeblast", "true");
+ System.setProperty("org.hornetq.opt.generatemessages", "true");
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
@@ -210,6 +215,134 @@
server.stop();
}
+ public void runSendConsume() throws Exception
+ {
+ startServer();
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+ // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+
+ params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+ //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+ TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+ //TransportConfiguration tc = new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+ HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+
+ cf.setProducerWindowSize(1024 * 1024);
+
+ Connection conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+ coreSession.createQueue("jms.queue.test_queue", "jms.queue.test_queue");
+
+ Queue queue = new HornetQQueue("test_queue");
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ conn.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int warmup = 500000;
+
+ final int numMessages = 2000000;
+
+ MessageListener listener = new MessageListener()
+ {
+ int count;
+ public void onMessage(Message message)
+ {
+ count++;
+
+ if (count % 10000 == 0)
+ {
+ log.info("received " + count);
+ }
+
+ if (count == numMessages + warmup)
+ {
+ latch.countDown();
+ }
+ }
+ };
+
+ cons.setMessageListener(listener);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ prod.setDisableMessageID(true);
+
+ prod.setDisableMessageTimestamp(true);
+
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ byte[] bytes1 = new byte[] { (byte)'A', (byte)'B',(byte)'C',(byte)'D'};
+
+ String s = new String(bytes1);
+
+ System.out.println("Str is " + s);
+
+ byte[] bytes = RandomUtil.randomBytes(512);
+
+ String str = new String(bytes);
+
+
+ log.info("Warming up");
+
+ TextMessage tm = sess.createTextMessage();
+
+ tm.setText(str);
+
+ for (int i = 0; i < warmup; i++)
+ {
+ prod.send(tm);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+ }
+
+ log.info("** WARMUP DONE");
+
+ tm = sess.createTextMessage();
+
+ tm.setText(str);
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ prod.send(tm);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+ }
+
+ latch.countDown();
+
+ sess.close();
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * (double)numMessages / (end - start);
+
+ System.out.println("Rate of " + rate + " msgs / sec");
+
+ server.stop();
+ }
+
public void runObjectMessage() throws Exception
{
startServer();
@@ -259,18 +392,24 @@
log.info("sending messages");
+
+
for (int i = 0; i < warmup; i++)
{
- ObjectMessage om = sess.createObjectMessage(str);
+// ObjectMessage om = sess.createObjectMessage(str);
+//
+// prod.send(om);
- prod.send(om);
+ TextMessage tm = sess.createTextMessage(str);
+
+ prod.send(tm);
if (i % 10000 == 0)
{
log.info("sent " + i);
}
- om.setObject(str);
+ //om.setObject(str);
}
log.info("** WARMUP DONE");
@@ -279,18 +418,24 @@
long start = System.currentTimeMillis();
+
+
for (int i = 0; i < numMessages; i++)
- {
- ObjectMessage om = sess.createObjectMessage(str);
+ {
+// ObjectMessage om = sess.createObjectMessage(str);
+//
+// prod.send(om);
- prod.send(om);
+ TextMessage tm = sess.createTextMessage(str);
+
+ prod.send(tm);
if (i % 10000 == 0)
{
log.info("sent " + i);
}
- om.setObject(str);
+ //om.setObject(str);
}
long end = System.currentTimeMillis();
@@ -302,4 +447,120 @@
server.stop();
}
+ public void runConsume() throws Exception
+ {
+ startServer();
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ // params.put(TransportConstants.HOST_PROP_NAME, "localhost");
+
+ // params.put(TransportConstants.PORT_PROP_NAME, 5445);
+
+ params.put(TransportConstants.TCP_NODELAY_PROPNAME, Boolean.FALSE);
+ //params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
+
+ TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), params);
+
+ //TransportConfiguration tc = new TransportConfiguration(InVMConnectorFactory.class.getCanonicalName(), params);
+
+ HornetQConnectionFactory cf = new HornetQConnectionFactory(tc);
+
+ Connection conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+ ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+ coreSession.createQueue("jms.queue.test_queue", "jms.queue.test_queue");
+
+ Queue queue = new HornetQQueue("test_queue");
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final int warmup = 50000;
+
+ final int numMessages = 2000000;
+
+ MessageListener listener = new MessageListener()
+ {
+ int count;
+ long start;
+ public void onMessage(Message message)
+ {
+ count++;
+
+ log.info("got message " + ((HornetQMessage)message).getCoreMessage().getMessageID());
+
+ if (count == warmup)
+ {
+ log.info("** WARMED UP");
+
+ start = System.currentTimeMillis();
+ }
+
+ if (count % 10000 == 0)
+ {
+ log.info("received " + count);
+ }
+
+ if (count == numMessages + warmup)
+ {
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * (double)numMessages / (end - start);
+
+ System.out.println("Rate of " + rate + " msgs / sec");
+
+ latch.countDown();
+ }
+ }
+ };
+
+ cons.setMessageListener(listener);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ prod.setDisableMessageID(true);
+
+ prod.setDisableMessageTimestamp(true);
+
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ byte[] bytes = RandomUtil.randomBytes(1);
+
+ String str = new String(bytes);
+
+
+ //Load up the queue with messages
+
+ TextMessage tm = sess.createTextMessage();
+
+ tm.setText(str);
+
+ log.info("loading queue with messages");
+
+ for (int i = 0; i < numMessages + warmup; i++)
+ {
+ prod.send(tm);
+
+ if (i % 10000 == 0)
+ {
+ log.info("sent " + i);
+ }
+ }
+
+ log.info("** loaded queue");
+
+ conn.start();
+
+ latch.await();
+
+ sess.close();
+
+ server.stop();
+ }
+
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-26 13:19:27 UTC (rev 8412)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-26 13:30:41 UTC (rev 8413)
@@ -353,7 +353,7 @@
public String getTextMessage(ClientMessage m)
{
- //m.getBodyBuffer().resetReaderIndex();
+ m.getBodyBuffer().resetReaderIndex();
return m.getBodyBuffer().readString();
}
15 years, 1 month
JBoss hornetq SVN: r8412 - in trunk: src/main/org/hornetq/core/management/impl and 7 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-26 08:19:27 -0500 (Thu, 26 Nov 2009)
New Revision: 8412
Added:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
Modified:
trunk/src/main/org/hornetq/core/management/AcceptorControl.java
trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
re-added cluster with backup tests
* modified RemotingService.stop() sequence to ensure there are no more connections created after the acceptors are paused
* in ClusterConnectionImpl.createNewRecord(), do not failover the record's bridge when the live server is shut down
* removed Acceptor.resume() method (pause() method is only used as step 1 in the 2 steps to stop the remoting service)
Modified: trunk/src/main/org/hornetq/core/management/AcceptorControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/AcceptorControl.java 2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/management/AcceptorControl.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -28,8 +28,4 @@
String getFactoryClassName();
Map<String, Object> getParameters();
-
- void pause() throws Exception;
-
- void resume() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java 2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -78,20 +78,10 @@
acceptor.start();
}
- public void pause()
- {
- acceptor.pause();
- }
-
public void stop() throws Exception
{
acceptor.stop();
}
-
- public void resume() throws Exception
- {
- acceptor.resume();
- }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -156,18 +156,6 @@
paused = true;
}
- public synchronized void resume()
- {
- if (!paused || !started)
- {
- return;
- }
-
- InVMRegistry.instance.registerAcceptor(id, this);
-
- paused = false;
- }
-
public void setNotificationService(NotificationService notificationService)
{
this.notificationService = notificationService;
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -15,7 +15,6 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -38,8 +37,6 @@
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.Ping;
import org.hornetq.core.remoting.server.RemotingService;
@@ -212,8 +209,8 @@
}
}
}
-
- public synchronized void stop() throws Exception
+
+ public void stop() throws Exception
{
if (!started)
{
@@ -228,26 +225,31 @@
acceptor.pause();
}
- for (ConnectionEntry entry : connections.values())
+ synchronized (server)
{
- entry.connection.getChannel(0, -1).sendAndFlush(new PacketImpl(DISCONNECT));
- }
+ for (ConnectionEntry entry : connections.values())
+ {
+ entry.connection.getChannel(0, -1).sendAndFlush(new PacketImpl(DISCONNECT));
+ }
- for (Acceptor acceptor : acceptors)
- {
- acceptor.stop();
- }
+ for (Acceptor acceptor : acceptors)
+ {
+ acceptor.stop();
+ }
- acceptors.clear();
+ acceptors.clear();
- connections.clear();
+ connections.clear();
- if (managementService != null)
- {
- managementService.unregisterAcceptors();
+ if (managementService != null)
+ {
+ managementService.unregisterAcceptors();
+ }
+
+ started = false;
+
}
- started = false;
}
public boolean isStarted()
@@ -297,9 +299,7 @@
RemotingConnection rc = new RemotingConnectionImpl(connection,
interceptors,
- server.getConfiguration().isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
- .getExecutor()
- : null);
+ config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory().getExecutor() : null);
Channel channel1 = rc.getChannel(1, -1);
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -27,7 +27,5 @@
{
void pause();
- void resume();
-
void setNotificationService(NotificationService notificationService);
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -438,7 +438,7 @@
retryInterval,
1d,
-1,
- true,
+ false,
useDuplicateDetection,
confirmationWindowSize,
managementService.getManagementAddress(),
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -349,9 +349,13 @@
{
session.getChannel().flushConfirmations();
}
+ }
- remotingService.stop();
-
+ // we stop the remoting service outside a lock
+ remotingService.stop();
+
+ synchronized (this)
+ {
// Stop the deployers
if (configuration.isFileDeploymentEnabled())
{
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -358,6 +358,8 @@
httpKeepAliveRunnable.close();
}
+ // serverChannelGroup has been unbound in pause()
+ serverChannelGroup.close().awaitUninterruptibly();
ChannelGroupFuture future = channelGroup.close().awaitUninterruptibly();
if (!future.isCompleteSuccess())
@@ -412,7 +414,7 @@
private boolean paused;
- public synchronized void pause()
+ public void pause()
{
if (paused)
{
@@ -425,32 +427,25 @@
}
// We *pause* the acceptor so no new connections are made
-
- serverChannelGroup.close().awaitUninterruptibly();
-
- try
+ ChannelGroupFuture future = serverChannelGroup.unbind().awaitUninterruptibly();
+ if (!future.isCompleteSuccess())
{
- Thread.sleep(500);
+ log.warn("server channel group did not completely unbind");
+ Iterator<Channel> iterator = future.getGroup().iterator();
+ while (iterator.hasNext())
+ {
+ Channel channel = (Channel)iterator.next();
+ if (channel.isBound())
+ {
+ log.warn(channel + " is still bound to " + channel.getRemoteAddress());
+ }
+ }
}
- catch (Exception e)
- {
- }
paused = true;
}
- public synchronized void resume()
- {
- if (!paused)
- {
- return;
- }
-
- startServerChannels();
-
- paused = false;
- }
-
+
public void setNotificationService(final NotificationService notificationService)
{
this.notificationService = notificationService;
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-26 12:46:00 UTC (rev 8411)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -63,7 +63,7 @@
*
*
*/
-public class ClusterTestBase extends ServiceTestBase
+public abstract class ClusterTestBase extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(ClusterTestBase.class);
@@ -799,7 +799,7 @@
ClientMessage message;
do
{
- message = holder.consumer.receive(200);
+ message = holder.consumer.receive(1000);
if (message != null)
{
@@ -809,7 +809,7 @@
if (prevCount != null)
{
- assertTrue(count == prevCount + consumerIDs.length);
+ assertEquals(prevCount + consumerIDs.length, count);
}
assertFalse(counts.contains(count));
@@ -835,7 +835,7 @@
for (int i = 0; i < numMessages; i++)
{
- assertTrue(counts.contains(i));
+ assertTrue("did not receive message " + i, counts.contains(i));
}
}
@@ -1607,5 +1607,4 @@
}
}
}
-
}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -0,0 +1,144 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ *
+ * A ClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 9 Mar 2009 16:31:21
+ *
+ *
+ */
+public class ClusterWithBackupTest extends ClusterTestBase
+{
+ private static final Logger log = Logger.getLogger(ClusterWithBackupTest.class);
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ setupServers();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ stopServers();
+
+ super.tearDown();
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
+ public void testBasicRoundRobin() throws Exception
+ {
+ setupCluster();
+
+ startServers(0, 1, 2, 3, 4, 5);
+
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+ setupSessionFactory(5, isNetty());
+
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+ createQueue(5, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 3, "queue0", null);
+ addConsumer(1, 4, "queue0", null);
+ addConsumer(2, 5, "queue0", null);
+
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+ waitForBindings(5, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(3, "queues.testaddress", 2, 2, false);
+ waitForBindings(4, "queues.testaddress", 2, 2, false);
+ waitForBindings(5, "queues.testaddress", 2, 2, false);
+
+ send(3, "queues.testaddress", 100, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(100, 0, 1, 2);
+
+ verifyNotReceive(0, 0, 1, 2);
+ }
+
+ protected void setupCluster() throws Exception
+ {
+ setupCluster(false);
+ }
+
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 4, 5);
+
+ setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 3, 5);
+
+ setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 5, 3, 4);
+
+
+ setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 4, 5);
+
+ setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 3, 5);
+
+ setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 3, 4);
+ }
+
+ protected void setupServers() throws Exception
+ {
+ //The backups
+ setupServer(0, isFileStorage(), isNetty(), true);
+ setupServer(1, isFileStorage(), isNetty(), true);
+ setupServer(2, isFileStorage(), isNetty(), true);
+
+ //The lives
+ setupServer(3, isFileStorage(), isNetty(), 0);
+ setupServer(4, isFileStorage(), isNetty(), 1);
+ setupServer(5, isFileStorage(), isNetty(), 2);
+
+ }
+
+ protected void stopServers() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(3, 4, 5, 0, 1, 2);
+ }
+
+}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * A NettyFileStorageSymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettyFileStorageSymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
+{
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+
+}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+/**
+ * A NettySymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class NettySymmetricClusterWithBackupTest extends SymmetricClusterWithBackupTest
+{
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+}
Added: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2009-11-26 13:19:27 UTC (rev 8412)
@@ -0,0 +1,828 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A SymmetricClusterWithBackupTest
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 13 Mar 2009 11:00:31
+ *
+ *
+ */
+public class SymmetricClusterWithBackupTest extends SymmetricClusterTest
+{
+ private static final Logger log = Logger.getLogger(SymmetricClusterWithBackupTest.class);
+
+ public void testStopAllStartAll() throws Throwable
+ {
+ try
+ {
+ setupCluster();
+
+ startServers();
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ System.out.println("waited for all bindings");
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ verifyNotReceive(0, 1, 2, 3, 4);
+
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers();
+
+ startServers();
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ this.verifyNotReceive(0, 1, 2, 3, 4);
+
+
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+ }
+ catch (Throwable e)
+ {
+ System.out.println(threadDump("SymmetricClusterWithBackupTest::testStopAllStartAll"));
+ throw e;
+ }
+ }
+
+ @Override
+ public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesAndConsumersBeforeAllServersAreStarted() throws Exception
+ {
+ setupCluster();
+
+ startServers(5, 0);
+
+ setupSessionFactory(0, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(5, 0, "queue5", null);
+
+ startServers(6, 1);
+ setupSessionFactory(1, isNetty());
+
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(16, 1, "queue15", null);
+
+ startServers(7, 2);
+ setupSessionFactory(2, isNetty());
+
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(17, 2, "queue15", null);
+
+ startServers(8, 3);
+ setupSessionFactory(3, isNetty());
+
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(18, 3, "queue15", null);
+
+ startServers(9, 4);
+ setupSessionFactory(4, isNetty());
+
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(4, 4, "queue4", null);
+ addConsumer(9, 4, "queue9", null);
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+ }
+
+ public void _test() throws Exception
+ {
+ for (int i = 0; i < 50; i++)
+ {
+ System.out.println("\n\n" + i + "\n\n");
+ testStartStopWithTwoServers();
+ tearDown();
+ setUp();
+ }
+ }
+
+ public void testStartStopWithTwoServers() throws Exception
+ {
+ setupCluster();
+
+ startServers(5, 6, 0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+// setupSessionFactory(2, isNetty());
+// setupSessionFactory(3, isNetty());
+// setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+// createQueue(2, "queues.testaddress", "queue2", null, false);
+// createQueue(3, "queues.testaddress", "queue3", null, false);
+// createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+// createQueue(2, "queues.testaddress", "queue7", null, false);
+// createQueue(3, "queues.testaddress", "queue8", null, false);
+// createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+// createQueue(2, "queues.testaddress", "queue12", null, false);
+// createQueue(3, "queues.testaddress", "queue13", null, false);
+// createQueue(4, "queues.testaddress", "queue14", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+// createQueue(2, "queues.testaddress", "queue15", null, false);
+// createQueue(3, "queues.testaddress", "queue15", null, false);
+// createQueue(4, "queues.testaddress", "queue15", null, false);
+
+// createQueue(2, "queues.testaddress", "queue16", null, false);
+// createQueue(3, "queues.testaddress", "queue16", null, false);
+// createQueue(4, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+// createQueue(4, "queues.testaddress", "queue17", null, false);
+
+// createQueue(3, "queues.testaddress", "queue18", null, false);
+// createQueue(4, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+// addConsumer(2, 2, "queue2", null);
+// addConsumer(3, 3, "queue3", null);
+// addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+// addConsumer(7, 2, "queue7", null);
+// addConsumer(8, 3, "queue8", null);
+// addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+// addConsumer(12, 2, "queue12", null);
+// addConsumer(13, 3, "queue13", null);
+// addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+// addConsumer(17, 2, "queue15", null);
+// addConsumer(18, 3, "queue15", null);
+// addConsumer(19, 4, "queue15", null);
+
+// addConsumer(20, 2, "queue16", null);
+// addConsumer(21, 3, "queue16", null);
+// addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+// addConsumer(25, 4, "queue17", null);
+
+// addConsumer(26, 3, "queue18", null);
+// addConsumer(27, 4, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+// waitForBindings(2, "queues.testaddress", 5, 5, true);
+// waitForBindings(3, "queues.testaddress", 6, 6, true);
+// waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 5 , 5, false);
+// waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 5, 5, false);
+// waitForBindings(1, "queues.testaddress", 23, 23, false);
+// waitForBindings(2, "queues.testaddress", 23, 23, false);
+// waitForBindings(3, "queues.testaddress", 22, 22, false);
+// waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+// verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+ verifyReceiveAll(10, 0, 1, 5, 6, 10, 11);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16);
+// verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+// verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+// verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24);
+
+// verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ removeConsumer(0);
+ removeConsumer(5);
+ removeConsumer(10);
+ removeConsumer(15);
+ removeConsumer(23);
+// removeConsumer(3);
+// removeConsumer(8);
+// removeConsumer(13);
+// removeConsumer(18);
+// removeConsumer(21);
+// removeConsumer(26);
+
+ closeSessionFactory(0);
+// closeSessionFactory(3);
+
+ stopServers(0, 5);
+// stopServers(0, 3, 5, 8);
+
+ startServers(5, 0);
+// startServers(5, 8, 0, 3);
+
+ Thread.sleep(2000);
+
+ setupSessionFactory(0, isNetty());
+// setupSessionFactory(3, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+// createQueue(3, "queues.testaddress", "queue3", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ // createQueue(3, "queues.testaddress", "queue8", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+// createQueue(3, "queues.testaddress", "queue13", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+// createQueue(3, "queues.testaddress", "queue15", null, false);
+
+// createQueue(3, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+
+// createQueue(3, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+// addConsumer(3, 3, "queue3", null);
+
+ addConsumer(5, 0, "queue5", null);
+// addConsumer(8, 3, "queue8", null);
+
+ addConsumer(10, 0, "queue10", null);
+// addConsumer(13, 3, "queue13", null);
+
+ addConsumer(15, 0, "queue15", null);
+// addConsumer(18, 3, "queue15", null);
+
+// addConsumer(21, 3, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+
+// addConsumer(26, 3, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+// waitForBindings(2, "queues.testaddress", 5, 5, true);
+// waitForBindings(3, "queues.testaddress", 6, 6, true);
+// waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, false);
+ waitForBindings(1, "queues.testaddress", 5, 5, false);
+// waitForBindings(0, "queues.testaddress", 23, 23, false);
+// waitForBindings(1, "queues.testaddress", 23, 23, false);
+// waitForBindings(2, "queues.testaddress", 23, 23, false);
+// waitForBindings(3, "queues.testaddress", 22, 22, false);
+/// waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 5, 6, 10, 11);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16);
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24);
+
+ /*
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+ */
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+ }
+
+ @Override
+ public void testStartStopServers() throws Exception
+ {
+ setupCluster();
+
+ startServers();
+
+ log.info("setup session factories: ");
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+ createQueue(2, "queues.testaddress", "queue2", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+ createQueue(4, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(2, "queues.testaddress", "queue7", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+ createQueue(4, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(2, "queues.testaddress", "queue12", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+ createQueue(4, "queues.testaddress", "queue14", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(1, "queues.testaddress", "queue15", null, false);
+ createQueue(2, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+ createQueue(4, "queues.testaddress", "queue15", null, false);
+
+ createQueue(2, "queues.testaddress", "queue16", null, false);
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+ createQueue(4, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+ createQueue(1, "queues.testaddress", "queue17", null, false);
+ createQueue(4, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+ createQueue(4, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue1", null);
+ addConsumer(2, 2, "queue2", null);
+ addConsumer(3, 3, "queue3", null);
+ addConsumer(4, 4, "queue4", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 2, "queue7", null);
+ addConsumer(8, 3, "queue8", null);
+ addConsumer(9, 4, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+ addConsumer(12, 2, "queue12", null);
+ addConsumer(13, 3, "queue13", null);
+ addConsumer(14, 4, "queue14", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(16, 1, "queue15", null);
+ addConsumer(17, 2, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+ addConsumer(19, 4, "queue15", null);
+
+ addConsumer(20, 2, "queue16", null);
+ addConsumer(21, 3, "queue16", null);
+ addConsumer(22, 4, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+ addConsumer(24, 1, "queue17", null);
+ addConsumer(25, 4, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+ addConsumer(27, 4, "queue18", null);
+
+ log.info("wait for bindings...");
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ log.info("send and receive messages");
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ removeConsumer(0);
+ removeConsumer(5);
+ removeConsumer(10);
+ removeConsumer(15);
+ removeConsumer(23);
+ removeConsumer(3);
+ removeConsumer(8);
+ removeConsumer(13);
+ removeConsumer(18);
+ removeConsumer(21);
+ removeConsumer(26);
+
+ closeSessionFactory(0);
+ closeSessionFactory(3);
+
+ log.info("stop servers");
+
+ stopServers(0, 3, 5, 8);
+
+ log.info("restart servers");
+
+ startServers(5, 8, 0, 3);
+
+ Thread.sleep(2000);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(3, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue3", null, false);
+
+ createQueue(0, "queues.testaddress", "queue5", null, false);
+ createQueue(3, "queues.testaddress", "queue8", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(3, "queues.testaddress", "queue13", null, false);
+
+ createQueue(0, "queues.testaddress", "queue15", null, false);
+ createQueue(3, "queues.testaddress", "queue15", null, false);
+
+ createQueue(3, "queues.testaddress", "queue16", null, false);
+
+ createQueue(0, "queues.testaddress", "queue17", null, false);
+
+ createQueue(3, "queues.testaddress", "queue18", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(3, 3, "queue3", null);
+
+ addConsumer(5, 0, "queue5", null);
+ addConsumer(8, 3, "queue8", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(13, 3, "queue13", null);
+
+ addConsumer(15, 0, "queue15", null);
+ addConsumer(18, 3, "queue15", null);
+
+ addConsumer(21, 3, "queue16", null);
+
+ addConsumer(23, 0, "queue17", null);
+
+ addConsumer(26, 3, "queue18", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(1, "queues.testaddress", 5, 5, true);
+ waitForBindings(2, "queues.testaddress", 5, 5, true);
+ waitForBindings(3, "queues.testaddress", 6, 6, true);
+ waitForBindings(4, "queues.testaddress", 7, 7, true);
+
+ waitForBindings(0, "queues.testaddress", 23, 23, false);
+ waitForBindings(1, "queues.testaddress", 23, 23, false);
+ waitForBindings(2, "queues.testaddress", 23, 23, false);
+ waitForBindings(3, "queues.testaddress", 22, 22, false);
+ waitForBindings(4, "queues.testaddress", 21, 21, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+ }
+
+ @Override
+ protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
+ {
+ // The lives
+ setupClusterConnectionWithBackups("cluster0",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 0,
+ new int[] { 1, 2, 3, 4 },
+ new int[] { 6, 7, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster1",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 1,
+ new int[] { 0, 2, 3, 4 },
+ new int[] { 5, 7, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster2",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 2,
+ new int[] { 0, 1, 3, 4 },
+ new int[] { 5, 6, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster3",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 3,
+ new int[] { 0, 1, 2, 4 },
+ new int[] { 5, 6, 7, 9 });
+
+ setupClusterConnectionWithBackups("cluster4",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 4,
+ new int[] { 0, 1, 2, 3 },
+ new int[] { 5, 6, 7, 8 });
+
+ // The backups
+
+ setupClusterConnectionWithBackups("cluster0",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 5,
+ new int[] { 1, 2, 3, 4 },
+ new int[] { 6, 7, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster1",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 6,
+ new int[] { 0, 2, 3, 4 },
+ new int[] { 5, 7, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster2",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 7,
+ new int[] { 0, 1, 3, 4 },
+ new int[] { 5, 6, 8, 9 });
+
+ setupClusterConnectionWithBackups("cluster3",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 8,
+ new int[] { 0, 1, 2, 4 },
+ new int[] { 5, 6, 7, 9 });
+
+ setupClusterConnectionWithBackups("cluster4",
+ "queues",
+ forwardWhenNoConsumers,
+ 1,
+ isNetty(),
+ 9,
+ new int[] { 0, 1, 2, 3 },
+ new int[] { 5, 6, 7, 8 });
+ }
+
+ @Override
+ protected void setupServers() throws Exception
+ {
+ // The backups
+ setupServer(5, isFileStorage(), isNetty(), true);
+ setupServer(6, isFileStorage(), isNetty(), true);
+ setupServer(7, isFileStorage(), isNetty(), true);
+ setupServer(8, isFileStorage(), isNetty(), true);
+ setupServer(9, isFileStorage(), isNetty(), true);
+
+ // The lives
+ setupServer(0, isFileStorage(), isNetty(), 5);
+ setupServer(1, isFileStorage(), isNetty(), 6);
+ setupServer(2, isFileStorage(), isNetty(), 7);
+ setupServer(3, isFileStorage(), isNetty(), 8);
+ setupServer(4, isFileStorage(), isNetty(), 9);
+ }
+
+ @Override
+ protected void startServers() throws Exception
+ {
+ // Need to set backup, since when restarting backup after it has failed over, backup will have been set to false
+
+ getServer(5).getConfiguration().setBackup(true);
+ getServer(6).getConfiguration().setBackup(true);
+ getServer(7).getConfiguration().setBackup(true);
+ getServer(8).getConfiguration().setBackup(true);
+ getServer(9).getConfiguration().setBackup(true);
+
+ startServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
+ }
+
+ @Override
+ protected void stopServers() throws Exception
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ }
+
+}
15 years, 1 month