JBoss hornetq SVN: r8261 - in trunk: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-11 09:12:14 -0500 (Wed, 11 Nov 2009)
New Revision: 8261
Modified:
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
reversed andy's patch
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-11 14:08:48 UTC (rev 8260)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-11 14:12:14 UTC (rev 8261)
@@ -676,9 +676,8 @@
distance + 1);
bindings.put(clusterName, binding);
- log.info(clusterName + " binding " + binding + " to " + server.getNodeID() + " distance = " + distance + server.getConfiguration().isBackup());
- Binding b = postOffice.getBinding(clusterName);
- if (b != null)
+
+ if (postOffice.getBinding(clusterName) != null)
{
// Sanity check - this means the binding has already been added via another bridge, probably max
// hops is too high
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-11 14:08:48 UTC (rev 8260)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-11 14:12:14 UTC (rev 8261)
@@ -84,9 +84,7 @@
protected void setUp() throws Exception
{
super.setUp();
- consumers = new ConsumerHolder[MAX_CONSUMERS];
- servers = new HornetQServer[MAX_SERVERS];
- sfs = new ClientSessionFactory[MAX_SERVERS];
+
checkFreePort(PORTS);
clearData();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-11 14:08:48 UTC (rev 8260)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-11 14:12:14 UTC (rev 8261)
@@ -30,18 +30,6 @@
{
private static final Logger log = Logger.getLogger(GroupingFailoverReplicationTest.class);
- public void test() throws Exception
- {
- int count = 0;
- while (true)
- {
- log.info("**** ITERATION " + count++);
- testGroupingLocalHandlerFails();
- tearDown();
- setUp();
- }
- }
-
protected void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int backupNode)
{
if (servers[node] != null)
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-11-11 14:08:48 UTC (rev 8260)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-11-11 14:12:14 UTC (rev 8261)
@@ -25,7 +25,6 @@
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.logging.Logger;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
import org.hornetq.utils.SimpleString;
@@ -35,10 +34,8 @@
*/
public abstract class GroupingFailoverTestBase extends ClusterTestBase
{
- private static final Logger log = Logger.getLogger(GroupingFailoverTestBase.class);
public void testGroupingLocalHandlerFails() throws Exception
{
- log.info("***********************************start******************************************");
setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
setupMasterServer(0, isFileStorage(), isNetty());
@@ -46,17 +43,16 @@
setupServer(1, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
- setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
- //setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[]{0}, new int[]{2});
+ setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[]{0}, new int[]{2});
- //setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 1);
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- //setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
startServers(2, 0, 1);
@@ -71,8 +67,6 @@
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
- waitForBindings(0, "queues.testaddress", 1, 0, false);
- waitForBindings(1, "queues.testaddress", 1, 0, false);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
@@ -80,7 +74,7 @@
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
- /*sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAll(10, 0);
@@ -130,8 +124,8 @@
sendWithProperty(1, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAll(10, 2);
-*/
+ System.out.println("*****************************************************************************");
}
finally
{
@@ -139,9 +133,8 @@
closeAllSessionFactories();
- stopServers(2, 0, 1);
+ stopServers(0, 1, 2);
}
- log.info("***********************************end******************************************");
}
public void testGroupingLocalHandlerFailsMultipleGroups() throws Exception
15 years, 2 months
JBoss hornetq SVN: r8260 - in trunk: src/main/org/hornetq/core/journal/impl and 7 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-11 09:08:48 -0500 (Wed, 11 Nov 2009)
New Revision: 8260
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java
Log:
fixed failuredeadlocktest
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -482,7 +482,9 @@
}
private void failoverOrReconnect(final Object connectionID, final HornetQException me)
- {
+ {
+ Set<ClientSessionInternal> sessionsToClose = null;
+
synchronized (failoverLock)
{
if (connection == null || connection.getID() != connectionID)
@@ -622,26 +624,32 @@
connection = null;
}
+
+ callFailureListeners(me, true);
if (connection == null)
{
- // If connection is null it means we didn't succeed in failing over or reconnecting
- // so we close all the sessions, so they will throw exceptions when attempted to be used
-
- for (ClientSessionInternal session: new HashSet<ClientSessionInternal>(sessions))
+ sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+ }
+ }
+
+ //This needs to be outside the failover lock to prevent deadlock
+ if (sessionsToClose != null)
+ {
+ // If connection is null it means we didn't succeed in failing over or reconnecting
+ // so we close all the sessions, so they will throw exceptions when attempted to be used
+
+ for (ClientSessionInternal session: sessionsToClose)
+ {
+ try
{
- try
- {
- session.cleanUp();
- }
- catch (Exception e)
- {
- log.error("Failed to cleanup session");
- }
+ session.cleanUp();
}
+ catch (Exception e)
+ {
+ log.error("Failed to cleanup session");
+ }
}
-
- callFailureListeners(me, true);
}
}
@@ -1004,9 +1012,10 @@
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
- if (connection != null && connectionID == connection.getID())
+ RemotingConnection theConn = connection;
+ if (theConn != null && connectionID == theConn.getID())
{
- connection.bufferReceived(connectionID, buffer);
+ theConn.bufferReceived(connectionID, buffer);
}
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -452,9 +452,8 @@
}
}
- class LocalBufferObserver implements TimedBufferObserver
+ private class LocalBufferObserver implements TimedBufferObserver
{
-
public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
{
buffer.flip();
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -168,6 +168,13 @@
pageSize = addressSettings.getPageSizeBytes();
this.addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
+
+ if (addressFullMessagePolicy == AddressFullMessagePolicy.PAGE && maxSize != -1 && pageSize >= maxSize)
+ {
+ throw new IllegalStateException("pageSize for address " + address +
+ " >= maxSize. Normally pageSize should" +
+ " be significantly smaller than maxSize, ms: " + maxSize + " ps " + pageSize);
+ }
this.executor = executor;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -467,7 +467,8 @@
}
String uid = UUIDGenerator.getInstance().generateStringUUID();
-
+ // log.info("sending binding" + binding +" added " + binding.getClusterName() + " binding.getDistance() = " + binding.getDistance() + " " + server.getConfiguration().isBackup());
+ //Thread.dumpStack();
managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -676,8 +676,9 @@
distance + 1);
bindings.put(clusterName, binding);
-
- if (postOffice.getBinding(clusterName) != null)
+ log.info(clusterName + " binding " + binding + " to " + server.getNodeID() + " distance = " + distance + server.getConfiguration().isBackup());
+ Binding b = postOffice.getBinding(clusterName);
+ if (b != null)
{
// Sanity check - this means the binding has already been added via another bridge, probably max
// hops is too high
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -645,7 +645,7 @@
settings.put(ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 10 * 1024, 10 * 1024, settings);
+ HornetQServer server = createServer(true, config, 1024, 10 * 1024, settings);
server.start();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -84,7 +84,9 @@
protected void setUp() throws Exception
{
super.setUp();
-
+ consumers = new ConsumerHolder[MAX_CONSUMERS];
+ servers = new HornetQServer[MAX_SERVERS];
+ sfs = new ClientSessionFactory[MAX_SERVERS];
checkFreePort(PORTS);
clearData();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -30,7 +30,18 @@
{
private static final Logger log = Logger.getLogger(GroupingFailoverReplicationTest.class);
-
+ public void test() throws Exception
+ {
+ int count = 0;
+ while (true)
+ {
+ log.info("**** ITERATION " + count++);
+ testGroupingLocalHandlerFails();
+ tearDown();
+ setUp();
+ }
+ }
+
protected void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int backupNode)
{
if (servers[node] != null)
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -25,6 +25,7 @@
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.core.logging.Logger;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
import org.hornetq.utils.SimpleString;
@@ -34,8 +35,10 @@
*/
public abstract class GroupingFailoverTestBase extends ClusterTestBase
{
+ private static final Logger log = Logger.getLogger(GroupingFailoverTestBase.class);
public void testGroupingLocalHandlerFails() throws Exception
{
+ log.info("***********************************start******************************************");
setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
setupMasterServer(0, isFileStorage(), isNetty());
@@ -43,16 +46,17 @@
setupServer(1, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
- setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[]{0}, new int[]{2});
+ //setupClusterConnectionWithBackups("cluster1", "queues", false, 1, isNetty(), 1, new int[]{0}, new int[]{2});
- setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 1);
+ //setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
- setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
+ //setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 2);
startServers(2, 0, 1);
@@ -67,6 +71,8 @@
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+ waitForBindings(1, "queues.testaddress", 1, 0, false);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
@@ -74,7 +80,7 @@
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
- sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ /*sendWithProperty(0, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAll(10, 0);
@@ -124,8 +130,8 @@
sendWithProperty(1, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAll(10, 2);
+*/
- System.out.println("*****************************************************************************");
}
finally
{
@@ -133,8 +139,9 @@
closeAllSessionFactories();
- stopServers(0, 1, 2);
+ stopServers(2, 0, 1);
}
+ log.info("***********************************end******************************************");
}
public void testGroupingLocalHandlerFailsMultipleGroups() throws Exception
Modified: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java 2009-11-11 13:46:28 UTC (rev 8259)
+++ trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java 2009-11-11 14:08:48 UTC (rev 8260)
@@ -108,8 +108,7 @@
timedBuffer.checkSize(10);
timedBuffer.addBytes(bytes, false, dummyCallback);
}
-
-
+
assertEquals(1, flushTimes.get());
ByteBuffer flushedBuffer = buffers.get(0);
15 years, 2 months
JBoss hornetq SVN: r8259 - trunk/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-11 08:46:28 -0500 (Wed, 11 Nov 2009)
New Revision: 8259
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
Log:
test fix
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java 2009-11-11 09:37:55 UTC (rev 8258)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java 2009-11-11 13:46:28 UTC (rev 8259)
@@ -69,7 +69,7 @@
sessionThree.start();
waitForBindings(3, "test.SomeAddress", 1, 1, true);
-
+ waitForBindings(1, "test.SomeAddress", 1, 1, false);
try
{
ClientProducer producer = sessionOne.createProducer(ADDRESS);
@@ -94,11 +94,8 @@
int received = (Integer)msg.getObjectProperty(new SimpleString("key"));
- if (i != received)
- {
- // Shouldn't this be a failure?
- System.out.println(i + "!=" + received);
- }
+ assertEquals(i, received);
+
msg.acknowledge();
}
@@ -110,6 +107,7 @@
// Redistribution may loose messages between the nodes.
Thread.sleep(500);
+
fail(sessionThree);
// sessionThree.close();
@@ -132,11 +130,8 @@
int received = (Integer)msg.getObjectProperty(new SimpleString("key"));
- if (i != received)
- {
- // Shouldn't this be a failure?
- System.out.println(i + "!=" + received);
- }
+ assertEquals(i, received);
+
msg.acknowledge();
}
15 years, 2 months
JBoss hornetq SVN: r8258 - in trunk/examples/jms/bridge: server1 and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-11 04:37:55 -0500 (Wed, 11 Nov 2009)
New Revision: 8258
Modified:
trunk/examples/jms/bridge/server0/hornetq-configuration.xml
trunk/examples/jms/bridge/server1/hornetq-configuration.xml
trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java
Log:
put bridge example back like it was
Modified: trunk/examples/jms/bridge/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server0/hornetq-configuration.xml 2009-11-10 22:54:21 UTC (rev 8257)
+++ trunk/examples/jms/bridge/server0/hornetq-configuration.xml 2009-11-11 09:37:55 UTC (rev 8258)
@@ -41,24 +41,11 @@
<forwarding-address>jms.queue.mincing-machine</forwarding-address>
<filter string="name='aardvark'"/>
<transformer-class-name>org.hornetq.jms.example.HatColourChangeTransformer</transformer-class-name>
- <reconnect-attempts>-1</reconnect-attempts>
-
- <confirmation-window-size>500000</confirmation-window-size>
-
+ <reconnect-attempts>-1</reconnect-attempts>
<connector-ref connector-name="remote-connector"/>
</bridge>
</bridges>
-
- <address-settings>
- <address-setting match="jms.queue.sausage-factory">
- <max-size-bytes>10000000</max-size-bytes>
- <page-size-bytes>1000000</page-size-bytes>
- <address-full-policy>PAGE</address-full-policy>
- </address-setting>
- </address-settings>
-
-
<!-- Other config -->
<security-settings>
Modified: trunk/examples/jms/bridge/server1/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server1/hornetq-configuration.xml 2009-11-10 22:54:21 UTC (rev 8257)
+++ trunk/examples/jms/bridge/server1/hornetq-configuration.xml 2009-11-11 09:37:55 UTC (rev 8258)
@@ -22,15 +22,6 @@
</acceptors>
<!-- Other config -->
-
- <address-settings>
-
- <address-setting match="jms.queue.sausage-factory">
- <max-size-bytes>10000</max-size-bytes>
- <page-size-bytes>1000</page-size-bytes>
- <address-full-policy>PAGE</address-full-policy>
- </address-setting>
- </address-settings>
<security-settings>
<!--security for example queue-->
Modified: trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java
===================================================================
--- trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java 2009-11-10 22:54:21 UTC (rev 8257)
+++ trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java 2009-11-11 09:37:55 UTC (rev 8258)
@@ -12,7 +12,6 @@
*/
package org.hornetq.jms.example;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
@@ -50,7 +49,7 @@
try
{
- // Step 1 - we create an initial context for looking up JNDI on node 0
+ // Step 1 - we create an initial context for looking up JNDI on node 0
ic0 = getContext(0);
@@ -99,61 +98,42 @@
// Step 12. We create a JMS MessageProducer object on server 0
MessageProducer producer = session0.createProducer(sausageFactory);
- final int numMessages = 100000;
-
- byte[] bytes = new byte[1000];
+ // Step 13. We create and send a message representing an aardvark with a green hat to the sausage-factory
+ // on node 0
+ Message message = session0.createMessage();
- for (int i = 0; i < numMessages; i++)
- {
+ message.setStringProperty("name", "aardvark");
- // Step 13. We create and send a message representing an aardvark with a green hat to the sausage-factory
- // on node 0
- BytesMessage message = session0.createBytesMessage();
+ message.setStringProperty("hat", "green");
- message.setStringProperty("name", "aardvark");
+ producer.send(message);
- message.setStringProperty("hat", "green");
-
- message.writeBytes(bytes);
+ System.out.println("Sent " + message.getStringProperty("name") +
+ " message with " +
+ message.getStringProperty("hat") +
+ " hat to sausage-factory on node 0");
- producer.send(message);
-
- if (i % 1000 == 0)
- {
- System.out.println("Sent " + i);
- }
- }
-
// Step 14 - we successfully receive the aardvark message from the mincing-machine one node 1. The aardvark's
// hat is now blue since it has been transformed!
- for (int i = 0; i < numMessages; i++)
- {
- Message receivedMessage = consumer.receive(5000);
-
- if (receivedMessage == null)
- {
- throw new IllegalStateException("Did not receive message");
- }
+ Message receivedMessage = consumer.receive(5000);
- if (i % 1000 == 0)
- {
- System.out.println("Received " + i);
- }
+ System.out.println("Received " + receivedMessage.getStringProperty("name") +
+ " message with " +
+ receivedMessage.getStringProperty("hat") +
+ " hat from mincing-machine on node 1");
- // Step 13. We create and send another message, this time representing a sasquatch with a mauve hat to the
- // sausage-factory on node 0. This won't be bridged to the mincing-machine since we only want aardvarks, not
- // sasquatches
- }
-
- Message message = session0.createMessage();
+ // Step 13. We create and send another message, this time representing a sasquatch with a mauve hat to the
+ // sausage-factory on node 0. This won't be bridged to the mincing-machine since we only want aardvarks, not sasquatches
+ message = session0.createMessage();
+
message.setStringProperty("name", "sasquatch");
- message.setStringProperty("hat", "mauve");
+ message.setStringProperty("hat", "mauve");
producer.send(message);
-
+
System.out.println("Sent " + message.getStringProperty("name") +
" message with " +
message.getStringProperty("hat") +
@@ -161,7 +141,7 @@
// Step 14. We don't receive the message since it has not been bridged.
- Message receivedMessage = (TextMessage)consumer.receive(1000);
+ receivedMessage = (TextMessage)consumer.receive(1000);
if (receivedMessage == null)
{
15 years, 2 months
JBoss hornetq SVN: r8257 - in trunk: tests/src/org/hornetq/tests/integration/client and 4 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-10 17:54:21 -0500 (Tue, 10 Nov 2009)
New Revision: 8257
Added:
trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-173
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -482,7 +482,7 @@
}
private void failoverOrReconnect(final Object connectionID, final HornetQException me)
- {
+ {
synchronized (failoverLock)
{
if (connection == null || connection.getID() != connectionID)
@@ -620,9 +620,27 @@
{
connection.destroy();
- connection = null;
- }
+ connection = null;
+ }
+ if (connection == null)
+ {
+ // If connection is null it means we didn't succeed in failing over or reconnecting
+ // so we close all the sessions, so they will throw exceptions when attempted to be used
+
+ for (ClientSessionInternal session: new HashSet<ClientSessionInternal>(sessions))
+ {
+ try
+ {
+ session.cleanUp();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to cleanup session");
+ }
+ }
+ }
+
callFailureListeners(me, true);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -166,6 +166,8 @@
session.createQueue(address, queue, false);
}
+
+
private ClientSessionFactory sf;
Added: trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A SessionClosedOnRemotingConnectionFailureTest
+ *
+ * @author Tim Fox
+
+ */
+public class SessionClosedOnRemotingConnectionFailureTest extends UnitTestCase
+{
+ private HornetQServer server;
+
+ private ClientSessionFactory sf;
+
+ public void testSessionClosedOnRemotingConnectionFailure() throws Exception
+ {
+ ClientSession session = null;
+
+ try
+ {
+ session = sf.createSession();
+
+ session.createQueue("fooaddress", "fooqueue");
+
+ ClientProducer prod = session.createProducer("fooaddress");
+
+ ClientConsumer cons = session.createConsumer("fooqueue");
+
+ session.start();
+
+ prod.send(session.createClientMessage(false));
+
+ assertNotNull(cons.receive());
+
+ // Now fail the underlying connection
+
+ RemotingConnection connection = ((ClientSessionInternal)session).getConnection();
+
+ connection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ assertTrue(session.isClosed());
+
+ assertTrue(prod.isClosed());
+
+ assertTrue(cons.isClosed());
+
+ //Now try and use the producer
+
+ try
+ {
+ prod.send(session.createClientMessage(false));
+
+ fail("Should throw exception");
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ cons.receive();
+
+ fail("Should throw exception");
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration config = new ConfigurationImpl();
+ config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+ config.setSecurityEnabled(false);
+ server = HornetQ.newHornetQServer(config, false);
+
+ server.start();
+
+ sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (sf != null)
+ {
+ sf.close();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+
+ sf = null;
+
+ server = null;
+
+ super.tearDown();
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -411,13 +411,19 @@
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- session.start();
+ //Should throw exception since didn't reconnect
+
+ try
+ {
+ session.start();
+
+ fail("Should throw exception");
+ }
+ catch (HornetQException e)
+ {
+ assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
- // Should be null since failed to reconnect
- ClientMessage message = consumer.receive(500);
-
- assertNull(message);
-
session.close();
sf.close();
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -0,0 +1,195 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
+import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.JMSTestBase;
+import org.hornetq.utils.Pair;
+
+/**
+ *
+ * A SessionClosedOnRemotingConnectionFailureTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class SessionClosedOnRemotingConnectionFailureTest extends JMSTestBase
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(SessionClosedOnRemotingConnectionFailureTest.class);
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testSessionClosedOnRemotingConnectionFailure() throws Exception
+ {
+ List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
+ connectorConfigs.add(new Pair<TransportConfiguration, TransportConfiguration>(new TransportConfiguration(NettyConnectorFactory.class.getName()),
+ null));
+
+ List<String> jndiBindings = new ArrayList<String>();
+ jndiBindings.add("/cffoo");
+
+ jmsServer.createConnectionFactory("cffoo",
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ DEFAULT_PRE_ACKNOWLEDGE,
+ DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ DEFAULT_RETRY_INTERVAL,
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ DEFAULT_MAX_RETRY_INTERVAL,
+ 0,
+ false,
+ jndiBindings);
+
+
+ cf = (ConnectionFactory)context.lookup("/cffoo");
+
+ Connection conn = cf.createConnection();
+
+ Queue queue = createQueue("testQueue");
+
+ try
+ {
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = session.createProducer(queue);
+
+ MessageConsumer cons = session.createConsumer(queue);
+
+ conn.start();
+
+ prod.send(session.createMessage());
+
+ assertNotNull(cons.receive());
+
+ // Now fail the underlying connection
+
+ RemotingConnection connection = ((ClientSessionInternal)((HornetQSession)session).getCoreSession()).getConnection();
+
+ connection.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ // Now try and use the producer
+
+ try
+ {
+ prod.send(session.createMessage());
+
+ fail("Should throw exception");
+ }
+ catch (JMSException e)
+ {
+ // assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
+
+ try
+ {
+ cons.receive();
+
+ fail("Should throw exception");
+ }
+ catch (JMSException e)
+ {
+ // assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ }
+
+ session.close();
+
+ conn.close();
+ }
+ finally
+ {
+ try
+ {
+ conn.close();
+ }
+ catch (Throwable igonred)
+ {
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -251,7 +251,7 @@
public void testGetScheduledCount() throws Exception
{
- long delay = 2000;
+ long delay = 500;
SimpleString address = randomSimpleString();
SimpleString queue = randomSimpleString();
@@ -268,7 +268,7 @@
assertEquals(1, queueControl.getScheduledCount());
consumeMessages(0, session, queue);
- Thread.sleep(delay);
+ Thread.sleep(delay * 2);
assertEquals(0, queueControl.getScheduledCount());
consumeMessages(1, session, queue);
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-11-10 20:03:11 UTC (rev 8256)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-11-10 22:54:21 UTC (rev 8257)
@@ -63,7 +63,7 @@
protected HornetQServer server;
protected JMSServerManagerImpl jmsServer;
-
+
protected ConnectionFactory cf;
protected InVMContext context;
@@ -96,7 +96,7 @@
{
return false;
}
-
+
/**
* @throws Exception
* @throws NamingException
@@ -104,10 +104,9 @@
protected Queue createQueue(String name) throws Exception, NamingException
{
jmsServer.createQueue(name, "/jms/" + name, null, true);
-
+
return (Queue)context.lookup("/jms/" + name);
}
-
@Override
protected void setUp() throws Exception
@@ -178,9 +177,9 @@
jndiBindings.add("/cf");
createCF(connectorConfigs, jndiBindings);
-
+
cf = (ConnectionFactory)context.lookup("/cf");
-
+
}
/**
@@ -189,7 +188,7 @@
* @throws Exception
*/
protected void createCF(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
- List<String> jndiBindings) throws Exception
+ List<String> jndiBindings) throws Exception
{
int retryInterval = 1000;
double retryIntervalMultiplier = 1.0;
@@ -198,34 +197,34 @@
int callTimeout = 30000;
jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
- connectorConfigs,
- null,
- DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
- DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- DEFAULT_CONSUMER_WINDOW_SIZE,
- DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_CONFIRMATION_WINDOW_SIZE,
- DEFAULT_PRODUCER_MAX_RATE,
- DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- DEFAULT_BLOCK_ON_PERSISTENT_SEND,
- DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
- DEFAULT_AUTO_GROUP,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_USE_GLOBAL_POOLS,
- DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
- retryInterval,
- retryIntervalMultiplier,
- DEFAULT_MAX_RETRY_INTERVAL,
- reconnectAttempts,
- failoverOnServerShutdown,
- jndiBindings);
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ callTimeout,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ DEFAULT_PRE_ACKNOWLEDGE,
+ DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ retryInterval,
+ retryIntervalMultiplier,
+ DEFAULT_MAX_RETRY_INTERVAL,
+ reconnectAttempts,
+ failoverOnServerShutdown,
+ jndiBindings);
}
}
15 years, 2 months
JBoss hornetq SVN: r8256 - in trunk: examples/jms/bridge/server0 and 17 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-10 15:03:11 -0500 (Tue, 10 Nov 2009)
New Revision: 8256
Modified:
trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java
trunk/examples/jms/bridge/server0/hornetq-configuration.xml
trunk/examples/jms/bridge/server1/hornetq-configuration.xml
trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java
trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java
trunk/examples/jms/large-message/build.xml
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/utils/XMLUtil.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
mainly https://jira.jboss.org/jira/browse/HORNETQ-182
Modified: trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java
===================================================================
--- trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/common/src/org/hornetq/common/example/HornetQExample.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -178,7 +178,10 @@
{
for (Process server : servers)
{
- stopServer(server);
+ if (server != null)
+ {
+ stopServer(server);
+ }
}
}
Modified: trunk/examples/jms/bridge/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server0/hornetq-configuration.xml 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/bridge/server0/hornetq-configuration.xml 2009-11-10 20:03:11 UTC (rev 8256)
@@ -42,10 +42,23 @@
<filter string="name='aardvark'"/>
<transformer-class-name>org.hornetq.jms.example.HatColourChangeTransformer</transformer-class-name>
<reconnect-attempts>-1</reconnect-attempts>
+
+ <confirmation-window-size>500000</confirmation-window-size>
+
<connector-ref connector-name="remote-connector"/>
</bridge>
</bridges>
+
+
+ <address-settings>
+ <address-setting match="jms.queue.sausage-factory">
+ <max-size-bytes>10000000</max-size-bytes>
+ <page-size-bytes>1000000</page-size-bytes>
+ <address-full-policy>PAGE</address-full-policy>
+ </address-setting>
+ </address-settings>
+
<!-- Other config -->
<security-settings>
Modified: trunk/examples/jms/bridge/server1/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server1/hornetq-configuration.xml 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/bridge/server1/hornetq-configuration.xml 2009-11-10 20:03:11 UTC (rev 8256)
@@ -23,6 +23,15 @@
<!-- Other config -->
+ <address-settings>
+
+ <address-setting match="jms.queue.sausage-factory">
+ <max-size-bytes>10000</max-size-bytes>
+ <page-size-bytes>1000</page-size-bytes>
+ <address-full-policy>PAGE</address-full-policy>
+ </address-setting>
+ </address-settings>
+
<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.#">
Modified: trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java
===================================================================
--- trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/bridge/src/org/hornetq/jms/example/BridgeExample.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -12,6 +12,7 @@
*/
package org.hornetq.jms.example;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
@@ -98,42 +99,61 @@
// Step 12. We create a JMS MessageProducer object on server 0
MessageProducer producer = session0.createProducer(sausageFactory);
- // Step 13. We create and send a message representing an aardvark with a green hat to the sausage-factory
- // on node 0
- Message message = session0.createMessage();
+ final int numMessages = 100000;
+
+ byte[] bytes = new byte[1000];
- message.setStringProperty("name", "aardvark");
+ for (int i = 0; i < numMessages; i++)
+ {
- message.setStringProperty("hat", "green");
+ // Step 13. We create and send a message representing an aardvark with a green hat to the sausage-factory
+ // on node 0
+ BytesMessage message = session0.createBytesMessage();
- producer.send(message);
+ message.setStringProperty("name", "aardvark");
- System.out.println("Sent " + message.getStringProperty("name") +
- " message with " +
- message.getStringProperty("hat") +
- " hat to sausage-factory on node 0");
+ message.setStringProperty("hat", "green");
+
+ message.writeBytes(bytes);
+ producer.send(message);
+
+ if (i % 1000 == 0)
+ {
+ System.out.println("Sent " + i);
+ }
+ }
+
// Step 14 - we successfully receive the aardvark message from the mincing-machine one node 1. The aardvark's
// hat is now blue since it has been transformed!
- Message receivedMessage = consumer.receive(5000);
+ for (int i = 0; i < numMessages; i++)
+ {
+ Message receivedMessage = consumer.receive(5000);
+
+ if (receivedMessage == null)
+ {
+ throw new IllegalStateException("Did not receive message");
+ }
- System.out.println("Received " + receivedMessage.getStringProperty("name") +
- " message with " +
- receivedMessage.getStringProperty("hat") +
- " hat from mincing-machine on node 1");
+ if (i % 1000 == 0)
+ {
+ System.out.println("Received " + i);
+ }
- // Step 13. We create and send another message, this time representing a sasquatch with a mauve hat to the
- // sausage-factory on node 0. This won't be bridged to the mincing-machine since we only want aardvarks, not sasquatches
+ // Step 13. We create and send another message, this time representing a sasquatch with a mauve hat to the
+ // sausage-factory on node 0. This won't be bridged to the mincing-machine since we only want aardvarks, not
+ // sasquatches
+ }
+
+ Message message = session0.createMessage();
- message = session0.createMessage();
-
message.setStringProperty("name", "sasquatch");
- message.setStringProperty("hat", "mauve");
+ message.setStringProperty("hat", "mauve");
producer.send(message);
-
+
System.out.println("Sent " + message.getStringProperty("name") +
" message with " +
message.getStringProperty("hat") +
@@ -141,7 +161,7 @@
// Step 14. We don't receive the message since it has not been bridged.
- receivedMessage = (TextMessage)consumer.receive(1000);
+ Message receivedMessage = (TextMessage)consumer.receive(1000);
if (receivedMessage == null)
{
Modified: trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java
===================================================================
--- trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/bridge/src/org/hornetq/jms/example/HatColourChangeTransformer.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -33,7 +33,7 @@
SimpleString oldProp = message.getSimpleStringProperty(propName);
- System.out.println("Old hat colour is " + oldProp);
+ //System.out.println("Old hat colour is " + oldProp);
//Change the colour
message.putStringProperty(propName, new SimpleString("blue"));
Modified: trunk/examples/jms/large-message/build.xml
===================================================================
--- trunk/examples/jms/large-message/build.xml 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/large-message/build.xml 2009-11-10 20:03:11 UTC (rev 8256)
@@ -23,7 +23,7 @@
<antcall target="runExample">
<param name="example.classname" value="org.hornetq.jms.example.LargeMessageExample"/>
- <!-- We limit the client to running in only 50MB of RAM -->
+ <!-- We limit the client to running in only 50MB of RAM -->
<param name="java-min-memory" value="50M"/>
<param name="java-max-memory" value="50M"/>
</antcall>
@@ -36,7 +36,7 @@
</antcall>
</target>
- <target name="delete-large-messages">
+ <target name="delete-large-messages" depends="clean">
<delete file="huge_message_to_send.dat"/>
<delete file="huge_message_received.dat"/>
</target>
Modified: trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -45,11 +45,13 @@
new LargeMessageExample().run(args);
}
- // The message we will send is size 256MB, even though we are only running in 50MB of RAM on both client and server.
- // HornetQ will support much larger message sizes, but we use 512MB so the example runs in reasonable time.
- // private final long FILE_SIZE = 256L * 1024 * 1024;
+ // The message we will send is size 10GiB, even though we are only running in 50MB of RAM on both client and server.
+ // This may take some considerable time to create, send and consume - if it takes too long or you don't have
+ // enough disk space just reduce the file size here
- private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 2 GiB message
+ // private final long FILE_SIZE = 256L * 1024 * 1024;
+
+ private final long FILE_SIZE = 10L * 1024 * 1024 * 1024; // 10 GiB message
public boolean runExample() throws Exception
{
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-10 20:03:11 UTC (rev 8256)
@@ -316,6 +316,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="use-duplicate-detection" type="xsd:boolean">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="confirmation-window-size" type="xsd:int">
+ </xsd:element>
<xsd:choice>
<xsd:element maxOccurs="1" minOccurs="1" name="connector-ref" type="connector-refType">
</xsd:element>
@@ -342,7 +344,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="max-hops" type="xsd:int">
</xsd:element>
-
+ <xsd:element maxOccurs="1" minOccurs="0" name="confirmation-window-size" type="xsd:int">
+ </xsd:element>
<xsd:choice>
<xsd:element maxOccurs="unbounded" minOccurs="1" name="connector-ref" type="connector-refType">
</xsd:element>
Modified: trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/cluster/BridgeConfiguration.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -53,6 +53,8 @@
private boolean failoverOnServerShutdown;
private boolean useDuplicateDetection;
+
+ private int confirmationWindowSize;
public BridgeConfiguration(final String name,
final String queueName,
@@ -64,6 +66,7 @@
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
+ final int confirmationWindowSize,
final Pair<String, String> connectorPair)
{
this.name = name;
@@ -76,6 +79,7 @@
this.reconnectAttempts = reconnectAttempts;
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.useDuplicateDetection = useDuplicateDetection;
+ this.confirmationWindowSize = confirmationWindowSize;
this.connectorPair = connectorPair;
this.discoveryGroupName = null;
}
@@ -90,6 +94,7 @@
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
+ final int confirmationWindowSize,
final String discoveryGroupName)
{
this.name = name;
@@ -102,6 +107,7 @@
this.reconnectAttempts = reconnectAttempts;
this.failoverOnServerShutdown = failoverOnServerShutdown;
this.useDuplicateDetection = useDuplicateDetection;
+ this.confirmationWindowSize = confirmationWindowSize;
this.connectorPair = null;
this.discoveryGroupName = discoveryGroupName;
}
@@ -165,6 +171,11 @@
{
return useDuplicateDetection;
}
+
+ public int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
/**
* @param name the name to set
@@ -261,4 +272,12 @@
{
this.useDuplicateDetection = useDuplicateDetection;
}
+
+ /**
+ * @param confirmationWindowSize the confirmationWindowSize to set
+ */
+ public void setConfirmationWindowSize(int confirmationWindowSize)
+ {
+ this.confirmationWindowSize = confirmationWindowSize;
+ }
}
Modified: trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/cluster/ClusterConnectionConfiguration.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -47,12 +47,15 @@
private final int maxHops;
+ private final int confirmationWindowSize;
+
public ClusterConnectionConfiguration(final String name,
final String address,
final long retryInterval,
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
+ final int confirmationWindowSize,
final List<Pair<String, String>> staticConnectorNamePairs)
{
this.name = name;
@@ -63,6 +66,7 @@
this.forwardWhenNoConsumers = forwardWhenNoConsumers;
this.discoveryGroupName = null;
this.maxHops = maxHops;
+ this.confirmationWindowSize = confirmationWindowSize;
}
public ClusterConnectionConfiguration(final String name,
@@ -71,6 +75,7 @@
final boolean duplicateDetection,
final boolean forwardWhenNoConsumers,
final int maxHops,
+ final int confirmationWindowSize,
final String discoveryGroupName)
{
this.name = name;
@@ -81,6 +86,7 @@
this.discoveryGroupName = discoveryGroupName;
this.staticConnectorNamePairs = null;
this.maxHops = maxHops;
+ this.confirmationWindowSize = confirmationWindowSize;
}
public String getName()
@@ -108,6 +114,11 @@
return maxHops;
}
+ public int getConfirmationWindowSize()
+ {
+ return confirmationWindowSize;
+ }
+
public List<Pair<String, String>> getStaticConnectorNamePairs()
{
return staticConnectorNamePairs;
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -154,7 +154,7 @@
public static final int DEFAULT_CLUSTER_MAX_HOPS = 1;
- public static final int DEFAULT_CLUSTER_RETRY_INTERVAL = 500;
+ public static final long DEFAULT_CLUSTER_RETRY_INTERVAL = 500;
public static final boolean DEFAULT_DIVERT_EXCLUSIVE = false;
@@ -166,7 +166,7 @@
public static final int DEFAULT_MEMORY_WARNING_THRESHOLD = 25;
- public static final long DEFAULT_MEMORY_MEASURE_INTERVAL = 3000; // in milliseconds
+ public static final long DEFAULT_MEMORY_MEASURE_INTERVAL = -1; // in milliseconds
public static final int DEFAULT_BACKUP_WINDOW_SIZE = 1024 * 1024;
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -36,7 +36,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
@@ -74,25 +73,28 @@
private static final String DEFAULT_CONFIGURATION_URL = "hornetq-configuration.xml";
private static final String CONFIGURATION_SCHEMA_URL = "schema/hornetq-configuration.xsd";
+
+ //For a bridge confirmations must be activated or send acknowledgements won't return
+
+ public static final int DEFAULT_CONFIRMATION_WINDOW_SIZE = 1024 * 1024;
// Static --------------------------------------------------------------------------
-
+
// Attributes ----------------------------------------------------------------------
private String configurationUrl = DEFAULT_CONFIGURATION_URL;
-
private boolean started;
-
+
// Public -------------------------------------------------------------------------
public synchronized void start() throws Exception
- {
+ {
if (started)
{
return;
}
-
+
URL url = getClass().getClassLoader().getResource(configurationUrl);
log.debug("Loading server configuration from " + url);
@@ -105,20 +107,22 @@
clustered = getBoolean(e, "clustered", clustered);
backup = getBoolean(e, "backup", backup);
-
+
sharedStore = getBoolean(e, "shared-store", sharedStore);
-
- //Defaults to true when using FileConfiguration
+
+ // Defaults to true when using FileConfiguration
fileDeploymentEnabled = getBoolean(e, "file-deployment-enabled", true);
-
+
persistenceEnabled = getBoolean(e, "persistence-enabled", persistenceEnabled);
- persistDeliveryCountBeforeDelivery = getBoolean(e, "persist-delivery-count-before-delivery", persistDeliveryCountBeforeDelivery);
-
+ persistDeliveryCountBeforeDelivery = getBoolean(e,
+ "persist-delivery-count-before-delivery",
+ persistDeliveryCountBeforeDelivery);
+
// NOTE! All the defaults come from the super class
scheduledThreadPoolMaxSize = getInteger(e, "scheduled-thread-pool-max-size", scheduledThreadPoolMaxSize, GT_ZERO);
-
+
threadPoolMaxSize = getInteger(e, "thread-pool-max-size", threadPoolMaxSize, MINUS_ONE_OR_GT_ZERO);
securityEnabled = getBoolean(e, "security-enabled", securityEnabled);
@@ -130,35 +134,53 @@
securityInvalidationInterval = getLong(e, "security-invalidation-interval", securityInvalidationInterval, GT_ZERO);
connectionTTLOverride = getLong(e, "connection-ttl-override", connectionTTLOverride, MINUS_ONE_OR_GT_ZERO);
-
- asyncConnectionExecutionEnabled = getBoolean(e, "async-connection-execution-enabled", asyncConnectionExecutionEnabled);
+ asyncConnectionExecutionEnabled = getBoolean(e,
+ "async-connection-execution-enabled",
+ asyncConnectionExecutionEnabled);
+
transactionTimeout = getLong(e, "transaction-timeout", transactionTimeout, GT_ZERO);
- transactionTimeoutScanPeriod = getLong(e, "transaction-timeout-scan-period", transactionTimeoutScanPeriod, GT_ZERO);
+ transactionTimeoutScanPeriod = getLong(e,
+ "transaction-timeout-scan-period",
+ transactionTimeoutScanPeriod,
+ GT_ZERO);
messageExpiryScanPeriod = getLong(e, "message-expiry-scan-period", messageExpiryScanPeriod, GT_ZERO);
- messageExpiryThreadPriority = getInteger(e, "message-expiry-thread-priority", messageExpiryThreadPriority, THREAD_PRIORITY_RANGE);
+ messageExpiryThreadPriority = getInteger(e,
+ "message-expiry-thread-priority",
+ messageExpiryThreadPriority,
+ THREAD_PRIORITY_RANGE);
idCacheSize = getInteger(e, "id-cache-size", idCacheSize, GT_ZERO);
persistIDCache = getBoolean(e, "persist-id-cache", persistIDCache);
- managementAddress = new SimpleString(getString(e, "management-address", managementAddress.toString(), NOT_NULL_OR_EMPTY));
+ managementAddress = new SimpleString(getString(e,
+ "management-address",
+ managementAddress.toString(),
+ NOT_NULL_OR_EMPTY));
managementNotificationAddress = new SimpleString(getString(e,
"management-notification-address",
- managementNotificationAddress.toString(), NOT_NULL_OR_EMPTY));
+ managementNotificationAddress.toString(),
+ NOT_NULL_OR_EMPTY));
- managementClusterPassword = getString(e, "management-cluster-password", managementClusterPassword, NOT_NULL_OR_EMPTY);
+ managementClusterPassword = getString(e,
+ "management-cluster-password",
+ managementClusterPassword,
+ NOT_NULL_OR_EMPTY);
managementClusterUser = getString(e, "management-cluster-user", managementClusterUser, NOT_NULL_OR_EMPTY);
managementRequestTimeout = getLong(e, "management-request-timeout", managementRequestTimeout, GT_ZERO);
-
- logDelegateFactoryClassName = getString(e, "log-delegate-factory-class-name", logDelegateFactoryClassName, NOT_NULL_OR_EMPTY);
+ logDelegateFactoryClassName = getString(e,
+ "log-delegate-factory-class-name",
+ logDelegateFactoryClassName,
+ NOT_NULL_OR_EMPTY);
+
NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
ArrayList<String> interceptorList = new ArrayList<String>();
@@ -259,7 +281,7 @@
for (int i = 0; i < gaNodes.getLength(); i++)
{
- Element gaNode = (Element) gaNodes.item(i);
+ Element gaNode = (Element)gaNodes.item(i);
parseGroupingHandlerConfiguration(gaNode);
}
@@ -312,23 +334,23 @@
journalSyncNonTransactional = getBoolean(e, "journal-sync-non-transactional", journalSyncNonTransactional);
journalFileSize = getInteger(e, "journal-file-size", journalFileSize, GT_ZERO);
-
+
journalAIOFlushSync = getBoolean(e, "journal-aio-flush-on-sync", DEFAULT_JOURNAL_AIO_FLUSH_SYNC);
-
+
journalAIOBufferTimeout = getInteger(e, "journal-aio-buffer-timeout", DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT, GT_ZERO);
-
+
journalAIOBufferSize = getInteger(e, "journal-aio-buffer-size", DEFAULT_JOURNAL_AIO_BUFFER_SIZE, GT_ZERO);
journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles, GT_ZERO);
-
+
journalCompactMinFiles = getInteger(e, "journal-compact-min-files", journalCompactMinFiles, GE_ZERO);
journalCompactPercentage = getInteger(e, "journal-compact-percentage", journalCompactPercentage, PERCENTAGE);
journalMaxAIO = getInteger(e, "journal-max-aio", journalMaxAIO, GT_ZERO);
-
+
logJournalWriteRate = getBoolean(e, "log-journal-write-rate", DEFAULT_JOURNAL_LOG_WRITE_RATE);
-
+
journalPerfBlastPages = getInteger(e, "perf-blast-pages", DEFAULT_JOURNAL_PERF_BLAST_PAGES, MINUS_ONE_OR_GT_ZERO);
runSyncSpeedTest = getBoolean(e, "run-sync-speed-test", runSyncSpeedTest);
@@ -339,23 +361,28 @@
messageCounterSamplePeriod = getLong(e, "message-counter-sample-period", messageCounterSamplePeriod, GT_ZERO);
- messageCounterMaxDayHistory = getInteger(e, "message-counter-max-day-history", messageCounterMaxDayHistory, GT_ZERO);
-
- serverDumpInterval = getLong(e, "server-dump-interval", serverDumpInterval, MINUS_ONE_OR_GT_ZERO); // in milliseconds
+ messageCounterMaxDayHistory = getInteger(e,
+ "message-counter-max-day-history",
+ messageCounterMaxDayHistory,
+ GT_ZERO);
+ serverDumpInterval = getLong(e, "server-dump-interval", serverDumpInterval, MINUS_ONE_OR_GT_ZERO); // in
+ // milliseconds
+
memoryWarningThreshold = getInteger(e, "memory-warning-threshold", memoryWarningThreshold, PERCENTAGE);
-
- memoryMeasureInterval = getLong(e, "memory-measure-interval", memoryMeasureInterval, MINUS_ONE_OR_GT_ZERO); // in milliseconds
-
+
+ memoryMeasureInterval = getLong(e, "memory-measure-interval", memoryMeasureInterval, MINUS_ONE_OR_GT_ZERO); // in
+ // milliseconds
+
backupWindowSize = getInteger(e, "backup-window-size", DEFAULT_BACKUP_WINDOW_SIZE, MINUS_ONE_OR_GT_ZERO);
-
+
started = true;
}
-
+
public synchronized void stop() throws Exception
{
super.stop();
-
+
started = false;
}
@@ -386,7 +413,7 @@
for (int i = 0; i < paramsNodes.getLength(); i++)
{
Node paramNode = paramsNodes.item(i);
- NamedNodeMap attributes =paramNode.getAttributes();
+ NamedNodeMap attributes = paramNode.getAttributes();
Node nkey = attributes.getNamedItem("key");
@@ -405,7 +432,7 @@
String name = e.getAttribute("name");
String localAddress = getString(e, "local-bind-address", null, NO_CHECK);
-
+
int localBindPort = getInteger(e, "local-bind-port", -1, MINUS_ONE_OR_GT_ZERO);
String groupAddress = getString(e, "group-address", null, NOT_NULL_OR_EMPTY);
@@ -487,12 +514,16 @@
boolean duplicateDetection = getBoolean(e, "use-duplicate-detection", DEFAULT_CLUSTER_DUPLICATE_DETECTION);
- boolean forwardWhenNoConsumers = getBoolean(e, "forward-when-no-consumers", DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS);
+ boolean forwardWhenNoConsumers = getBoolean(e,
+ "forward-when-no-consumers",
+ DEFAULT_CLUSTER_FORWARD_WHEN_NO_CONSUMERS);
int maxHops = getInteger(e, "max-hops", DEFAULT_CLUSTER_MAX_HOPS, GE_ZERO);
- long retryInterval = getLong(e, "retry-interval", (long)DEFAULT_CLUSTER_RETRY_INTERVAL, GT_ZERO);
+ long retryInterval = getLong(e, "retry-interval", DEFAULT_CLUSTER_RETRY_INTERVAL, GT_ZERO);
+ int confirmationWindowSize = getInteger(e, "confirmation-window-size", DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
+
String discoveryGroupName = null;
List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
@@ -531,21 +562,23 @@
if (discoveryGroupName == null)
{
config = new ClusterConnectionConfiguration(name,
- address,
- retryInterval,
+ address,
+ retryInterval,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
+ confirmationWindowSize,
connectorPairs);
}
else
{
config = new ClusterConnectionConfiguration(name,
address,
- retryInterval,
+ retryInterval,
duplicateDetection,
forwardWhenNoConsumers,
maxHops,
+ confirmationWindowSize,
discoveryGroupName);
}
@@ -556,15 +589,15 @@
{
String name = node.getAttribute("name");
String type = getString(node, "type", null, NOT_NULL_OR_EMPTY);
- String address = getString(node, "address",null, NOT_NULL_OR_EMPTY);
+ String address = getString(node, "address", null, NOT_NULL_OR_EMPTY);
Integer timeout = getInteger(node, "timeout", GroupingHandlerConfiguration.DEFAULT_TIMEOUT, GT_ZERO);
groupingHandlerConfiguration = new GroupingHandlerConfiguration(new SimpleString(name),
- type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())? GroupingHandlerConfiguration.TYPE.LOCAL: GroupingHandlerConfiguration.TYPE.REMOTE,
- new SimpleString(address),
- timeout);
+ type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType()) ? GroupingHandlerConfiguration.TYPE.LOCAL
+ : GroupingHandlerConfiguration.TYPE.REMOTE,
+ new SimpleString(address),
+ timeout);
}
-
private void parseBridgeConfiguration(final Element brNode)
{
String name = brNode.getAttribute("name");
@@ -576,17 +609,28 @@
String transformerClassName = getString(brNode, "transformer-class-name", null, NO_CHECK);
long retryInterval = getLong(brNode, "retry-interval", DEFAULT_RETRY_INTERVAL, GT_ZERO);
-
- double retryIntervalMultiplier = getDouble(brNode, "retry-interval-multiplier", DEFAULT_RETRY_INTERVAL_MULTIPLIER, GT_ZERO);
- int reconnectAttempts = getInteger(brNode, "reconnect-attempts", DEFAULT_BRIDGE_RECONNECT_ATTEMPTS, MINUS_ONE_OR_GE_ZERO);
+ //Default bridge conf
+ int confirmationWindowSize = getInteger(brNode, "confirmation-window-size", DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
+
+ double retryIntervalMultiplier = getDouble(brNode,
+ "retry-interval-multiplier",
+ DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+ GT_ZERO);
- boolean failoverOnServerShutdown = getBoolean(brNode, "failover-on-server-shutdown", ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+ int reconnectAttempts = getInteger(brNode,
+ "reconnect-attempts",
+ DEFAULT_BRIDGE_RECONNECT_ATTEMPTS,
+ MINUS_ONE_OR_GE_ZERO);
+ boolean failoverOnServerShutdown = getBoolean(brNode,
+ "failover-on-server-shutdown",
+ ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN);
+
boolean useDuplicateDetection = getBoolean(brNode, "use-duplicate-detection", DEFAULT_BRIDGE_DUPLICATE_DETECTION);
String filterString = null;
-
+
Pair<String, String> connectorPair = null;
String discoveryGroupName = null;
@@ -636,6 +680,7 @@
reconnectAttempts,
failoverOnServerShutdown,
useDuplicateDetection,
+ confirmationWindowSize,
connectorPair);
}
else
@@ -650,6 +695,7 @@
reconnectAttempts,
failoverOnServerShutdown,
useDuplicateDetection,
+ confirmationWindowSize,
discoveryGroupName);
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -15,19 +15,17 @@
import static org.hornetq.utils.DataConstants.SIZE_INT;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.server.impl.ServerMessageImpl;
/**
@@ -222,9 +220,9 @@
}
@Override
- public synchronized int decrementRefCount(PagingStore pagingStore, MessageReference reference) throws Exception
+ public synchronized int decrementRefCount(MessageReference reference) throws Exception
{
- int currentRefCount = super.decrementRefCount(pagingStore, reference);
+ int currentRefCount = super.decrementRefCount(reference);
// We use <= as this could be used by load.
// because of a failure, no references were loaded, so we have 0... and we still need to delete the associated
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -245,7 +245,7 @@
{
throw new IllegalArgumentException("ID is null");
}
-
+
long id = props.getLongProperty(ManagementHelper.HDR_BINDING_ID);
SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -254,7 +254,7 @@
{
throw new IllegalArgumentException("Distance is null");
}
-
+
int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, id, distance);
@@ -271,7 +271,7 @@
{
throw new IllegalStateException("No cluster name");
}
-
+
SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
QueueInfo info = queueInfos.remove(clusterName);
@@ -291,7 +291,7 @@
{
throw new IllegalStateException("No cluster name");
}
-
+
SimpleString clusterName = props.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
SimpleString filterString = props.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
@@ -606,7 +606,7 @@
cache.addToCache(duplicateIDBytes, context.getTransaction());
}
-
+
setPagingStore(message);
if (context.getTransaction() == null)
@@ -679,7 +679,7 @@
public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
{
setPagingStore(message);
-
+
MessageReference reference = message.createReference(queue);
if (message.containsProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME))
@@ -690,10 +690,8 @@
message.incrementDurableRefCount();
- PagingStore store = pagingManager.getPageStore(message.getDestination());
+ message.incrementRefCount(reference);
- message.incrementRefCount(store, reference);
-
if (tx == null)
{
queue.addLast(reference);
@@ -843,7 +841,7 @@
{
PagingStore store = pagingManager.getPageStore(message.getDestination());
- message.setPagingStore(store);
+ message.setPagingStore(store);
}
private void routeDirect(final ServerMessage message, final Queue queue, final boolean applyFilters) throws Exception
@@ -864,8 +862,6 @@
Transaction tx = context.getTransaction();
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
for (Queue queue : context.getQueues())
{
MessageReference reference = message.createReference(queue);
@@ -918,7 +914,7 @@
}
}
- message.incrementRefCount(store, reference);
+ message.incrementRefCount(reference);
}
if (tx != null)
@@ -1255,9 +1251,7 @@
message.decrementDurableRefCount();
}
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- message.decrementRefCount(store, ref);
+ message.decrementRefCount(ref);
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -31,18 +31,16 @@
MessageReference createReference(Queue queue);
- int incrementRefCount(PagingStore pagingStore, MessageReference reference)
- throws Exception;
+ int incrementRefCount(MessageReference reference) throws Exception;
- int decrementRefCount(PagingStore pagingStore, MessageReference reference)
- throws Exception;
-
+ int decrementRefCount(MessageReference reference) throws Exception;
+
int incrementDurableRefCount();
int decrementDurableRefCount();
ServerMessage copy(long newID) throws Exception;
-
+
ServerMessage copy() throws Exception;
int getMemoryEstimate();
@@ -50,16 +48,16 @@
int getRefCount();
ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception;
-
- void setOriginalHeaders(ServerMessage other, boolean expiry);
-
+
+ void setOriginalHeaders(ServerMessage other, boolean expiry);
+
void setPagingStore(PagingStore store);
-
+
PagingStore getPagingStore();
-
+
boolean page(boolean duplicateDetection) throws Exception;
-
+
boolean page(long transactionID, boolean duplicateDetection) throws Exception;
-
+
boolean storeIsPaging();
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -117,6 +117,8 @@
private final boolean failoverOnServerShutdown;
+ private final int confirmationWindowSize;
+
private final SimpleString idsHeaderName;
private MessageFlowRecord flowRecord;
@@ -133,6 +135,8 @@
private NotificationService notificationService;
+ private ClientConsumer notifConsumer;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -160,6 +164,7 @@
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
final boolean useDuplicateDetection,
+ final int confirmationWindowSize,
final SimpleString managementAddress,
final SimpleString managementNotificationAddress,
final String clusterUser,
@@ -184,6 +189,13 @@
this.useDuplicateDetection = useDuplicateDetection;
+ if (!(confirmationWindowSize > 0))
+ {
+ throw new IllegalStateException("confirmation-window-size must be > 0 for a bridge");
+ }
+
+ this.confirmationWindowSize = confirmationWindowSize;
+
this.discoveryAddress = discoveryAddress;
this.discoveryPort = discoveryPort;
@@ -253,9 +265,11 @@
}
Queue queue = null;
+
for (MessageReference ref2 : list)
{
queue = ref2.getQueue();
+
queue.cancel(ref2);
}
@@ -381,7 +395,7 @@
{
return HandleStatus.NO_MATCH;
}
-
+
synchronized (this)
{
if (!active)
@@ -521,15 +535,15 @@
{
active = false;
}
-
+
cancelRefs();
}
else
{
setupNotificationConsumer();
-
+
active = true;
-
+
if (queue != null)
{
queue.deliverAsync(executor);
@@ -543,8 +557,6 @@
}
}
- private ClientConsumer notifConsumer;
-
// TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
// connection is opened and closed - we can use
// a callback to tell us that
@@ -552,7 +564,6 @@
{
if (flowRecord != null)
{
-
if (notifConsumer != null)
{
try
@@ -600,7 +611,7 @@
"%')");
session.createQueue(managementNotificationAddress, notifQueueName, filter, false);
-
+
notifConsumer = session.createConsumer(notifQueueName);
notifConsumer.setMessageHandler(flowRecord);
@@ -645,6 +656,10 @@
csf.setReconnectAttempts(reconnectAttempts);
csf.setBlockOnPersistentSend(false);
+ // Must have confirmations enabled so we get send acks
+
+ csf.setConfirmationWindowSize(confirmationWindowSize);
+
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(clusterUser, clusterPassword, false, true, true, true, 1);
@@ -732,41 +747,4 @@
}
}
}
-
- // private class FailRunnable implements Runnable
- // {
- // public void run()
- // {
- // synchronized (BridgeImpl.this)
- // {
- // if (!started)
- // {
- // return;
- // }
- //
- // active = false;
- // }
- //
- // try
- // {
- // queue.removeConsumer(BridgeImpl.this);
- //
- // session.cleanUp();
- //
- // cancelRefs();
- //
- // csf.close();
- // }
- // catch (Exception e)
- // {
- // log.error("Failed to stop", e);
- // }
- //
- // if (!createObjects())
- // {
- // started = false;
- // }
- // }
- // }
-
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -81,6 +81,8 @@
private final long retryInterval;
private final boolean useDuplicateDetection;
+
+ private final int confirmationWindowSize;
private final boolean routeWhenNoConsumers;
@@ -108,6 +110,7 @@
final long retryInterval,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
+ final int confirmationWindowSize,
final org.hornetq.utils.ExecutorFactory executorFactory,
final HornetQServer server,
final PostOffice postOffice,
@@ -127,6 +130,8 @@
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
+
+ this.confirmationWindowSize = confirmationWindowSize;
this.executorFactory = executorFactory;
@@ -167,6 +172,7 @@
final long retryInterval,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
+ final int confirmationWindowSize,
final ExecutorFactory executorFactory,
final HornetQServer server,
final PostOffice postOffice,
@@ -198,6 +204,8 @@
this.useDuplicateDetection = useDuplicateDetection;
this.routeWhenNoConsumers = routeWhenNoConsumers;
+
+ this.confirmationWindowSize = confirmationWindowSize;
this.maxHops = maxHops;
@@ -221,12 +229,14 @@
}
started = true;
-
+
if (managementService != null)
{
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), name);
- Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STARTED, props);
+ Notification notification = new Notification(nodeUUID.toString(),
+ NotificationType.CLUSTER_CONNECTION_STARTED,
+ props);
managementService.sendNotification(notification);
}
}
@@ -258,10 +268,12 @@
{
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(new SimpleString("name"), name);
- Notification notification = new Notification(nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STOPPED, props);
+ Notification notification = new Notification(nodeUUID.toString(),
+ NotificationType.CLUSTER_CONNECTION_STOPPED,
+ props);
managementService.sendNotification(notification);
}
-
+
started = false;
}
@@ -274,7 +286,7 @@
{
return name;
}
-
+
public String getNodeID()
{
return nodeUUID.toString();
@@ -283,7 +295,7 @@
public synchronized Map<String, String> getNodes()
{
Map<String, String> nodes = new HashMap<String, String>();
- for (Entry<String, MessageFlowRecord> record : records.entrySet( ))
+ for (Entry<String, MessageFlowRecord> record : records.entrySet())
{
if (record.getValue().getBridge().getForwardingConnection() != null)
{
@@ -292,7 +304,7 @@
}
return nodes;
}
-
+
public synchronized void activate()
{
if (!started)
@@ -428,6 +440,7 @@
-1,
true,
useDuplicateDetection,
+ confirmationWindowSize,
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
managementService.getClusterUser(),
@@ -484,7 +497,7 @@
{
this.bridge = bridge;
}
-
+
public Bridge getBridge()
{
return bridge;
@@ -570,7 +583,7 @@
{
throw new IllegalStateException("proposal type is null");
}
-
+
SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID);
SimpleString val = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
@@ -579,7 +592,7 @@
Response response = server.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
- if(response != null)
+ if (response != null)
{
server.getGroupingHandler().send(response, 0);
}
@@ -634,7 +647,7 @@
{
throw new IllegalStateException("routingName is null");
}
-
+
if (!message.containsProperty(ManagementHelper.HDR_BINDING_ID))
{
throw new IllegalStateException("queueID is null");
@@ -697,7 +710,7 @@
{
throw new IllegalStateException("clusterName is null");
}
-
+
SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
removeBinding(clusterName);
@@ -756,7 +769,7 @@
{
throw new IllegalStateException("distance is null");
}
-
+
if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME))
{
throw new IllegalStateException("clusterName is null");
@@ -782,7 +795,7 @@
// Need to propagate the consumer close
Notification notification = new Notification(null, CONSUMER_CLOSED, message.getProperties());
- managementService.sendNotification(notification);
+ managementService.sendNotification(notification);
}
}
@@ -830,7 +843,7 @@
theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
}
- //for testing only
+ // for testing only
public Map<String, MessageFlowRecord> getRecords()
{
return records;
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -37,7 +37,6 @@
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.Channel;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -71,7 +70,7 @@
private final Map<String, ClusterConnection> clusters = new HashMap<String, ClusterConnection>();
private final org.hornetq.utils.ExecutorFactory executorFactory;
-
+
private final HornetQServer server;
private final PostOffice postOffice;
@@ -83,9 +82,9 @@
private final Configuration configuration;
private final UUID nodeUUID;
-
+
private volatile boolean started;
-
+
private boolean backup;
public ClusterManagerImpl(final org.hornetq.utils.ExecutorFactory executorFactory,
@@ -94,16 +93,16 @@
final ScheduledExecutorService scheduledExecutor,
final ManagementService managementService,
final Configuration configuration,
- final UUID nodeUUID,
+ final UUID nodeUUID,
final boolean backup)
{
if (nodeUUID == null)
{
throw new IllegalArgumentException("Node uuid is null");
}
-
+
this.executorFactory = executorFactory;
-
+
this.server = server;
this.postOffice = postOffice;
@@ -115,7 +114,7 @@
this.configuration = configuration;
this.nodeUUID = nodeUUID;
-
+
this.backup = backup;
}
@@ -203,34 +202,34 @@
{
return new HashSet<ClusterConnection>(clusters.values());
}
-
+
public Set<BroadcastGroup> getBroadcastGroups()
{
return new HashSet<BroadcastGroup>(broadcastGroups.values());
}
-
+
public ClusterConnection getClusterConnection(final SimpleString name)
{
- return clusters.get(name.toString());
+ return clusters.get(name.toString());
}
-
+
public synchronized void activate()
- {
- for (BroadcastGroup bg: broadcastGroups.values())
+ {
+ for (BroadcastGroup bg : broadcastGroups.values())
{
bg.activate();
}
-
- for (Bridge bridge: bridges.values())
+
+ for (Bridge bridge : bridges.values())
{
bridge.activate();
}
-
- for (ClusterConnection cc: clusters.values())
+
+ for (ClusterConnection cc : clusters.values())
{
cc.activate();
}
-
+
backup = false;
}
@@ -385,14 +384,16 @@
if (config.getDiscoveryGroupName() != null)
{
- DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
+ DiscoveryGroupConfiguration discoveryGroupConfiguration = configuration.getDiscoveryGroupConfigurations()
+ .get(config.getDiscoveryGroupName());
if (discoveryGroupConfiguration == null)
{
- log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() + "'. The bridge will not be deployed.");
+ log.warn("No discovery group configured with name '" + config.getDiscoveryGroupName() +
+ "'. The bridge will not be deployed.");
return;
}
-
+
bridge = new BridgeImpl(nodeUUID,
new SimpleString(config.getName()),
queue,
@@ -409,11 +410,12 @@
config.getReconnectAttempts(),
config.isFailoverOnServerShutdown(),
config.isUseDuplicateDetection(),
+ config.getConfirmationWindowSize(),
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
managementService.getClusterUser(),
managementService.getClusterPassword(),
- null,
+ null,
!backup,
server.getStorageManager());
}
@@ -461,11 +463,12 @@
config.getReconnectAttempts(),
config.isFailoverOnServerShutdown(),
config.isUseDuplicateDetection(),
+ config.getConfirmationWindowSize(),
managementService.getManagementAddress(),
managementService.getManagementNotificationAddress(),
managementService.getClusterUser(),
managementService.getClusterPassword(),
- null,
+ null,
!backup,
server.getStorageManager());
}
@@ -534,17 +537,18 @@
clusterConnection = new ClusterConnectionImpl(new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
- config.getRetryInterval(),
+ config.getRetryInterval(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
+ config.getConfirmationWindowSize(),
executorFactory,
- server,
+ server,
postOffice,
managementService,
- scheduledExecutor,
+ scheduledExecutor,
connectors,
config.getMaxHops(),
- nodeUUID,
+ nodeUUID,
backup);
}
else
@@ -559,17 +563,18 @@
clusterConnection = new ClusterConnectionImpl(new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
- config.getRetryInterval(),
+ config.getRetryInterval(),
config.isDuplicateDetection(),
config.isForwardWhenNoConsumers(),
+ config.getConfirmationWindowSize(),
executorFactory,
- server,
+ server,
postOffice,
managementService,
- scheduledExecutor,
+ scheduledExecutor,
dg,
config.getMaxHops(),
- nodeUUID,
+ nodeUUID,
backup);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -36,8 +36,6 @@
import org.hornetq.core.list.impl.PriorityLinkedListImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -51,7 +49,6 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.impl.Redistributor;
import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
@@ -111,12 +108,8 @@
private final Runnable deliverRunner = new DeliverRunner();
- private final PagingManager pagingManager;
-
private final Semaphore lock = new Semaphore(1);
- private volatile PagingStore pagingStore;
-
private final StorageManager storageManager;
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -172,15 +165,6 @@
this.scheduledExecutor = scheduledExecutor;
- if (postOffice == null)
- {
- pagingManager = null;
- }
- else
- {
- pagingManager = postOffice.getPagingManager();
- }
-
direct = true;
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
@@ -1091,12 +1075,14 @@
if (reference == null)
{
nullReferences.add(consumer);
+
if (nullReferences.size() + busyConsumers.size() == totalConsumers)
{
- startDepaging();
// We delivered all the messages - go into direct delivery
direct = true;
+
promptDelivery = false;
+
return;
}
@@ -1133,8 +1119,6 @@
}
}
- initPagingStore(reference.getMessage().getDestination());
-
final SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
if (groupID != null)
@@ -1407,19 +1391,7 @@
queue.deliveringCount.decrementAndGet();
- PagingStore store;
- if (pagingManager != null)
- {
- // TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know
- // the Address for the Queue
- store = pagingManager.getPageStore(ref.getMessage().getDestination());
- }
- else
- {
- store = null;
- }
-
- message.decrementRefCount(store, ref);
+ message.decrementRefCount(ref);
}
void postRollback(final LinkedList<MessageReference> refs) throws Exception
@@ -1437,42 +1409,6 @@
}
}
- private synchronized void initPagingStore(final SimpleString destination)
- {
- // PagingManager would be null only on testcases
- if (pagingStore == null && pagingManager != null)
- {
- // TODO: It would be better if we could initialize the pagingStore during the construction
- try
- {
- pagingStore = pagingManager.getPageStore(destination);
- }
- catch (Exception e)
- {
- // This shouldn't happen, and if it happens, this shouldn't abort the route
- }
- }
- }
-
- private synchronized void startDepaging()
- {
- if (pagingStore != null)
- {
- // If the queue is empty, we need to check if there are pending messages, and throw a warning
- if (pagingStore.isPaging() && pagingStore.getAddressFullMessagePolicy() == AddressFullMessagePolicy.PAGE)
- {
- // This is just a *request* to depage. Depage will only happens if there is space on the Address
- // and GlobalSize
- pagingStore.startDepaging();
-
- log.warn("The Queue " + name +
- " is empty, however there are pending messages on Paging for the address " +
- pagingStore.getStoreName() +
- " waiting message ACK before they could be routed");
- }
- }
- }
-
// Inner classes
// --------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -102,7 +102,7 @@
return ref;
}
- public int incrementRefCount(final PagingStore pagingStore, final MessageReference reference) throws Exception
+ public int incrementRefCount(final MessageReference reference) throws Exception
{
int count = refCount.incrementAndGet();
@@ -119,7 +119,7 @@
return count;
}
- public int decrementRefCount(final PagingStore pagingStore, final MessageReference reference) throws Exception
+ public int decrementRefCount(final MessageReference reference) throws Exception
{
int count = refCount.decrementAndGet();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -387,7 +387,7 @@
if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
{
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Binding " + name + " does not exist");
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Queue " + name + " does not exist");
}
securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
Modified: trunk/src/main/org/hornetq/utils/XMLUtil.java
===================================================================
--- trunk/src/main/org/hornetq/utils/XMLUtil.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/src/main/org/hornetq/utils/XMLUtil.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -493,6 +493,8 @@
}
catch (SAXException e)
{
+ log.error("Invalid configuration", e);
+
throw new IllegalStateException("Invalid configuration", e);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -93,6 +93,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 1;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), server2tc.getName());
@@ -106,6 +107,7 @@
reconnectAttempts,
true,
false,
+ confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -219,6 +221,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 3;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), server2tc.getName());
@@ -232,6 +235,7 @@
reconnectAttempts,
true,
false,
+ confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -338,6 +342,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 3;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
@@ -351,6 +356,7 @@
reconnectAttempts,
true,
false,
+ confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -452,6 +458,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = -1;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
@@ -465,6 +472,7 @@
reconnectAttempts,
true,
false,
+ confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -513,7 +521,7 @@
prod0.send(message);
}
-
+
log.info("sent messages");
for (int i = 0; i < numMessages; i++)
@@ -523,7 +531,7 @@
assertEquals(i, r1.getObjectProperty(propKey));
log.info("got message " + r1.getObjectProperty(propKey));
}
-
+
log.info("got messages");
session0.close();
@@ -566,6 +574,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = 3;
+ final int confirmationWindowSize = 1024;
Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
@@ -579,6 +588,7 @@
reconnectAttempts,
true,
false,
+ confirmationWindowSize,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -86,6 +86,7 @@
0,
true,
true,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -229,6 +230,7 @@
-1,
true,
true,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -419,6 +421,7 @@
0,
false,
false,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -547,6 +550,7 @@
1,
false,
true,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -76,7 +76,6 @@
try
{
-
Map<String, Object> server0Params = new HashMap<String, Object>();
server0 = createClusteredServerWithParams(0, useFiles, server0Params);
@@ -101,6 +100,10 @@
Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
+ final int messageSize = 1024;
+
+ final int numMessages = 10;
+
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration("bridge1",
queueName0,
forwardAddress,
@@ -111,6 +114,9 @@
-1,
true,
false,
+ // Choose confirmation size to make sure acks
+ // are sent
+ numMessages * messageSize / 2,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -144,7 +150,7 @@
session1.start();
- final int numMessages = 10;
+ final byte[] bytes = new byte[messageSize];
final SimpleString propKey = new SimpleString("testkey");
@@ -159,6 +165,8 @@
message.putIntProperty(propKey, i);
+ message.getBody().writeBytes(bytes);
+
producer0.send(message);
}
@@ -286,6 +294,7 @@
-1,
true,
false,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -332,7 +341,7 @@
message.putIntProperty(propKey, i);
message.putStringProperty(selectorKey, new SimpleString("monkey"));
-
+
if (largeMessage)
{
message.setBodyInputStream(createFakeLargeStream(1024 * 1024));
@@ -368,7 +377,7 @@
assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
message.acknowledge();
-
+
if (largeMessage)
{
readMessages(message);
@@ -407,7 +416,7 @@
}
}
-
+
public void testWithTransformer() throws Exception
{
internaltestWithTransformer(false);
@@ -453,6 +462,7 @@
-1,
true,
false,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -497,7 +507,7 @@
message.putStringProperty(propKey, new SimpleString("bing"));
message.getBody().writeString("doo be doo be doo be doo");
-
+
producer0.send(message);
}
@@ -516,8 +526,7 @@
assertEquals("dee be dee be dee be dee", sval);
message.acknowledge();
-
-
+
}
assertNull(consumer1.receiveImmediate());
@@ -534,10 +543,8 @@
server1.stop();
}
-
- // https://jira.jboss.org/jira/browse/HORNETQ-182
- public void disabled_testBridgeWithPaging() throws Exception
+ public void testBridgeWithPaging() throws Exception
{
HornetQServer server0 = null;
HornetQServer server1 = null;
@@ -583,6 +590,7 @@
-1,
true,
false,
+ 1024,
connectorPair);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
@@ -623,7 +631,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session0.createClientMessage(false);
-
+
message.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
message.putIntProperty(propKey, i);
@@ -674,13 +682,12 @@
}
-
protected void setUp() throws Exception
{
super.setUp();
clearData();
}
-
+
protected void tearDown() throws Exception
{
clearData();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -99,6 +99,7 @@
0,
true,
true,
+ 1024,
"dg1");
List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -1407,6 +1407,7 @@
true,
forwardWhenNoConsumers,
maxHops,
+ 1024,
pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
}
@@ -1458,6 +1459,7 @@
true,
forwardWhenNoConsumers,
maxHops,
+ 1024,
pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
@@ -1526,6 +1528,7 @@
true,
forwardWhenNoConsumers,
maxHops,
+ 1024,
pairs);
serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
@@ -1552,6 +1555,7 @@
true,
forwardWhenNoConsumers,
maxHops,
+ 1024,
discoveryGroupName);
List<ClusterConnectionConfiguration> clusterConfs = server.getConfiguration().getClusterConfigurations();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -162,6 +162,7 @@
randomPositiveInt(),
randomBoolean(),
randomBoolean(),
+ randomPositiveInt(),
connectorPair);
Configuration conf_1 = new ConfigurationImpl();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlUsingCoreTest.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -61,7 +61,7 @@
private BridgeConfiguration bridgeConfig;
private HornetQServer server_1;
-
+
private ClientSession session;
// Constructors --------------------------------------------------
@@ -79,10 +79,14 @@
assertEquals(bridgeConfig.getForwardingAddress(), (String)proxy.retrieveAttributeValue("forwardingAddress"));
assertEquals(bridgeConfig.getFilterString(), (String)proxy.retrieveAttributeValue("filterString"));
assertEquals(bridgeConfig.getRetryInterval(), ((Long)proxy.retrieveAttributeValue("retryInterval")).longValue());
- assertEquals(bridgeConfig.getRetryIntervalMultiplier(), (Double)proxy.retrieveAttributeValue("retryIntervalMultiplier"));
- assertEquals(bridgeConfig.getReconnectAttempts(), ((Integer)proxy.retrieveAttributeValue("reconnectAttempts")).intValue());
- assertEquals(bridgeConfig.isFailoverOnServerShutdown(), ((Boolean)proxy.retrieveAttributeValue("failoverOnServerShutdown")).booleanValue());
- assertEquals(bridgeConfig.isUseDuplicateDetection(), ((Boolean)proxy.retrieveAttributeValue("useDuplicateDetection")).booleanValue());
+ assertEquals(bridgeConfig.getRetryIntervalMultiplier(),
+ (Double)proxy.retrieveAttributeValue("retryIntervalMultiplier"));
+ assertEquals(bridgeConfig.getReconnectAttempts(),
+ ((Integer)proxy.retrieveAttributeValue("reconnectAttempts")).intValue());
+ assertEquals(bridgeConfig.isFailoverOnServerShutdown(),
+ ((Boolean)proxy.retrieveAttributeValue("failoverOnServerShutdown")).booleanValue());
+ assertEquals(bridgeConfig.isUseDuplicateDetection(),
+ ((Boolean)proxy.retrieveAttributeValue("useDuplicateDetection")).booleanValue());
Object[] data = (Object[])proxy.retrieveAttributeValue("connectorPair");
assertEquals(bridgeConfig.getConnectorPair().a, data[0]);
@@ -99,10 +103,10 @@
// started by the server
assertTrue((Boolean)proxy.retrieveAttributeValue("Started"));
- proxy.invokeOperation("stop");
+ proxy.invokeOperation("stop");
assertFalse((Boolean)proxy.retrieveAttributeValue("Started"));
- proxy.invokeOperation("start");
+ proxy.invokeOperation("start");
assertTrue((Boolean)proxy.retrieveAttributeValue("Started"));
}
@@ -138,6 +142,7 @@
randomPositiveInt(),
randomBoolean(),
randomBoolean(),
+ randomPositiveInt(),
connectorPair);
Configuration conf_1 = new ConfigurationImpl();
@@ -175,20 +180,18 @@
server_1.stop();
session = null;
-
+
server_0 = null;
-
+
server_1 = null;
-
-
+
super.tearDown();
}
-
+
protected CoreMessagingProxy createProxy(final String name) throws Exception
{
- CoreMessagingProxy proxy = new CoreMessagingProxy(session,
- ResourceNames.CORE_BRIDGE + name);
-
+ CoreMessagingProxy proxy = new CoreMessagingProxy(session, ResourceNames.CORE_BRIDGE + name);
+
return proxy;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -136,6 +136,7 @@
false,
false,
1,
+ 1024,
discoveryName);
List<Pair<String, String>> connectorInfos = new ArrayList<Pair<String, String>>();
connectorInfos.add(new Pair<String, String>("netty", null));
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -204,6 +204,7 @@
randomBoolean(),
randomBoolean(),
randomPositiveInt(),
+ randomPositiveInt(),
pairs);
clusterConnectionConfig2 = new ClusterConnectionConfiguration(randomString(),
@@ -212,6 +213,7 @@
randomBoolean(),
randomBoolean(),
randomPositiveInt(),
+ randomPositiveInt(),
randomString());
Configuration conf_1 = new ConfigurationImpl();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-10 17:25:45 UTC (rev 8255)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-10 20:03:11 UTC (rev 8256)
@@ -1125,6 +1125,18 @@
return null;
}
+ public int decrementRefCount(MessageReference reference) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public int incrementRefCount(MessageReference reference) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
class FakeFilter implements Filter
15 years, 2 months
JBoss hornetq SVN: r8255 - in trunk/tests/src/org/hornetq/tests: stress/journal and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-10 12:25:45 -0500 (Tue, 10 Nov 2009)
New Revision: 8255
Added:
trunk/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java
Removed:
trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
Log:
Moving CompactingTest as CompactingStressTest (where it really belongs to).
This test can't just use less iterations, as compacting wouldn't be called otherwise. This is really a stress test.
Deleted: trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-11-09 21:20:59 UTC (rev 8254)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-11-10 17:25:45 UTC (rev 8255)
@@ -1,509 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.client;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.client.ClientConsumer;
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.message.Message;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.util.ServiceTestBase;
-
-/**
- * A CompactingTest
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class CompactingTest extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private static final String AD1 = "ad1";
-
- private static final String AD2 = "ad2";
-
- private static final String AD3 = "ad3";
-
- private static final String Q1 = "q1";
-
- private static final String Q2 = "q2";
-
- private static final String Q3 = "q3";
-
- private static final int TOT_AD3 = 5000;
-
- private HornetQServer server;
-
- private ClientSessionFactory sf;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testCleanupAIO() throws Throwable
- {
- if (AsynchronousFileImpl.isLoaded())
- {
- for (int i = 0; i < 3; i++)
- {
- System.out.println("Test # " + i);
- internalTestCleanup(JournalType.ASYNCIO);
- tearDown();
- setUp();
- }
- }
- }
-
- public void testCleanupNIO() throws Throwable
- {
- for (int i = 0; i < 3; i++)
- {
- System.out.println("Test # " + i);
- internalTestCleanup(JournalType.NIO);
- tearDown();
- setUp();
- }
- }
-
- private void internalTestCleanup(JournalType journalType) throws Throwable
- {
- setupServer(journalType);
-
- ClientSession session = sf.createSession(false, true, true);
-
- ClientProducer prod = session.createProducer(AD1);
-
- for (int i = 0; i < 500; i++)
- {
- prod.send(session.createClientMessage(true));
- }
-
- session.commit();
-
- prod.close();
-
- ClientConsumer cons = session.createConsumer(Q2);
- prod = session.createProducer(AD2);
-
- session.start();
-
- for (int i = 0; i < 200; i++)
- {
- System.out.println("Iteration " + i);
- // Sending non transactionally, so it would test non transactional stuff on the journal
- for (int j = 0; j < 1000; j++)
- {
- Message msg = session.createClientMessage(true);
- msg.getBody().writeBytes(new byte[1024]);
-
- prod.send(msg);
- }
-
- // I need to guarantee a roundtrip to the server, to make sure everything is persisted
- session.commit();
-
- for (int j = 0; j < 1000; j++)
- {
- ClientMessage msg = cons.receive(2000);
- assertNotNull(msg);
- msg.acknowledge();
- }
-
- // I need to guarantee a roundtrip to the server, to make sure everything is persisted
- session.commit();
-
- }
-
- assertNull(cons.receiveImmediate());
-
- session.close();
-
- server.stop();
-
- server.start();
-
- session = sf.createSession(false, true, true);
- cons = session.createConsumer(Q1);
- session.start();
-
- for (int i = 0; i < 500; i++)
- {
- ClientMessage msg = cons.receive(1000);
- assertNotNull(msg);
- msg.acknowledge();
- }
-
- assertNull(cons.receiveImmediate());
-
- prod = session.createProducer(AD2);
-
- session.close();
-
- }
-
- public void testMultiProducerAndCompactAIO() throws Throwable
- {
- internalTestMultiProducer(JournalType.ASYNCIO);
- }
-
- public void testMultiProducerAndCompactNIO() throws Throwable
- {
- internalTestMultiProducer(JournalType.NIO);
- }
-
- public void internalTestMultiProducer(JournalType journalType) throws Throwable
- {
-
- setupServer(journalType);
-
- ClientSession session = sf.createSession(false, false);
-
- try
- {
- ClientProducer producer = session.createProducer(AD3);
-
- byte[] buffer = new byte[10 * 1024];
-
- ClientMessage msg = session.createClientMessage(true);
- msg.setBody(ChannelBuffers.wrappedBuffer(buffer));
- for (int i = 0; i < TOT_AD3; i++)
- {
- producer.send(msg);
- if (i % 100 == 0)
- {
- session.commit();
- }
- }
-
- session.commit();
- }
- finally
- {
- session.close();
- }
-
- server.stop();
-
- setupServer(journalType);
-
- final AtomicInteger numberOfMessages = new AtomicInteger(0);
- final int NUMBER_OF_FAST_MESSAGES = 100000;
- final int SLOW_INTERVAL = 100;
-
- final CountDownLatch latchReady = new CountDownLatch(2);
- final CountDownLatch latchStart = new CountDownLatch(1);
-
- class FastProducer extends Thread
- {
- Throwable e;
-
- FastProducer()
- {
- super("Fast-Thread");
- }
-
- public void run()
- {
- ClientSession session = null;
- ClientSession sessionSlow = null;
- latchReady.countDown();
- try
- {
- latchStart.await();
- session = sf.createSession(true, true);
- sessionSlow = sf.createSession(false, false);
- ClientProducer prod = session.createProducer(AD2);
- ClientProducer slowProd = sessionSlow.createProducer(AD1);
- for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
- {
- if (i % SLOW_INTERVAL == 0)
- {
- if (numberOfMessages.incrementAndGet() % 5 == 0)
- {
- sessionSlow.commit();
- }
- slowProd.send(session.createClientMessage(true));
- }
- ClientMessage msg = session.createClientMessage(true);
- msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
- prod.send(msg);
- }
- sessionSlow.commit();
- }
- catch (Throwable e)
- {
- this.e = e;
- }
- finally
- {
- try
- {
- session.close();
- }
- catch (Throwable e)
- {
- this.e = e;
- }
- try
- {
- sessionSlow.close();
- }
- catch (Throwable e)
- {
- this.e = e;
- }
- }
- }
- }
-
- class FastConsumer extends Thread
- {
- Throwable e;
-
- FastConsumer()
- {
- super("Fast-Consumer");
- }
-
- public void run()
- {
- ClientSession session = null;
- latchReady.countDown();
- try
- {
- latchStart.await();
- session = sf.createSession(true, true);
- session.start();
- ClientConsumer cons = session.createConsumer(Q2);
- for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
- {
- ClientMessage msg = cons.receive(60 * 1000);
- msg.acknowledge();
- }
-
- assertNull(cons.receiveImmediate());
- }
- catch (Throwable e)
- {
- this.e = e;
- }
- finally
- {
- try
- {
- session.close();
- }
- catch (Throwable e)
- {
- this.e = e;
- }
- }
- }
- }
-
- FastConsumer f1 = new FastConsumer();
- f1.start();
-
- FastProducer p1 = new FastProducer();
- p1.start();
-
- latchReady.await();
- latchStart.countDown();
-
- p1.join();
-
- if (p1.e != null)
- {
- throw p1.e;
- }
-
- f1.join();
-
- if (f1.e != null)
- {
- throw f1.e;
- }
-
- sf.close();
-
- server.stop();
-
- setupServer(journalType);
-
- ClientSession sess = null;
-
- try
- {
-
- sess = sf.createSession(true, true);
-
- ClientConsumer cons = sess.createConsumer(Q1);
-
- sess.start();
-
- for (int i = 0; i < numberOfMessages.intValue(); i++)
- {
- ClientMessage msg = cons.receive(60000);
- assertNotNull(msg);
- msg.acknowledge();
- }
-
- assertNull(cons.receiveImmediate());
-
- cons.close();
-
- cons = sess.createConsumer(Q2);
-
- assertNull(cons.receiveImmediate());
-
- cons.close();
-
- cons = sess.createConsumer(Q3);
-
- for (int i = 0; i < TOT_AD3; i++)
- {
- ClientMessage msg = cons.receive(60000);
- assertNotNull(msg);
- msg.acknowledge();
- }
-
- assertNull(cons.receiveImmediate());
-
- }
- finally
- {
- try
- {
- sess.close();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- clearData();
- }
-
- /**
- * @throws Exception
- * @throws HornetQException
- */
- private void setupServer(JournalType journalType) throws Exception, HornetQException
- {
- Configuration config = createDefaultConfig();
- config.setJournalSyncNonTransactional(false);
- config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
-
- config.setJournalType(journalType);
-
- config.setJournalCompactMinFiles(10);
- config.setJournalCompactPercentage(50);
-
- server = createServer(true, config);
-
- server.start();
-
- sf = createInVMFactory();
- sf.setBlockOnPersistentSend(false);
- sf.setBlockOnAcknowledge(false);
-
- ClientSession sess = sf.createSession();
-
- try
- {
- sess.createQueue(AD1, Q1, true);
- }
- catch (Exception ignored)
- {
- }
-
- try
- {
- sess.createQueue(AD2, Q2, true);
- }
- catch (Exception ignored)
- {
- }
-
- try
- {
- sess.createQueue(AD3, Q3, true);
- }
- catch (Exception ignored)
- {
- }
-
- sess.close();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- try
- {
- if (sf != null)
- {
- sf.close();
- }
-
- if (server != null)
- {
- server.stop();
- }
- }
- catch (Exception e)
- {
- e.printStackTrace(); // system.out -> junit reports
- }
-
- server = null;
-
- sf = null;
-
- super.tearDown();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: trunk/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java (from rev 8254, trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/CompactingStressTest.java 2009-11-10 17:25:45 UTC (rev 8255)
@@ -0,0 +1,501 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.Message;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A CompactingTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CompactingStressTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final String AD1 = "ad1";
+
+ private static final String AD2 = "ad2";
+
+ private static final String AD3 = "ad3";
+
+ private static final String Q1 = "q1";
+
+ private static final String Q2 = "q2";
+
+ private static final String Q3 = "q3";
+
+ private static final int TOT_AD3 = 5000;
+
+ private HornetQServer server;
+
+ private ClientSessionFactory sf;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testCleanupAIO() throws Throwable
+ {
+ if (AsynchronousFileImpl.isLoaded())
+ {
+ internalTestCleanup(JournalType.ASYNCIO);
+ tearDown();
+ setUp();
+ }
+ }
+
+ public void testCleanupNIO() throws Throwable
+ {
+ internalTestCleanup(JournalType.NIO);
+ tearDown();
+ setUp();
+ }
+
+ private void internalTestCleanup(JournalType journalType) throws Throwable
+ {
+ setupServer(journalType);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ ClientProducer prod = session.createProducer(AD1);
+
+ for (int i = 0; i < 500; i++)
+ {
+ prod.send(session.createClientMessage(true));
+ }
+
+ session.commit();
+
+ prod.close();
+
+ ClientConsumer cons = session.createConsumer(Q2);
+ prod = session.createProducer(AD2);
+
+ session.start();
+
+ for (int i = 0; i < 200; i++)
+ {
+ System.out.println("Iteration " + i);
+ // Sending non transactionally, so it would test non transactional stuff on the journal
+ for (int j = 0; j < 1000; j++)
+ {
+ Message msg = session.createClientMessage(true);
+ msg.getBody().writeBytes(new byte[1024]);
+
+ prod.send(msg);
+ }
+
+ // I need to guarantee a roundtrip to the server, to make sure everything is persisted
+ session.commit();
+
+ for (int j = 0; j < 1000; j++)
+ {
+ ClientMessage msg = cons.receive(2000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ // I need to guarantee a roundtrip to the server, to make sure everything is persisted
+ session.commit();
+
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ session.close();
+
+ server.stop();
+
+ server.start();
+
+ session = sf.createSession(false, true, true);
+ cons = session.createConsumer(Q1);
+ session.start();
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage msg = cons.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ prod = session.createProducer(AD2);
+
+ session.close();
+
+ }
+
+ public void testMultiProducerAndCompactAIO() throws Throwable
+ {
+ internalTestMultiProducer(JournalType.ASYNCIO);
+ }
+
+ public void testMultiProducerAndCompactNIO() throws Throwable
+ {
+ internalTestMultiProducer(JournalType.NIO);
+ }
+
+ public void internalTestMultiProducer(JournalType journalType) throws Throwable
+ {
+
+ setupServer(journalType);
+
+ ClientSession session = sf.createSession(false, false);
+
+ try
+ {
+ ClientProducer producer = session.createProducer(AD3);
+
+ byte[] buffer = new byte[10 * 1024];
+
+ ClientMessage msg = session.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(buffer));
+ for (int i = 0; i < TOT_AD3; i++)
+ {
+ producer.send(msg);
+ if (i % 100 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+ }
+ finally
+ {
+ session.close();
+ }
+
+ server.stop();
+
+ setupServer(journalType);
+
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ final int NUMBER_OF_FAST_MESSAGES = 100000;
+ final int SLOW_INTERVAL = 100;
+
+ final CountDownLatch latchReady = new CountDownLatch(2);
+ final CountDownLatch latchStart = new CountDownLatch(1);
+
+ class FastProducer extends Thread
+ {
+ Throwable e;
+
+ FastProducer()
+ {
+ super("Fast-Thread");
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ ClientSession sessionSlow = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(true, true);
+ sessionSlow = sf.createSession(false, false);
+ ClientProducer prod = session.createProducer(AD2);
+ ClientProducer slowProd = sessionSlow.createProducer(AD1);
+ for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
+ {
+ if (i % SLOW_INTERVAL == 0)
+ {
+ if (numberOfMessages.incrementAndGet() % 5 == 0)
+ {
+ sessionSlow.commit();
+ }
+ slowProd.send(session.createClientMessage(true));
+ }
+ ClientMessage msg = session.createClientMessage(true);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+ prod.send(msg);
+ }
+ sessionSlow.commit();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ try
+ {
+ sessionSlow.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ class FastConsumer extends Thread
+ {
+ Throwable e;
+
+ FastConsumer()
+ {
+ super("Fast-Consumer");
+ }
+
+ public void run()
+ {
+ ClientSession session = null;
+ latchReady.countDown();
+ try
+ {
+ latchStart.await();
+ session = sf.createSession(true, true);
+ session.start();
+ ClientConsumer cons = session.createConsumer(Q2);
+ for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
+ {
+ ClientMessage msg = cons.receive(60 * 1000);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ this.e = e;
+ }
+ }
+ }
+ }
+
+ FastConsumer f1 = new FastConsumer();
+ f1.start();
+
+ FastProducer p1 = new FastProducer();
+ p1.start();
+
+ latchReady.await();
+ latchStart.countDown();
+
+ p1.join();
+
+ if (p1.e != null)
+ {
+ throw p1.e;
+ }
+
+ f1.join();
+
+ if (f1.e != null)
+ {
+ throw f1.e;
+ }
+
+ sf.close();
+
+ server.stop();
+
+ setupServer(journalType);
+
+ ClientSession sess = null;
+
+ try
+ {
+
+ sess = sf.createSession(true, true);
+
+ ClientConsumer cons = sess.createConsumer(Q1);
+
+ sess.start();
+
+ for (int i = 0; i < numberOfMessages.intValue(); i++)
+ {
+ ClientMessage msg = cons.receive(60000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ cons = sess.createConsumer(Q2);
+
+ assertNull(cons.receiveImmediate());
+
+ cons.close();
+
+ cons = sess.createConsumer(Q3);
+
+ for (int i = 0; i < TOT_AD3; i++)
+ {
+ ClientMessage msg = cons.receive(60000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ }
+ finally
+ {
+ try
+ {
+ sess.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ clearData();
+ }
+
+ /**
+ * @throws Exception
+ * @throws HornetQException
+ */
+ private void setupServer(JournalType journalType) throws Exception, HornetQException
+ {
+ Configuration config = createDefaultConfig();
+ config.setJournalSyncNonTransactional(false);
+ config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
+ config.setJournalType(journalType);
+
+ config.setJournalCompactMinFiles(10);
+ config.setJournalCompactPercentage(50);
+
+ server = createServer(true, config);
+
+ server.start();
+
+ sf = createInVMFactory();
+ sf.setBlockOnPersistentSend(false);
+ sf.setBlockOnAcknowledge(false);
+
+ ClientSession sess = sf.createSession();
+
+ try
+ {
+ sess.createQueue(AD1, Q1, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ try
+ {
+ sess.createQueue(AD2, Q2, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ try
+ {
+ sess.createQueue(AD3, Q3, true);
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ sess.close();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if (sf != null)
+ {
+ sf.close();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // system.out -> junit reports
+ }
+
+ server = null;
+
+ sf = null;
+
+ super.tearDown();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 2 months
JBoss hornetq SVN: r8254 - in trunk: src/main/org/hornetq/core/server/impl and 17 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-09 16:20:59 -0500 (Mon, 09 Nov 2009)
New Revision: 8254
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java
trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/client/TransactionDurabilityTest.java
trunk/tests/src/org/hornetq/tests/integration/client/WildCardRoutingTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
trunk/tests/src/org/hornetq/tests/integration/management/ManagementTestBase.java
trunk/tests/src/org/hornetq/tests/integration/management/NotificationTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/SecurityNotificationTest.java
trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
trunk/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java
trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
trunk/tests/src/org/hornetq/tests/integration/server/PredefinedQueueTest.java
trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
Log:
replace receive(x) with receiveImmediate in many tests
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -213,10 +213,12 @@
{
break;
}
+
// we only force delivery once per call to receive
if (!deliveryForced)
{
session.forceDelivery(id, forceDeliveryCount.incrementAndGet());
+
deliveryForced = true;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -591,7 +591,7 @@
final boolean preAcknowledge,
final boolean xa,
final int sendWindowSize) throws Exception
- {
+ {
if (!started)
{
throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED, "Server not started");
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -496,19 +496,7 @@
public synchronized int getMessageCount()
{
- int count = messageReferences.size() + getScheduledCount() + getDeliveringCount();
-
- // log.info(System.identityHashCode(this) + " message count is " +
- // count +
- // " ( mr:" +
- // messageReferences.size() +
- // " sc:" +
- // getScheduledCount() +
- // " dc:" +
- // getDeliveringCount() +
- // ")");
-
- return count;
+ return messageReferences.size() + getScheduledCount() + getDeliveringCount();
}
public synchronized int getScheduledCount()
@@ -1432,7 +1420,6 @@
}
message.decrementRefCount(store, ref);
-
}
void postRollback(final LinkedList<MessageReference> refs) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -83,13 +83,13 @@
message = createMessage(session, 2);
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
- message2 = consumer.receive(250);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
message = createMessage(session, 3);
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
- message2 = consumer.receive(250);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
// Now try with a different id
@@ -104,13 +104,13 @@
message = createMessage(session, 5);
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
- message2 = consumer.receive(1000);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
message = createMessage(session, 6);
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
- message2 = consumer.receive(250);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
session.close();
@@ -149,13 +149,13 @@
message = createMessage(session, 2);
message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
producer.send(message);
- message2 = consumer.receive(250);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
message = createMessage(session, 3);
message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
producer.send(message);
- message2 = consumer.receive(250);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
// Now try with a different id
@@ -170,13 +170,13 @@
message = createMessage(session, 5);
message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
producer.send(message);
- message2 = consumer.receive(1000);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
message = createMessage(session, 6);
message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
producer.send(message);
- message2 = consumer.receive(250);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
session.close();
@@ -253,11 +253,11 @@
producer3.send(message);
}
- ClientMessage message = consumer1.receive(100);
+ ClientMessage message = consumer1.receiveImmediate();
assertNull(message);
- message = consumer2.receive(100);
+ message = consumer2.receiveImmediate();
assertNull(message);
- message = consumer3.receive(100);
+ message = consumer3.receiveImmediate();
assertNull(message);
for (int i = 0; i < cacheSize; i++)
@@ -299,11 +299,11 @@
producer3.send(message);
}
- message = consumer1.receive(100);
+ message = consumer1.receiveImmediate();
assertNull(message);
- message = consumer2.receive(100);
+ message = consumer2.receiveImmediate();
assertNull(message);
- message = consumer3.receive(100);
+ message = consumer3.receiveImmediate();
assertNull(message);
// Should be able to send the first lot again now - since the second lot pushed the
@@ -379,7 +379,7 @@
message = consumer.receive(250);
assertEquals(1, message.getObjectProperty(propKey));
- message = consumer.receive(250);
+ message = consumer.receiveImmediate();
assertNull(message);
session.close();
@@ -421,7 +421,7 @@
message = consumer.receive(250);
assertEquals(1, message.getObjectProperty(propKey));
- message = consumer.receive(250);
+ message = consumer.receiveImmediate();
assertNull(message);
session.close();
@@ -475,7 +475,7 @@
message = consumer.receive(250);
assertEquals(1, message.getObjectProperty(propKey));
- message = consumer.receive(250);
+ message = consumer.receiveImmediate();
assertNull(message);
session.close();
@@ -536,7 +536,7 @@
message = consumer.receive(250);
assertEquals(0, message.getObjectProperty(propKey));
- message = consumer.receive(250);
+ message = consumer.receiveImmediate();
assertNull(message);
session.close();
@@ -602,7 +602,7 @@
message = consumer.receive(250);
assertEquals(1, message.getObjectProperty(propKey));
- message = consumer.receive(250);
+ message = consumer.receiveImmediate();
assertNull(message);
log.info("ending session");
@@ -679,7 +679,7 @@
message = consumer.receive(250);
assertEquals(1, message.getObjectProperty(propKey));
- message = consumer.receive(250);
+ message = consumer.receiveImmediate();
assertNull(message);
log.info("ending session");
@@ -755,7 +755,7 @@
message = consumer.receive(250);
- message = consumer.receive(250);
+ message = consumer.receiveImmediate();
assertNull(message);
log.info("ending session");
@@ -833,7 +833,7 @@
message = consumer.receive(250);
- message = consumer.receive(250);
+ message = consumer.receiveImmediate();
assertNull(message);
log.info("ending session");
@@ -924,13 +924,13 @@
message = createMessage(session, 1);
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
- message2 = consumer.receive(200);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
message = createMessage(session, 2);
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
- message2 = consumer.receive(200);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
session.close();
@@ -1006,7 +1006,7 @@
SimpleString dupID = new SimpleString("abcdefg" + i);
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
- ClientMessage message2 = consumer.receive(100);
+ ClientMessage message2 = consumer.receiveImmediate();
assertNull(message2);
}
@@ -1094,7 +1094,7 @@
}
else
{
- ClientMessage message2 = consumer.receive(100);
+ ClientMessage message2 = consumer.receiveImmediate();
assertNull(message2);
}
}
@@ -1194,7 +1194,7 @@
}
else
{
- ClientMessage message2 = consumer.receive(100);
+ ClientMessage message2 = consumer.receiveImmediate();
assertNull(message2);
}
}
@@ -1448,14 +1448,14 @@
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
producer.send(message);
session.commit();
- message2 = consumer.receive(200);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
message = createMessage(session, 2);
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
producer.send(message);
session.commit();
- message2 = consumer.receive(200);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
session.close();
@@ -1749,10 +1749,10 @@
session.start(xid3, XAResource.TMNOFLAGS);
- ClientMessage message2 = consumer.receive(200);
+ ClientMessage message2 = consumer.receiveImmediate();
assertNull(message2);
- message2 = consumer.receive(200);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
session.close();
@@ -1847,10 +1847,10 @@
session.start(xid3, XAResource.TMNOFLAGS);
- ClientMessage message2 = consumer.receive(200);
+ ClientMessage message2 = consumer.receiveImmediate();
assertNull(message2);
- message2 = consumer.receive(200);
+ message2 = consumer.receiveImmediate();
assertNull(message2);
session.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -325,7 +325,7 @@
session.start();
- ClientMessage message = consumer.receive(250);
+ ClientMessage message = consumer.receiveImmediate();
assertNull(message);
@@ -416,7 +416,7 @@
session.start();
- ClientMessage message = consumer.receive(250);
+ ClientMessage message = consumer.receive(100);
assertNull(message);
@@ -500,7 +500,7 @@
producer.send(message);
}
- ClientMessage message = consumer.receive(250);
+ ClientMessage message = consumer.receiveImmediate();
assertNull(message);
@@ -589,7 +589,7 @@
producer.send(message);
}
- ClientMessage message = consumer.receive(250);
+ ClientMessage message = consumer.receive(100);
assertNull(message);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -383,7 +383,7 @@
cons = sess.createConsumer(Q2);
- assertNull(cons.receive(100));
+ assertNull(cons.receiveImmediate());
cons.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -64,7 +64,7 @@
assertEquals(m.getBody().readString(), "heyho!");
// force a cancel
clientSession.rollback();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
clientConsumer = clientSession.createConsumer(dlq);
@@ -96,7 +96,7 @@
assertEquals(m.getBody().readString(), "heyho!");
// force a cancel
clientSession.rollback();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
clientConsumer = clientSession.createConsumer(dlq);
@@ -130,7 +130,7 @@
assertEquals(m.getBody().readString(), "heyho!");
// force a cancel
clientSession.rollback();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
}
@@ -180,7 +180,7 @@
}
assertEquals(0, ((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
- ClientMessage m = clientConsumer.receive(1000);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
// All the messages should now be in the DLQ
@@ -281,7 +281,7 @@
m.acknowledge();
clientSession.rollback();
}
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
@@ -334,8 +334,8 @@
clientSession.rollback();
}
- assertNull(defaultDeadLetterConsumer.receive(500));
- assertNull(specificDeadLetterConsumer.receive(500));
+ assertNull(defaultDeadLetterConsumer.receiveImmediate());
+ assertNull(specificDeadLetterConsumer.receiveImmediate());
// one more redelivery attempt:
ClientMessage m = clientConsumer.receive(500);
@@ -344,7 +344,7 @@
m.acknowledge();
clientSession.rollback();
- assertNull(defaultDeadLetterConsumer.receive(500));
+ assertNull(defaultDeadLetterConsumer.receiveImmediate());
assertNotNull(specificDeadLetterConsumer.receive(500));
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -60,10 +60,10 @@
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
System.out.println("size3 = " + server.getPostOffice().getPagingManager().getTotalMemory());
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
clientConsumer = clientSession.createConsumer(eq);
@@ -100,7 +100,7 @@
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
System.out.println("pageSize after message received = " + server.getPostOffice().getPagingManager().getTotalMemory());
@@ -155,7 +155,7 @@
producer.send(clientMessage);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
}
@@ -185,7 +185,7 @@
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
clientSession.start();
- ClientMessage m = clientConsumer.receive(1000);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
// All the messages should now be in the EQ
@@ -227,7 +227,7 @@
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
@@ -257,7 +257,7 @@
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
@@ -295,12 +295,12 @@
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(queue);
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
clientConsumer = clientSession.createConsumer(defaultExpiryQueue);
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -397,7 +397,7 @@
ClientConsumer consumerExpired = session.createConsumer(ADDRESS);
// to kick expiry quicker than waiting reaper thread
- assertNull(consumerExpired.receive(1000));
+ assertNull(consumerExpired.receiveImmediate());
consumerExpired.close();
ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
@@ -532,7 +532,7 @@
ClientConsumer consumerExpired = session.createConsumer(ADDRESS);
// to kick expiry quicker than waiting reaper thread
- assertNull(consumerExpired.receive(1000));
+ assertNull(consumerExpired.receiveImmediate());
consumerExpired.close();
ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
@@ -666,7 +666,7 @@
// Creating a consumer just to make the expiry process go faster and not have to wait for the reaper
ClientConsumer consumer2 = session.createConsumer(ADDRESS);
- assertNull(consumer2.receive(1000));
+ assertNull(consumer2.receiveImmediate());
ClientMessage msg1 = consumer.receive(50000);
@@ -1370,7 +1370,7 @@
ClientConsumer consumer = session.createConsumer(queue[1]);
ClientMessage msg = consumer.receive(RECEIVE_WAIT_TIME);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
assertNotNull(msg);
msg.acknowledge();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -91,7 +91,7 @@
session.start();
ClientConsumer consumer = session.createConsumer(queue);
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveImmediate());
consumer.close();
session.deleteQueue(queue);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -72,7 +72,7 @@
session.start();
ClientConsumer consumer = session.createConsumer(queue);
- ClientMessage message2 = consumer.receive(500);
+ ClientMessage message2 = consumer.receiveImmediate();
assertNull(message2);
consumer.close();
@@ -100,9 +100,8 @@
assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount());
assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getMessageCount());
-
- ClientMessage message2 = consumer.receive(500);
+ ClientMessage message2 = consumer.receiveImmediate();
assertNull(message2);
consumer.close();
@@ -126,7 +125,7 @@
Thread.sleep(EXPIRATION * 2);
ClientConsumer consumer = session.createConsumer(queue);
- ClientMessage message2 = consumer.receive(500);
+ ClientMessage message2 = consumer.receiveImmediate();
assertNull(message2);
assertEquals(0, ((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount());
@@ -166,7 +165,7 @@
session.start();
ClientConsumer consumer = session.createConsumer(queue);
- ClientMessage message2 = consumer.receive(500);
+ ClientMessage message2 = consumer.receiveImmediate();
assertNull(message2);
ClientConsumer expiryConsumer = session.createConsumer(expiryQueue);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -303,7 +303,7 @@
consumer.close();
consumer2.close();
consumer = this.clientSession.createConsumer(qName);
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveImmediate());
clientSession.close();
}
@@ -372,7 +372,7 @@
i += 2;
}
consumer = this.clientSession.createConsumer(qName);
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveImmediate());
clientSession.close();
}
@@ -429,7 +429,7 @@
consumer.close();
consumer2.close();
consumer = this.clientSession.createConsumer(qName);
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveImmediate());
clientSession.close();
}
@@ -506,7 +506,7 @@
i += 2;
}
consumer = this.clientSession.createConsumer(qName);
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveImmediate());
clientSession.close();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -279,7 +279,7 @@
assertNotNull(msg);
}
- assertNull(consumer.receive(100));
+ assertNull(consumer.receiveImmediate());
consumer.close();
}
@@ -292,7 +292,7 @@
ClientConsumer consumer = session.createConsumer(ADDRESS);
- assertNull(consumer.receive(100));
+ assertNull(consumer.receiveImmediate());
sessionTransacted.commit();
@@ -312,7 +312,7 @@
message.acknowledge();
}
- assertNull(consumer.receive(100));
+ assertNull(consumer.receiveImmediate());
consumer.close();
@@ -524,7 +524,7 @@
session.start();
- assertNull(consumer.receive(100));
+ assertNull(consumer.receiveImmediate());
session.close();
@@ -690,7 +690,7 @@
message2.acknowledge();
}
- assertNull(consumer.receive(100));
+ assertNull(consumer.receiveImmediate());
assertEquals(0, server.getPostOffice().getPagingManager().getTotalMemory());
assertEquals(0, server.getPostOffice().getPagingManager().getPageStore(ADDRESS).getAddressSize());
@@ -714,7 +714,7 @@
message2.acknowledge();
}
- assertNull(consumer.receive(100));
+ assertNull(consumer.receiveImmediate());
session.close();
@@ -749,7 +749,7 @@
session.commit();
- assertNull(consumer.receive(100));
+ assertNull(consumer.receiveImmediate());
session.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/TransactionDurabilityTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/TransactionDurabilityTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/TransactionDurabilityTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -129,7 +129,7 @@
consumer2 = session2.createConsumer(queue2);
- m1 = consumer1.receive(100);
+ m1 = consumer1.receiveImmediate();
assertNull(m1);
@@ -161,11 +161,11 @@
consumer2 = session2.createConsumer(queue2);
- m1 = consumer1.receive(100);
+ m1 = consumer1.receiveImmediate();
assertNull(m1);
- m2 = consumer2.receive(100);
+ m2 = consumer2.receiveImmediate();
assertNull(m2);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/WildCardRoutingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/WildCardRoutingTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/client/WildCardRoutingTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -60,7 +60,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -85,7 +85,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
clientSession.deleteQueue(queueName);
@@ -118,7 +118,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
clientSession.deleteQueue(queueName);
@@ -159,7 +159,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -188,7 +188,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -306,7 +306,7 @@
assertNotNull(m);
assertEquals("m9", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
//now remove all the queues
clientSession.deleteQueue(queueName1);
@@ -403,7 +403,7 @@
assertNotNull(m);
assertEquals("m9", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
//now remove all the queues
clientSession.deleteQueue(queueName1);
@@ -445,7 +445,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -474,7 +474,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -504,7 +504,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -533,7 +533,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -558,7 +558,7 @@
assertNotNull(m);
assertEquals("m1", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -610,7 +610,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -635,7 +635,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -660,7 +660,7 @@
assertNotNull(m);
assertEquals("m1", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -685,7 +685,7 @@
assertNotNull(m);
assertEquals("m1", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -710,7 +710,7 @@
assertNotNull(m);
assertEquals("m1", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -742,7 +742,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = clientConsumer.receive(500);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
clientConsumer.close();
clientSession.deleteQueue(queueName);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -143,7 +143,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
@@ -158,7 +158,7 @@
producer0.send(message);
}
- assertNull(consumer1.receive(500));
+ assertNull(consumer1.receiveImmediate());
bridge.start();
@@ -173,7 +173,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session0.close();
@@ -294,7 +294,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -316,7 +316,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session1.close();
@@ -364,7 +364,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session1.close();
@@ -474,7 +474,7 @@
session1.start();
// Won't be received since the bridge was deactivated
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
// Now start the bridge manually
@@ -495,7 +495,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session1.close();
@@ -605,7 +605,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
// Now stop the bridge manually
@@ -624,7 +624,7 @@
producer0.send(message);
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
bridge.start();
@@ -644,7 +644,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
bridge.stop();
@@ -657,7 +657,7 @@
producer0.send(message);
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
bridge.start();
@@ -672,7 +672,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session1.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -178,7 +178,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session0.close();
@@ -341,7 +341,7 @@
producer0.send(message);
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -375,7 +375,7 @@
}
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session0.close();
@@ -520,7 +520,7 @@
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session0.close();
@@ -642,7 +642,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session0.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithDiscoveryGroupStartTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -156,7 +156,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
@@ -171,7 +171,7 @@
producer0.send(message);
}
- assertNull(consumer1.receive(500));
+ assertNull(consumer1.receiveImmediate());
bridge.start();
@@ -186,7 +186,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session0.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -171,7 +171,7 @@
return;
}
- Thread.sleep(100);
+ Thread.sleep(10);
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
@@ -191,7 +191,7 @@
{
return;
}
- Thread.sleep(100);
+ Thread.sleep(10);
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
String msg = "Timed out waiting for server starting = " + node;
@@ -259,7 +259,7 @@
return;
}
- Thread.sleep(100);
+ Thread.sleep(10);
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
@@ -983,7 +983,7 @@
throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
}
- assertNull("consumer " + i + " received message", holder.consumer.receive(200));
+ assertNull("consumer " + i + " received message", holder.consumer.receiveImmediate());
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -318,7 +318,7 @@
session.start();
- ClientMessage message = consumer.receive(500);
+ ClientMessage message = consumer.receiveImmediate();
assertNull(message);
@@ -399,7 +399,7 @@
}
}
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.commit();
@@ -488,7 +488,7 @@
}
}
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.commit();
@@ -666,7 +666,7 @@
session2.commit();
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session1.close();
@@ -736,7 +736,7 @@
session.start();
- ClientMessage message = consumer.receive(500);
+ ClientMessage message = consumer.receiveImmediate();
assertNull(message);
@@ -808,7 +808,7 @@
session.start();
- ClientMessage message = consumer.receive(500);
+ ClientMessage message = consumer.receiveImmediate();
assertNull(message);
@@ -883,7 +883,7 @@
session.start();
- ClientMessage message = consumer.receive(500);
+ ClientMessage message = consumer.receiveImmediate();
assertNull(message);
@@ -1887,7 +1887,7 @@
message.acknowledge();
}
- ClientMessage message = consumer.receive(500);
+ ClientMessage message = consumer.receiveImmediate();
assertNull(message);
@@ -2027,7 +2027,7 @@
message.acknowledge();
}
- ClientMessage message = consumer.receive(500);
+ ClientMessage message = consumer.receiveImmediate();
assertNull(message);
@@ -2134,7 +2134,7 @@
message2.acknowledge();
}
- ClientMessage message3 = consumer.receive(250);
+ ClientMessage message3 = consumer.receiveImmediate();
assertNull(message3);
Modified: trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -118,7 +118,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(500));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -131,7 +131,7 @@
message.acknowledge();
}
- assertNull(consumer2.receive(TIMEOUT));
+ assertNull(consumer2.receiveImmediate());
session.close();
@@ -728,7 +728,7 @@
assertNull(consumer3.receiveImmediate());
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/divert/PersistentDivertTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -133,7 +133,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -146,7 +146,7 @@
message.acknowledge();
}
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -159,7 +159,7 @@
message.acknowledge();
}
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -172,7 +172,7 @@
message.acknowledge();
}
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
@@ -285,7 +285,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -298,7 +298,7 @@
message.acknowledge();
}
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -311,7 +311,7 @@
message.acknowledge();
}
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -324,7 +324,7 @@
message.acknowledge();
}
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
@@ -348,13 +348,13 @@
consumer4 = session.createConsumer(queueName4);
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -528,7 +528,7 @@
assertEquals("message" + i, tm.getText());
}
- Message m = cons.receive(1000);
+ Message m = cons.receiveNoWait();
assertNull(m);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -116,7 +116,7 @@
cons = sess.createConsumer(queue);
- msg2 = (TextMessage)cons.receive(10);
+ msg2 = (TextMessage)cons.receiveNoWait();
assertNull("ConnectionFactory is on PreACK mode, the message shouldn't be received", msg2);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -134,7 +134,7 @@
assertEquals("message" + i, tm.getText());
}
- TextMessage tm = (TextMessage)consumer.receive(1000);
+ TextMessage tm = (TextMessage)consumer.receiveNoWait();
assertNull(tm);
@@ -219,7 +219,7 @@
assertEquals("message" + i, tm.getText());
}
- TextMessage tm = (TextMessage)consumerBackup.receive(1000);
+ TextMessage tm = (TextMessage)consumerBackup.receiveNoWait();
assertNull(tm);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -22,6 +22,7 @@
import javax.jms.TextMessage;
import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.Queue;
import org.hornetq.jms.HornetQQueue;
import org.hornetq.jms.client.HornetQConnectionFactory;
@@ -34,6 +35,8 @@
*/
public class ConsumerTest extends JMSTestBase
{
+ private static final Logger log = Logger.getLogger(ConsumerTest.class);
+
private static final String Q_NAME = "ConsumerTestQueue";
private HornetQQueue jBossQueue;
@@ -112,6 +115,8 @@
public void testPreCommitAcksWithMessageExpiry() throws Exception
{
+ log.info("starting test");
+
Connection conn = cf.createConnection();
Session session = conn.createSession(false, HornetQSession.PRE_ACKNOWLEDGE);
jBossQueue = new HornetQQueue(Q_NAME);
@@ -128,12 +133,12 @@
Thread.sleep(2);
conn.start();
- Message m = consumer.receive(500);
+
+ Message m = consumer.receiveNoWait();
assertNull(m);
SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
conn.close();
}
@@ -156,12 +161,11 @@
Thread.sleep(2);
conn.start();
- Message m = consumer.receive(500);
+ Message m = consumer.receiveNoWait();
assertNull(m);
SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
conn.close();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -232,7 +232,7 @@
connection.start();
MessageConsumer consumer = JMSUtil.createConsumer(connection, queue);
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveNoWait());
connection.close();
}
@@ -379,7 +379,7 @@
assertEquals(msg_2.getJMSMessageID(), message.getJMSMessageID());
assertEquals(unmatchingValue, message.getLongProperty(key));
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveNoWait());
connection.close();
}
@@ -720,7 +720,7 @@
Message message = consumer.receive(500);
assertNotNull(message);
assertEquals(unmatchingValue, message.getLongProperty(key));
- assertNull(consumer.receive(500));
+ assertNull(consumer.receiveNoWait());
JMSUtil.consumeMessages(1, otherQueue);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -149,7 +149,7 @@
m = consumer.receive(500);
Assert.assertNotNull("expected to received " + expected + " messages, got only " + (i + 1), m);
}
- m = consumer.receive(500);
+ m = consumer.receiveNoWait();
Assert.assertNull("received one more message than expected (" + expected + ")", m);
}
finally
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ManagementTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ManagementTestBase.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ManagementTestBase.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -55,7 +55,7 @@
m.acknowledge();
}
session.commit();
- m = consumer.receive(500);
+ m = consumer.receiveImmediate();
assertNull("received one more message than expected (" + expected + ")", m);
}
finally
Modified: trunk/tests/src/org/hornetq/tests/integration/management/NotificationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/NotificationTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/management/NotificationTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -259,7 +259,7 @@
messages[i] = m;
m.acknowledge();
}
- m = consumer.receive(500);
+ m = consumer.receiveImmediate();
if (m != null)
{
for (SimpleString key : m.getPropertyNames())
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -778,7 +778,7 @@
m.acknowledge();
// check there is no other message to consume:
- m = consumer.receive(500);
+ m = consumer.receiveImmediate();
assertNull(m);
consumer.close();
@@ -928,7 +928,7 @@
m.acknowledge();
// check there is no other message to consume:
- m = consumer.receive(500);
+ m = consumer.receiveImmediate();
assertNull(m);
consumer.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/SecurityNotificationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/SecurityNotificationTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/management/SecurityNotificationTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -212,7 +212,7 @@
messages[i] = m;
m.acknowledge();
}
- m = consumer.receive(500);
+ m = consumer.receiveImmediate();
if (m != null)
{
for (SimpleString key : m.getPropertyNames())
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -102,7 +102,7 @@
ClientConsumer consumer = session.createConsumer(ADDRESS);
- assertNull(consumer.receive(200));
+ assertNull(consumer.receiveImmediate());
session.close();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -156,7 +156,7 @@
// Make sure no more messages
consumer.close();
consumer = session.createConsumer(atestq);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.close();
}
@@ -204,7 +204,7 @@
consumer.close();
consumer2.close();
consumer = session.createConsumer(atestq);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.close();
}
@@ -266,7 +266,7 @@
consumer.close();
consumer2.close();
consumer = session.createConsumer(atestq);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.close();
}
@@ -316,7 +316,7 @@
// Make sure no more messages
consumer.close();
consumer = session.createConsumer(atestq);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.close();
}
@@ -396,7 +396,7 @@
// Make sure no more messages
consumer.close();
consumer = session.createConsumer(atestq);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.close();
}
@@ -477,7 +477,7 @@
// Make sure no more messages
consumer.close();
consumer = session.createConsumer(atestq);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.close();
}
@@ -549,7 +549,7 @@
// Make sure no more messages
consumer.close();
consumer = session.createConsumer(atestq);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.close();
}
@@ -602,7 +602,7 @@
consumer.close();
// Make sure no more messages
consumer = session.createConsumer(atestq);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receiveImmediate());
session.close();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -154,7 +154,7 @@
assertNotNull(m);
m.acknowledge();
assertEquals(m.getBody().readString(), "m6");
- m = consumer.receive(1000);
+ m = consumer.receiveImmediate();
assertNull(m);
}
protected void tearDown() throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -138,7 +138,7 @@
assertNotNull(m);
assertEquals("m2", m.getBody().readString());
m.acknowledge();
- m = consumer.receive(1000);
+ m = consumer.receiveImmediate();
assertNull(m);
}
@@ -191,7 +191,7 @@
assertNotNull(m);
m.acknowledge();
assertEquals(m.getBody().readString(), "m6");
- m = consumer.receive(1000);
+ m = consumer.receiveImmediate();
assertNull(m);
}
@@ -242,7 +242,7 @@
m.acknowledge();
assertEquals(m.getBody().readString(), "m4");
clientSessionTxReceives.commit();
- m = consumer.receive(1000);
+ m = consumer.receiveImmediate();
assertNull(m);
}
@@ -357,7 +357,7 @@
assertNotNull(m);
m.acknowledge();
assertEquals(m.getBody().readString(), "m6");
- m = consumer.receive(250);
+ m = consumer.receiveImmediate();
assertNull(m);
}
@@ -396,7 +396,7 @@
assertNotNull(m);
m.acknowledge();
assertEquals(m.getBody().readString(), "m6");
- m = consumer.receive(250);
+ m = consumer.receiveImmediate();
assertNull(m);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/server/PredefinedQueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/PredefinedQueueTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/server/PredefinedQueueTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -190,8 +190,8 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
- assertNull(consumer2.receive(200));
+ assertNull(consumer1.receiveImmediate());
+ assertNull(consumer2.receiveImmediate());
session.close();
@@ -293,9 +293,9 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
- assertNull(consumer2.receive(200));
- assertNull(consumer3.receive(200));
+ assertNull(consumer1.receiveImmediate());
+ assertNull(consumer2.receiveImmediate());
+ assertNull(consumer3.receiveImmediate());
session.close();
@@ -370,7 +370,7 @@
ClientConsumer consumer2 = session.createConsumer(queueName2);
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receiveImmediate();
assertNull(message);
@@ -382,8 +382,8 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
- assertNull(consumer2.receive(200));
+ assertNull(consumer1.receiveImmediate());
+ assertNull(consumer2.receiveImmediate());
session.close();
@@ -450,7 +450,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -463,7 +463,7 @@
producer.send(message);
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -380,7 +380,7 @@
ClientConsumer pageConsumer = clientSession.createConsumer(pageQueue);
- assertNull(pageConsumer.receive(100));
+ assertNull(pageConsumer.receiveImmediate());
long globalSize = this.server.getPostOffice().getPagingManager().getTotalMemory();
// Management message (from createQueue) will not be taken into account again as it is nonPersistent
@@ -560,8 +560,7 @@
assertEquals(xids.length, 0);
clientSession.rollback(xid);
clientSession.start();
- ClientMessage m = clientConsumer.receive(100);
- log.info("m is " + m);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -746,7 +745,7 @@
clientSession.rollback(xid);
clientSession.rollback(xid2);
clientSession.start();
- ClientMessage m = clientConsumer.receive(100);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -810,7 +809,7 @@
m = clientConsumer.receive(1000);
assertNotNull(m);
assertEquals(m.getBody().readString(), "m4");
- m = clientConsumer.receive(100);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -941,7 +940,7 @@
assertEquals(xids.length, 0);
clientSession.commit(xid, true);
clientSession.start();
- m = clientConsumer.receive(100);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -1103,7 +1102,7 @@
assertEquals(xids.length, 0);
clientSession.commit(xid, true);
clientSession.start();
- m = clientConsumer.receive(100);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -153,7 +153,7 @@
assertTrue(e.errorCode == XAException.XAER_NOTA);
}
clientSession.start();
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -302,7 +302,7 @@
m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().readString(), "m4");
- m = consumer.receive(500);
+ m = consumer.receiveImmediate();
assertNull(m);
clientSession2.close();
}
@@ -377,7 +377,7 @@
m.acknowledge();
assertNotNull(m);
assertEquals(m.getBody().readString(), "m8");
- m = consumer.receive(500);
+ m = consumer.receiveImmediate();
assertNull(m);
clientSession2.close();
}
@@ -410,7 +410,7 @@
assertTrue(e.errorCode == XAException.XAER_NOTA);
}
clientSession.start();
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -513,7 +513,7 @@
session.close();
}
clientSession.start();
- ClientMessage m = clientConsumer.receive(500);
+ ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
}
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-11-09 15:26:12 UTC (rev 8253)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-11-09 21:20:59 UTC (rev 8254)
@@ -241,7 +241,7 @@
cons = sess.createConsumer(Q2);
- assertNull(cons.receive(100));
+ assertNull(cons.receiveImmediate());
sess.close();
15 years, 2 months
JBoss hornetq SVN: r8253 - trunk/tests/src/org/hornetq/tests/integration/jms.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-09 10:26:12 -0500 (Mon, 09 Nov 2009)
New Revision: 8253
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
Log:
rewrite ManualReconnectinToSingleServerTest
* check manual reconnection for consumer only
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-11-09 09:47:36 UTC (rev 8252)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/ManualReconnectionToSingleServerTest.java 2009-11-09 15:26:12 UTC (rev 8253)
@@ -13,33 +13,13 @@
package org.hornetq.tests.integration.jms;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONFIRMATION_WINDOW_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE;
-import static org.hornetq.core.client.impl.ClientSessionFactoryImpl.DEFAULT_USE_GLOBAL_POOLS;
-import static org.hornetq.tests.util.RandomUtil.randomString;
-import java.util.ArrayList;
import java.util.Date;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -58,10 +38,15 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
import org.hornetq.integration.transports.netty.NettyConnectorFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.Pair;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -74,34 +59,22 @@
private Connection connection;
- private Session session;
-
- private MessageProducer producer;
-
private MessageConsumer consumer;
private CountDownLatch exceptionLatch = new CountDownLatch(1);
private CountDownLatch reconnectionLatch = new CountDownLatch(1);
- private volatile boolean afterRestart = false;
+ private CountDownLatch allMessagesReceived = new CountDownLatch(1);
- private volatile boolean receivedMessagesAfterRestart = false;
-
- private int callTimeout;
+ private JMSServerManager serverManager;
- private MessageListener listener = new MessageListener()
- {
- public void onMessage(Message msg)
- {
- if (afterRestart)
- {
- receivedMessagesAfterRestart = true;
- }
- log.info(receivedMessagesAfterRestart + " " + msg);
- }
- };
+ private InVMContext context;
+ private final String queueName = "ManualReconnectionToSingleServerTest.queue";
+
+ private final int num = 20;
+
private ExceptionListener exceptionListener = new ExceptionListener()
{
public void onException(JMSException e)
@@ -113,14 +86,8 @@
}
};
- private HornetQServer server;
+ private Listener listener;
- private JMSServerManagerImpl serverManager;
-
- private InVMContext context;
-
- private final String queueName = randomString();
-
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -133,48 +100,48 @@
public void testExceptionListener() throws Exception
{
- long start = System.currentTimeMillis();
-
connect();
- int num = 20;
+ ConnectionFactory cf = (ConnectionFactory)context.lookup("/cf");
+ Destination dest = (Destination)context.lookup(queueName);
+ Connection conn = cf.createConnection();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sess.createProducer(dest);
+
for (int i = 0; i < num; i++)
{
- try
- {
- Message message = session.createTextMessage((new Date()).toString());
- producer.send(message);
- Thread.sleep(500);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ Message message = sess.createTextMessage((new Date()).toString());
+ message.setIntProperty("counter", i + 1);
+ prod.send(message);
if (i == num / 2)
{
- killServer();
+ conn.close();
+ serverManager.stop();
Thread.sleep(5000);
- startServer();
- afterRestart = true;
+ serverManager.start();
+ cf = (ConnectionFactory)context.lookup("/cf");
+ dest = (Destination)context.lookup(queueName);
+ conn = cf.createConnection();
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ prod = sess.createProducer(dest);
}
}
+ conn.close();
+
boolean gotException = exceptionLatch.await(10, SECONDS);
assertTrue(gotException);
boolean clientReconnected = reconnectionLatch.await(10, SECONDS);
+
assertTrue("client did not reconnect after server was restarted", clientReconnected);
- assertTrue(receivedMessagesAfterRestart);
+ boolean gotAllMessages = allMessagesReceived.await(10, SECONDS);
+ assertTrue(gotAllMessages);
+
connection.close();
-
- long end = System.currentTimeMillis();
-
- log.info("That took " + (end - start));
-
- //Make sure it doesn't pass by just timing out on blocking send
- assertTrue(end - start < callTimeout);
+
}
// Package protected ---------------------------------------------
@@ -190,26 +157,42 @@
conf.setSecurityEnabled(false);
conf.setJMXManagementEnabled(true);
conf.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName()));
- server = HornetQ.newHornetQServer(conf, false);
- server.start();
+ HornetQServer server = HornetQ.newHornetQServer(conf, false);
- serverManager = new JMSServerManagerImpl(server);
- startServer();
+ JMSConfiguration configuration = new JMSConfigurationImpl();
+ context = new InVMContext();
+ configuration.setContext(context);
+ configuration.getQueueConfigurations().add(new QueueConfigurationImpl(queueName, null, false, queueName));
+
+ ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl("cf",
+ new TransportConfiguration(NettyConnectorFactory.class.getName()),
+ "/cf");
+ cfConfig.setRetryInterval(1000);
+ cfConfig.setRetryIntervalMultiplier(1.0);
+ cfConfig.setReconnectAttempts(-1);
+ cfConfig.setFailoverOnServerShutdown(true);
+ configuration.getConnectionFactoryConfigurations().add(cfConfig);
+ serverManager = new JMSServerManagerImpl(server, configuration);
+ serverManager.start();
+
+ listener = new Listener();
+
+ exceptionLatch = new CountDownLatch(1);
+ reconnectionLatch = new CountDownLatch(1);
+ allMessagesReceived = new CountDownLatch(1);
}
@Override
protected void tearDown() throws Exception
- {
- server.stop();
-
- server = null;
-
+ {
+ serverManager.stop();
+
serverManager = null;
-
+
connection = null;
super.tearDown();
-
+
System.gc();
}
@@ -217,68 +200,6 @@
// Inner classes -------------------------------------------------
- private void startServer() throws Exception
- {
- serverManager.start();
- serverManager.activated();
- context = new InVMContext();
- serverManager.setContext(context);
- serverManager.createQueue(queueName, queueName, null, false);
- registerConnectionFactory();
- }
-
- private void killServer() throws Exception
- {
- context = null;
- serverManager.stop();
- }
-
- private void registerConnectionFactory() throws Exception
- {
- int retryInterval = 1000;
- double retryIntervalMultiplier = 1.0;
- int reconnectAttempts = -1;
- boolean failoverOnServerShutdown = true;
- callTimeout = 30000;
-
- List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
- connectorConfigs.add(new Pair<TransportConfiguration, TransportConfiguration>(new TransportConfiguration(NettyConnectorFactory.class.getName()),
- null));
-
- List<String> jndiBindings = new ArrayList<String>();
- jndiBindings.add("/cf");
-
- serverManager.createConnectionFactory("ManualReconnectionToSingleServerTest",
- connectorConfigs,
- null,
- DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
- DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- DEFAULT_CONSUMER_WINDOW_SIZE,
- DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_CONFIRMATION_WINDOW_SIZE,
- DEFAULT_PRODUCER_MAX_RATE,
- DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- DEFAULT_BLOCK_ON_PERSISTENT_SEND,
- DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
- DEFAULT_AUTO_GROUP,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_USE_GLOBAL_POOLS,
- DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
- retryInterval,
- retryIntervalMultiplier,
- 1000,
- reconnectAttempts,
- failoverOnServerShutdown,
- jndiBindings);
- }
-
protected void disconnect()
{
log.info("calling disconnect");
@@ -294,6 +215,7 @@
log.info("closing the connection");
connection.close();
connection = null;
+ log.info("connection closed");
}
catch (Exception e)
{
@@ -314,7 +236,7 @@
Queue queue;
ConnectionFactory cf;
while (true)
- {
+ {
try
{
queue = (Queue)initialContext.lookup(queueName);
@@ -323,19 +245,16 @@
}
catch (Exception e)
{
- //retry until server is up
+ // retry until server is up
Thread.sleep(100);
}
}
connection = cf.createConnection();
connection.setExceptionListener(exceptionListener);
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- producer = session.createProducer(queue);
- System.out.println("creating consumer");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(queue);
consumer.setMessageListener(listener);
connection.start();
- System.out.println("started new connection");
}
catch (Exception e)
{
@@ -352,4 +271,28 @@
}
}
}
+
+ private class Listener implements MessageListener
+ {
+ private int count = 0;
+
+ public void onMessage(Message msg)
+ {
+ count++;
+
+ try
+ {
+ int counter = msg.getIntProperty("counter");
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ if (count == num)
+ {
+ allMessagesReceived.countDown();
+ }
+ }
+ };
+
}
15 years, 2 months
JBoss hornetq SVN: r8252 - trunk/tests/src/org/hornetq/tests/integration/divert.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-11-09 04:47:36 -0500 (Mon, 09 Nov 2009)
New Revision: 8252
Modified:
trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
Log:
increased receive timeout
* test failed because a long Full GC while calling consumer.receive(200)
Modified: trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java 2009-11-09 09:46:10 UTC (rev 8251)
+++ trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java 2009-11-09 09:47:36 UTC (rev 8252)
@@ -44,6 +44,8 @@
{
private static final Logger log = Logger.getLogger(DivertTest.class);
+ private static final int TIMEOUT = 500;
+
public void testSingleNonExclusiveDivert() throws Exception
{
Configuration conf = createDefaultConfig();
@@ -107,7 +109,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -116,11 +118,11 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receive(500));
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(TIMEOUT);
assertNotNull(message);
@@ -129,7 +131,7 @@
message.acknowledge();
}
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receive(TIMEOUT));
session.close();
@@ -213,7 +215,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -222,11 +224,11 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(TIMEOUT);
assertNotNull(message);
@@ -235,11 +237,11 @@
message.acknowledge();
}
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer3.receive(200);
+ ClientMessage message = consumer3.receive(TIMEOUT);
assertNotNull(message);
@@ -248,11 +250,11 @@
message.acknowledge();
}
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer4.receive(200);
+ ClientMessage message = consumer4.receive(TIMEOUT);
assertNotNull(message);
@@ -261,7 +263,7 @@
message.acknowledge();
}
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
@@ -327,7 +329,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -336,7 +338,7 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
session.close();
@@ -418,7 +420,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -427,13 +429,13 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
@@ -537,7 +539,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -546,11 +548,11 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(TIMEOUT);
assertNotNull(message);
@@ -559,11 +561,11 @@
message.acknowledge();
}
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer3.receive(200);
+ ClientMessage message = consumer3.receive(TIMEOUT);
assertNotNull(message);
@@ -572,11 +574,11 @@
message.acknowledge();
}
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer4.receive(200);
+ ClientMessage message = consumer4.receive(TIMEOUT);
assertNotNull(message);
@@ -585,7 +587,7 @@
message.acknowledge();
}
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
@@ -689,7 +691,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -698,11 +700,11 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(TIMEOUT);
assertNotNull(message);
@@ -711,11 +713,11 @@
message.acknowledge();
}
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer3.receive(200);
+ ClientMessage message = consumer3.receive(TIMEOUT);
assertNotNull(message);
@@ -724,7 +726,7 @@
message.acknowledge();
}
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
assertNull(consumer4.receive(200));
@@ -830,7 +832,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -839,11 +841,11 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(TIMEOUT);
assertNotNull(message);
@@ -852,11 +854,11 @@
message.acknowledge();
}
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
@@ -974,11 +976,11 @@
// message.acknowledge();
// }
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer2.receive(200);
+ ClientMessage message = consumer2.receive(TIMEOUT);
assertNotNull(message);
@@ -987,11 +989,11 @@
message.acknowledge();
}
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer3.receive(200);
+ ClientMessage message = consumer3.receive(TIMEOUT);
assertNotNull(message);
@@ -1000,11 +1002,11 @@
message.acknowledge();
}
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer4.receive(200);
+ ClientMessage message = consumer4.receive(TIMEOUT);
assertNotNull(message);
@@ -1013,7 +1015,7 @@
message.acknowledge();
}
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
@@ -1028,7 +1030,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -1037,13 +1039,13 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
@@ -1147,7 +1149,7 @@
for (int i = 0; i < numMessages;)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -1162,7 +1164,7 @@
break;
}
- message = consumer2.receive(200);
+ message = consumer2.receive(TIMEOUT);
assertNotNull(message);
@@ -1177,7 +1179,7 @@
break;
}
- message = consumer3.receive(200);
+ message = consumer3.receive(TIMEOUT);
assertNotNull(message);
@@ -1188,13 +1190,13 @@
i++;
}
- assertNull(consumer1.receive(200));
- assertNull(consumer2.receive(200));
- assertNull(consumer3.receive(200));
+ assertNull(consumer1.receiveImmediate());
+ assertNull(consumer2.receiveImmediate());
+ assertNull(consumer3.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer4.receive(200);
+ ClientMessage message = consumer4.receive(TIMEOUT);
assertNotNull(message);
@@ -1203,7 +1205,7 @@
message.acknowledge();
}
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
@@ -1309,7 +1311,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(TIMEOUT);
assertNotNull(message);
@@ -1318,13 +1320,13 @@
message.acknowledge();
}
- assertNull(consumer1.receive(200));
+ assertNull(consumer1.receiveImmediate());
- assertNull(consumer2.receive(200));
+ assertNull(consumer2.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer3.receive(200);
+ ClientMessage message = consumer3.receive(TIMEOUT);
assertNotNull(message);
@@ -1333,11 +1335,11 @@
message.acknowledge();
}
- assertNull(consumer3.receive(200));
+ assertNull(consumer3.receiveImmediate());
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer4.receive(200);
+ ClientMessage message = consumer4.receive(TIMEOUT);
assertNotNull(message);
@@ -1346,7 +1348,7 @@
message.acknowledge();
}
- assertNull(consumer4.receive(200));
+ assertNull(consumer4.receiveImmediate());
session.close();
15 years, 2 months