[jboss-cvs] JBoss Messaging SVN: r2034 - in trunk: src/etc/xmdesc and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 24 11:35:14 EST 2007
Author: timfox
Date: 2007-01-24 11:35:14 -0500 (Wed, 24 Jan 2007)
New Revision: 2034
Modified:
trunk/src/etc/aop-messaging-client.xml
trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/tests/build.xml
trunk/tests/etc/log4j.xml
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
Log:
Few fixes
Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/src/etc/aop-messaging-client.xml 2007-01-24 16:35:14 UTC (rev 2034)
@@ -25,7 +25,7 @@
-->
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate->$implementing{org.jboss.jms.delegate.ConnectionFactoryDelegate}(..))">
- <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
+ <interceptor-ref name="org.jboss.jms.client.container.ClientLogInterceptor"/>
<interceptor-ref name="org.jboss.jms.client.container.ExceptionInterceptor"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate->createConnectionDelegate(..))">
Modified: trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/src/etc/xmdesc/ConnectionFactory-xmbean.xml 2007-01-24 16:35:14 UTC (rev 2034)
@@ -41,30 +41,30 @@
<name>JNDIBindings</name>
<type>org.w3c.dom.Element</type>
</attribute>
-
+
<attribute access="read-write" getMethod="getPrefetchSize" setMethod="setPrefetchSize">
<description>The maximum number of messages that will be prefetched by the client side consumer</description>
<name>PrefetchSize</name>
- <type>int</type>
+ <type>int</type>
</attribute>
-
+
<attribute access="read-write" getMethod="getDefaultTempQueueFullSize" setMethod="setDefaultTempQueueFullSize">
<description>The default value of paging full size for any temporary queues created for connections from this connection factory</description>
<name>DefaultTempQueueFullSize</name>
<type>int</type>
- </attribute>
-
+ </attribute>
+
<attribute access="read-write" getMethod="getDefaultTempQueuePageSize" setMethod="setDefaultTempQueuePageSize">
<description>The default value of paging page size for any temporary queues created for connections from this connection factory</description>
<name>DefaultTempQueuePageSize</name>
<type>int</type>
</attribute>
-
+
<attribute access="read-write" getMethod="getDefaultTempQueueDownCacheSize" setMethod="setDefaultTempQueueDownCacheSize">
<description>The default value of paging down cache size for any temporary queues created for connections from this connection factory</description>
<name>DefaultTempQueueDownCacheSize</name>
<type>int</type>
- </attribute>
+ </attribute>
<!-- ServerPeer ObjectName is configured as a dependency optional-attribute-name, this is the
only reason for this attribute to be writable. Any write attempt on this attribute after
@@ -75,13 +75,13 @@
<name>ServerPeer</name>
<type>javax.management.ObjectName</type>
</attribute>
-
+
<attribute access="read-write" getMethod="getConnector" setMethod="setConnector">
<description>The ObjectName of the remoting connector this destination will use</description>
<name>Connector</name>
<type>javax.management.ObjectName</type>
</attribute>
-
+
<attribute access="read-write" getMethod="isClustered" setMethod="setClustered">
<description>Is this a clustered connection factory?</description>
<name>Clustered</name>
@@ -115,5 +115,5 @@
<description>JBoss Service lifecycle operation</description>
<name>destroy</name>
</operation>
-
+
</mbean>
\ No newline at end of file
Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java 2007-01-24 16:35:14 UTC (rev 2034)
@@ -116,6 +116,8 @@
MessagingMarshallable request = new MessagingMarshallable(version, invocation);
// select invocations ought to be sent "one way" for increased performance
+
+ //TODO polymorphism: shouldn't this be ClientSessionDelegate::invoke rather than the super class??
if ("changeRate".equals(methodName))
{
if (trace) { log.trace(this + " invoking " + methodName + "(..) asynchronously on server"); }
Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-24 16:35:14 UTC (rev 2034)
@@ -353,23 +353,44 @@
if (trace) { log.trace("Checker thread has finished"); }
}
- sourceConn.close();
-
- if (targetConn != null)
- {
- targetConn.close();
- }
-
if (tx != null)
{
//Terminate any transaction
if (trace) { log.trace("Rolling back remaining tx"); }
- tx.rollback();
+ try
+ {
+ tx.rollback();
+ }
+ catch (Exception ignore)
+ {
+ if (trace) { log.trace("Failed to rollback", ignore); }
+ }
if (trace) { log.trace("Rolled back remaining tx"); }
}
+ try
+ {
+ sourceConn.close();
+ }
+ catch (Exception ignore)
+ {
+ if (trace) { log.trace("Failed to close source conn", ignore); }
+ }
+
+ if (targetConn != null)
+ {
+ try
+ {
+ targetConn.close();
+ }
+ catch (Exception ignore)
+ {
+ if (trace) { log.trace("Failed to close target conn", ignore); }
+ }
+ }
+
if (trace) { log.trace("Stopped " + this); }
}
@@ -737,8 +758,12 @@
TransactionManager tm = getTm();
+ //Set timeout to a large value since we do not want to time out while waiting for messages
+ //to arrive - 10 years should be enough
+ tm.setTransactionTimeout(60 * 60 * 24 * 365 * 10);
+
tm.begin();
-
+
Transaction tx = tm.getTransaction();
//Remove the association between current thread - we don't want it
@@ -1185,7 +1210,7 @@
if (trace) { log.trace("Committing JTA transaction"); }
tx.commit();
-
+
if (trace) { log.trace("Committed JTA transaction"); }
tx = startTx();
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-01-24 16:35:14 UTC (rev 2034)
@@ -1101,6 +1101,8 @@
}
//Need to prompt delivery on the dlq/expiry queue
+
+ //TODO - are we sure this is the right place to prompt delivery?
if (queue != null)
{
queue.deliver(false);
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-01-24 16:35:14 UTC (rev 2034)
@@ -390,7 +390,7 @@
void endTx(Xid xid, boolean success) throws XAException
{
if (trace) { log.trace("ending " + xid + ", success=" + success); }
-
+
ClientTransaction state = getTxInternal(xid);
if (state == null)
@@ -494,7 +494,7 @@
Xid startTx(Xid xid) throws XAException
{
if (trace) { log.trace("starting " + xid); }
-
+
ClientTransaction state = getTxInternal(xid);
if (state != null)
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/tests/build.xml 2007-01-24 16:35:14 UTC (rev 2034)
@@ -353,10 +353,13 @@
<antcall target="crash-tests"/>
<antcall target="invm-tests"/>
<antcall target="remote-tests"/> <!-- default remoting configuration (socket) -->
+
+ <!--
<antcall target="clustering-tests"/>
<antcall target="remote-tests">
<param name="test.remoting" value="http"/>
</antcall>
+ -->
</target>
<target name="http-tests" depends="tests-jar, prepare-testdirs, clear-test-logs">
Modified: trunk/tests/etc/log4j.xml
===================================================================
--- trunk/tests/etc/log4j.xml 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/tests/etc/log4j.xml 2007-01-24 16:35:14 UTC (rev 2034)
@@ -47,7 +47,7 @@
</category>
<category name="org.jboss.remoting">
- <priority value="INFO"/>
+ <priority value="DEBUG"/>
</category>
<category name="org.jboss">
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java 2007-01-24 16:35:14 UTC (rev 2034)
@@ -38,6 +38,7 @@
import org.jboss.jms.server.bridge.Bridge;
import org.jboss.logging.Logger;
import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
/**
* A BridgeMBeanTest
@@ -53,7 +54,6 @@
{
private static final Logger log = Logger.getLogger(BridgeMBeanTest.class);
-
public BridgeMBeanTest(String name)
{
super(name);
@@ -63,12 +63,15 @@
{
nodeCount = 3;
+ useArjuna = true;
+
super.setUp();
}
protected void tearDown() throws Exception
{
super.tearDown();
+
}
public void testStopStartPauseResume() throws Exception
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-24 16:35:14 UTC (rev 2034)
@@ -57,8 +57,6 @@
{
private static final Logger log = Logger.getLogger(BridgeTest.class);
- private static final int NODE_COUNT = 2;
-
public BridgeTest(String name)
{
super(name);
@@ -73,6 +71,7 @@
{
super.tearDown();
}
+
// MaxBatchSize but no MaxBatchTime
@@ -536,30 +535,12 @@
return;
}
+ Bridge bridge = null;
+
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
-
- ServerManagement.deployQueue("destQueue", 1);
-
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
- Bridge bridge;
-
+ setUpAdministeredObjects();
+
int qosMode = Bridge.QOS_AT_MOST_ONCE;
int batchSize = 10;
@@ -716,23 +697,10 @@
}
finally
{
- try
+ if (bridge != null)
{
- ServerManagement.undeployQueue("sourceQueue", 0);
+ bridge.stop();
}
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
}
}
@@ -743,40 +711,18 @@
return;
}
+ Bridge bridge = null;
+
Connection connSource = null;
- Connection connDest = null;
-
- Bridge bridge = null;
+ Connection connTarget = null;
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
+ setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 1);
+ final int NUM_MESSAGES = 10;
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
- final int BATCH_SIZE = 10;
-
String selector = "vegetable='radish'";
bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
@@ -789,19 +735,15 @@
connSource = cf0.createConnection();
- connDest = cf1.createConnection();
-
Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sessSend.createProducer(sourceQueue);
- //Send half the messges
-
- for (int i = 0; i < BATCH_SIZE; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sessSend.createTextMessage("message" + i);
- if (i >= BATCH_SIZE / 2)
+ if (i >= NUM_MESSAGES / 2)
{
tm.setStringProperty("vegetable", "radish");
}
@@ -813,13 +755,15 @@
prod.send(tm);
}
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connTarget = cf1.createConnection();
+ Session sessRec = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
MessageConsumer cons = sessRec.createConsumer(destQueue);
- connDest.start();
+ connTarget.start();
- for (int i = BATCH_SIZE / 2 ; i < BATCH_SIZE; i++)
+ for (int i = NUM_MESSAGES / 2 ; i < NUM_MESSAGES; i++)
{
TextMessage tm = (TextMessage)cons.receive(1000);
@@ -847,11 +791,11 @@
}
}
- if (connDest != null)
+ if (connTarget != null)
{
try
{
- connDest.close();
+ connTarget.close();
}
catch (Exception e)
{
@@ -891,10 +835,6 @@
return;
}
- Connection connSource = null;
-
- Connection connDest = null;
-
Bridge bridge = null;
Transaction toResume = null;
@@ -912,32 +852,10 @@
started = mgr.getTransaction();
- ServerManagement.deployTopic("sourceTopic", 0);
+ setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 1);
+ final int NUM_MESSAGES = 10;
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Topic sourceTopic = (Topic)ic0.lookup("/topic/sourceTopic");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
- final int BATCH_SIZE = 10;
-
bridge = new Bridge(cff0, cff1, sourceTopic, destQueue,
null, null, null, null,
null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
@@ -945,41 +863,13 @@
null, null);
bridge.start();
-
- connSource = cf0.createConnection();
- connDest = cf1.createConnection();
-
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(sourceTopic);
-
- for (int i = 0; i < BATCH_SIZE; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
- for (int i = 0 ; i < BATCH_SIZE; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(1000);
+ this.sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false);
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
- Message m = cons.receive(1000);
-
- assertNull(m);
-
+ this.checkNoneReceived(cf1, destQueue);
+
}
finally
{
@@ -1005,54 +895,11 @@
{
log.error("Failed to resume", e);
}
- }
-
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
- if (connDest != null)
- {
- try
- {
- connDest.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
+ }
if (bridge != null)
{
bridge.stop();
- }
-
- try
- {
- ServerManagement.undeployTopic("sourceTopic", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
+ }
}
}
@@ -1063,40 +910,14 @@
return;
}
- Connection connSource = null;
-
- Connection connDest = null;
-
Bridge bridge = null;
try
{
- ServerManagement.deployTopic("sourceTopic", 0);
+ this.setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 1);
+ final int NUM_MESSAGES = 10;
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Topic sourceTopic = (Topic)ic0.lookup("/topic/sourceTopic");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
- final int BATCH_SIZE = 10;
-
bridge = new Bridge(cff0, cff1, sourceTopic, destQueue,
null, null, null, null,
null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
@@ -1105,89 +926,19 @@
bridge.start();
- connSource = cf0.createConnection();
+ sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false);
- connDest = cf1.createConnection();
+ checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(sourceTopic);
-
- for (int i = 0; i < BATCH_SIZE; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
- for (int i = 0 ; i < BATCH_SIZE; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- Message m = cons.receive(1000);
-
- assertNull(m);
+ this.checkNoneReceived(cf1, destQueue);
}
finally
- {
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
- if (connDest != null)
- {
- try
- {
- connDest.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
+ {
if (bridge != null)
{
bridge.stop();
}
-
- try
- {
- ServerManagement.undeployTopic("sourceTopic", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
}
}
@@ -1198,40 +949,14 @@
return;
}
- Connection connSource = null;
-
- Connection connDest = null;
-
Bridge bridge = null;
try
{
- ServerManagement.deployTopic("sourceTopic", 0);
+ this.setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 1);
+ final int NUM_MESSAGES = 10;
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Topic sourceTopic = (Topic)ic0.lookup("/topic/sourceTopic");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
- final int BATCH_SIZE = 10;
-
bridge = new Bridge(cff0, cff1, sourceTopic, destQueue,
null, null, null, null,
null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
@@ -1240,132 +965,90 @@
bridge.start();
- connSource = cf0.createConnection();
+ sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, true);
- connDest = cf1.createConnection();
+ checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(sourceTopic);
-
- for (int i = 0; i < BATCH_SIZE; i++)
+ this.checkNoneReceived(cf1, destQueue);
+
+ }
+ finally
+ {
+ if (bridge != null)
{
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
+ bridge.stop();
}
+ }
+ }
+
+ public void testTimeout() throws Exception
+ {
+ Bridge bridge = null;
+
+ try
+ {
+ this.setUpAdministeredObjects();
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final int NUM_MESSAGES = 10;
- MessageConsumer cons = sessRec.createConsumer(destQueue);
+ bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+ null, null, null, null,
+ null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ NUM_MESSAGES, -1,
+ null, null);
- connDest.start();
-
- for (int i = 0 ; i < BATCH_SIZE; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(1000);
+ bridge.start();
+
+ boolean persistent = true;
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ //Send half the messges
+
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent);
+
+ //Verify none are received
- Message m = cons.receive(1000);
+ this.checkNoneReceived(cf1, destQueue);
- assertNull(m);
-
+ log.info("Waiting");
+ Thread.sleep(120000);
+
+ //Send the other half
+
+ this.sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent);
+
+ //This should now be receivable
+
+ this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
+
+
+ this.checkNoneReceived(cf1, destQueue);
+
+ this.checkNoneReceived(cf0, sourceQueue);
}
finally
{
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
- if (connDest != null)
- {
- try
- {
- connDest.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
if (bridge != null)
{
+ log.info("Stopping bridge");
bridge.stop();
- }
-
- try
- {
- ServerManagement.undeployTopic("sourceTopic", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
+ }
}
}
// Private -------------------------------------------------------------------------------
-
-
-
+
private void testStress(int qosMode, boolean persistent, int batchSize) throws Exception
{
Connection connSource = null;
- Connection connDest = null;
-
Bridge bridge = null;
Thread t = null;
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
-
- ServerManagement.deployQueue("destQueue", 1);
-
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+ this.setUpAdministeredObjects();
bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
null, null, null, null,
@@ -1377,8 +1060,6 @@
connSource = cf0.createConnection();
- connDest = cf1.createConnection();
-
Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sessSend.createProducer(sourceQueue);
@@ -1391,29 +1072,16 @@
sender.numMessages = NUM_MESSAGES;
prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
t = new Thread(sender);
t.start();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(5000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
- Message m = cons.receive(1000);
+ this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
- assertNull(m);
+ this.checkNoneReceived(cf1, destQueue);
+ this.checkNoneReceived(cf0, sourceQueue);
+
t.join();
if (sender.ex != null)
@@ -1440,42 +1108,12 @@
{
log.error("Failed to close connection", e);
}
- }
+ }
- if (connDest != null)
- {
- try
- {
- connDest.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
if (bridge != null)
{
bridge.stop();
- }
-
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
+ }
}
}
@@ -1490,23 +1128,9 @@
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
-
- ServerManagement.deployQueue("destQueue", 0);
-
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic0.lookup("/queue/destQueue");
+ this.setUpAdministeredObjects();
- bridge = new Bridge(cff0, cff0, sourceQueue, destQueue,
+ bridge = new Bridge(cff0, cff0, sourceQueue, localDestQueue,
null, null, null, null,
null, 5000, 10, qosMode,
batchSize, -1,
@@ -1528,29 +1152,16 @@
sender.numMessages = NUM_MESSAGES;
prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- Session sessRec = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connSource.start();
-
t = new Thread(sender);
t.start();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(5000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
- Message m = cons.receive(1000);
+ this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES);
- assertNull(m);
+ this.checkNoneReceived(cf0, localDestQueue);
+ this.checkNoneReceived(cf0, sourceQueue);
+
t.join();
if (sender.ex != null)
@@ -1583,464 +1194,153 @@
{
bridge.stop();
}
-
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
}
}
private void testNoMaxBatchTime(int qosMode, boolean persistent) throws Exception
{
- Connection connSource = null;
-
- Connection connDest = null;
-
Bridge bridge = null;
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
+ this.setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 1);
+ final int NUM_MESSAGES = 10;
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
- final int BATCH_SIZE = 10;
-
bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
null, null, null, null,
null, 5000, 10, qosMode,
- BATCH_SIZE, -1,
+ NUM_MESSAGES, -1,
null, null);
bridge.start();
- connSource = cf0.createConnection();
-
- connDest = cf1.createConnection();
-
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(sourceQueue);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
//Send half the messges
- for (int i = 0; i < BATCH_SIZE / 2; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent);
+
//Verify none are received
- Message m = cons.receive(2000);
+ this.checkNoneReceived(cf1, destQueue);
- assertNull(m);
-
//Send the other half
- for (int i = BATCH_SIZE / 2; i < BATCH_SIZE; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
+ this.sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent);
//This should now be receivable
- for (int i = 0; i < BATCH_SIZE; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
- m = cons.receive(1000);
-
- assertNull(m);
-
//Send another batch with one more than batch size
- for (int i = 0; i < BATCH_SIZE + 1; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES + 1, persistent);
+
//Make sure only batch size are received
- for (int i = 0; i < BATCH_SIZE; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
- m = cons.receive(2000);
-
- assertNull(m);
-
//Final batch
- for (int i = 0; i < BATCH_SIZE - 1; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES - 1, persistent);
- TextMessage tm = (TextMessage)cons.receive(1000);
+ this.checkAllMessageReceivedInOrder(cf1, destQueue, NUM_MESSAGES, 1);
- assertNotNull(tm);
+ this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES - 1);
- assertEquals("message" + BATCH_SIZE, tm.getText());
+ //Make sure no messages are left
- for (int i = 0; i < BATCH_SIZE - 1; i++)
- {
- tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ this.checkNoneReceived(cf1, destQueue);
- m = cons.receive(1000);
-
- assertNull(m);
-
-
- //Make sure no messages are left in the source dest
-
- MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
-
- connSource.start();
-
- m = cons2.receive(1000);
-
- assertNull(m);
+ this.checkNoneReceived(cf0, sourceQueue);
}
finally
{
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
- if (connDest != null)
- {
- try
- {
- connDest.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
if (bridge != null)
{
+ log.info("Stopping bridge");
bridge.stop();
- }
-
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
+ }
}
}
+
+
+
private void testNoMaxBatchTimeSameServer(int qosMode, boolean persistent) throws Exception
{
- Connection connSource = null;
-
Bridge bridge = null;
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
+ this.setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 0);
+ final int NUM_MESSAGES = 10;
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic0.lookup("/queue/destQueue");
-
- final int BATCH_SIZE = 10;
-
- bridge = new Bridge(cff0, cff0, sourceQueue, destQueue,
+ bridge = new Bridge(cff0, cff0, sourceQueue, localDestQueue,
null, null, null, null,
null, 5000, 10, qosMode,
- BATCH_SIZE, -1,
+ NUM_MESSAGES, -1,
null, null);
bridge.start();
- connSource = cf0.createConnection();
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES / 2, persistent);
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ this.checkNoneReceived(cf1, destQueue);
- MessageProducer prod = sessSend.createProducer(sourceQueue);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- //Send half the messges
-
- for (int i = 0; i < BATCH_SIZE / 2; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- Session sessRec = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connSource.start();
-
- //Verify none are received
-
- Message m = cons.receive(2000);
-
- assertNull(m);
-
//Send the other half
- for (int i = BATCH_SIZE / 2; i < BATCH_SIZE; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
+ this.sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES /2, persistent);
+
//This should now be receivable
- for (int i = 0; i < BATCH_SIZE; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES);
- m = cons.receive(1000);
+ this.checkNoneReceived(cf0, localDestQueue);
- assertNull(m);
+ this.checkNoneReceived(cf0, sourceQueue);
//Send another batch with one more than batch size
- for (int i = 0; i < BATCH_SIZE + 1; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES + 1, persistent);
//Make sure only batch size are received
- for (int i = 0; i < BATCH_SIZE; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES);
- m = cons.receive(2000);
-
- assertNull(m);
-
//Final batch
- for (int i = 0; i < BATCH_SIZE - 1; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES - 1, persistent);
- TextMessage tm = (TextMessage)cons.receive(1000);
+ this.checkAllMessageReceivedInOrder(cf0, localDestQueue, NUM_MESSAGES, 1);
- assertNotNull(tm);
+ this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES - 1);
- assertEquals("message" + BATCH_SIZE, tm.getText());
+ //Make sure no messages are left
- for (int i = 0; i < BATCH_SIZE - 1; i++)
- {
- tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ this.checkNoneReceived(cf0, localDestQueue);
- m = cons.receive(1000);
-
- assertNull(m);
-
-
- //Make sure no messages are left in the source dest
-
- MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
-
- connSource.start();
-
- m = cons2.receive(1000);
-
- assertNull(m);
+ this.checkNoneReceived(cf0, sourceQueue);
}
finally
- {
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
+ {
if (bridge != null)
{
bridge.stop();
- }
-
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
+ }
}
}
private void testMaxBatchTime(int qosMode, boolean persistent) throws Exception
{
- Connection connSource = null;
-
- Connection connDest = null;
-
Bridge bridge = null;
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
+ this.setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 1);
-
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
final long MAX_BATCH_TIME = 3000;
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
@@ -2053,255 +1353,93 @@
bridge.start();
- connSource = cf0.createConnection();
-
- connDest = cf1.createConnection();
-
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(sourceQueue);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
final int NUM_MESSAGES = 10;
//Send some message
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES, persistent);
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
//Verify none are received
- Message m = cons.receive(2000);
+ this.checkNoneReceived(cf1, destQueue);
- assertNull(m);
-
//Wait a bit longer
Thread.sleep(1500);
//Messages should now be receivable
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ this.checkAllMessageReceivedInOrder(cf1, destQueue, 0, NUM_MESSAGES);
- m = cons.receive(1000);
+ this.checkNoneReceived(cf1, destQueue);
- assertNull(m);
-
//Make sure no messages are left in the source dest
- MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
+ this.checkNoneReceived(cf0, sourceQueue);
- connSource.start();
-
- m = cons2.receive(1000);
-
- assertNull(m);
-
}
finally
{
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
- if (connDest != null)
- {
- try
- {
- connDest.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
if (bridge != null)
{
bridge.stop();
- }
-
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
+ }
}
}
private void testMaxBatchTimeSameServer(int qosMode, boolean persistent) throws Exception
{
- Connection connSource = null;
-
Bridge bridge = null;
try
{
- ServerManagement.deployQueue("sourceQueue", 0);
+ this.setUpAdministeredObjects();
- ServerManagement.deployQueue("destQueue", 0);
-
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- InitialContext ic0 = new InitialContext(props0);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic0.lookup("/queue/destQueue");
-
final long MAX_BATCH_TIME = 3000;
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
- bridge = new Bridge(cff0, cff0, sourceQueue, destQueue,
+ bridge = new Bridge(cff0, cff0, sourceQueue, localDestQueue,
null, null, null, null,
null, 5000, 10, qosMode,
MAX_BATCH_SIZE, MAX_BATCH_TIME,
null, null);
bridge.start();
-
- connSource = cf0.createConnection();
-
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessSend.createProducer(sourceQueue);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
final int NUM_MESSAGES = 10;
//Send some message
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
- }
+ //Send some message
+
+ this.sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES, persistent);
- Session sessRec = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connSource.start();
-
//Verify none are received
- Message m = cons.receive(2000);
+ this.checkNoneReceived(cf0, localDestQueue);
- assertNull(m);
-
//Wait a bit longer
Thread.sleep(1500);
//Messages should now be receivable
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
+ this.checkAllMessageReceivedInOrder(cf0, localDestQueue, 0, NUM_MESSAGES);
- m = cons.receive(1000);
+ this.checkNoneReceived(cf0, localDestQueue);
- assertNull(m);
-
//Make sure no messages are left in the source dest
- MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
+ this.checkNoneReceived(cf0, sourceQueue);
- connSource.start();
-
- m = cons2.receive(1000);
-
- assertNull(m);
-
}
finally
- {
- if (connSource != null)
- {
- try
- {
- connSource.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close connection", e);
- }
- }
-
+ {
if (bridge != null)
{
bridge.stop();
- }
-
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
+ }
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2007-01-24 16:35:14 UTC (rev 2034)
@@ -21,6 +21,26 @@
*/
package org.jboss.test.messaging.jms.bridge;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+
+import org.jboss.jms.server.bridge.Bridge;
+import org.jboss.jms.server.bridge.ConnectionFactoryFactory;
+import org.jboss.jms.server.bridge.JNDIConnectionFactoryFactory;
import org.jboss.logging.Logger;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -38,12 +58,20 @@
*/
public class BridgeTestBase extends MessagingTestCase
{
- private static final Logger log = Logger.getLogger(BridgeTest.class);
+ private static final Logger log = Logger.getLogger(BridgeTestBase.class);
protected int nodeCount = 2;
protected ServiceContainer sc;
+ protected ConnectionFactoryFactory cff0, cff1;
+
+ protected ConnectionFactory cf0, cf1;
+
+ protected Destination sourceQueue, destQueue, sourceTopic, localDestQueue;
+
+ protected boolean useArjuna;
+
public BridgeTestBase(String name)
{
super(name);
@@ -62,14 +90,36 @@
// make sure all servers are created and started; make sure that database is zapped
// ONLY for the first server, the others rely on values they expect to find in shared
// tables; don't clear the database for those.
- ServerManagement.start(i, "all,-transaction,jbossjta", i == 0);
+ if (useArjuna)
+ {
+ ServerManagement.start(i, "all,-transaction, jbossjta", i == 0);
+ }
+ else
+ {
+ ServerManagement.start(i, "all", i == 0);
+ }
}
//We need a local transaction and recovery manager
//We must start this after the remote servers have been created or it won't
//have deleted the database and the recovery manager may attempt to recover transactions
- sc = new ServiceContainer("jbossjta");
- sc.start(false);
+ if (useArjuna)
+ {
+ sc = new ServiceContainer("jbossjta");
+ }
+ else
+ {
+ sc = new ServiceContainer("transaction");
+ }
+ sc.start(false);
+
+ ServerManagement.deployQueue("sourceQueue", 0);
+
+ ServerManagement.deployTopic("sourceTopic", 0);
+
+ ServerManagement.deployQueue("localDestQueue", 0);
+
+ ServerManagement.deployQueue("destQueue", 1);
}
}
@@ -77,6 +127,44 @@
{
if (ServerManagement.isRemote())
{
+ try
+ {
+ ServerManagement.undeployQueue("sourceQueue", 0);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ try
+ {
+ ServerManagement.undeployTopic("sourceTopic", 0);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("destQueue", 1);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("localDestQueue", 0);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+
+
for (int i = 0; i < nodeCount; i++)
{
try
@@ -115,7 +203,209 @@
}
+ protected void setUpAdministeredObjects() throws Exception
+ {
+ InitialContext ic0 = null, ic1 = null;
+ try
+ {
+ Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
+
+ Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+
+ cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
+
+ cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
+
+ ic0 = new InitialContext(props0);
+
+ ic1 = new InitialContext(props1);
+
+ cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+ cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
+
+ destQueue = (Queue)ic1.lookup("/queue/destQueue");
+
+ sourceTopic = (Topic)ic0.lookup("/topic/sourceTopic");
+
+ localDestQueue = (Queue)ic0.lookup("/queue/localDestQueue");
+ }
+ finally
+ {
+ if (ic0 != null)
+ {
+ ic0.close();
+ }
+ if (ic1 != null)
+ {
+ ic1.close();
+ }
+ }
+ }
+ protected void sendMessages(ConnectionFactory cf, Destination dest, int start, int numMessages, boolean persistent)
+ throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(dest);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = start; i < start + numMessages; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ protected void checkNoneReceived(ConnectionFactory cf, Destination dest) throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess.createConsumer(dest);
+
+ Message m = cons.receive(2000);
+
+ assertNull(m);
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ protected void checkMessagesReceived(ConnectionFactory cf, Destination dest, int qosMode, int numMessages) throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess.createConsumer(dest);
+
+ // Consume the messages
+
+ Set msgs = new HashSet();
+
+ int count = 0;
+
+ while (true)
+ {
+ TextMessage tm = (TextMessage)cons.receive(2000);
+
+ if (tm == null)
+ {
+ break;
+ }
+
+ msgs.add(tm.getText());
+
+ count++;
+
+ }
+
+ if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE || qosMode == Bridge.QOS_DUPLICATES_OK)
+ {
+ //All the messages should be received
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ assertTrue(msgs.contains("message" + i));
+ }
+
+ //Should be no more
+ if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE)
+ {
+ assertEquals(numMessages, msgs.size());
+ }
+ }
+ else if (qosMode == Bridge.QOS_AT_MOST_ONCE)
+ {
+ //No *guarantee* that any messages will be received
+ //but you still might get some depending on how/where the crash occurred
+ }
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
+ protected void checkAllMessageReceivedInOrder(ConnectionFactory cf, Destination dest, int start, int numMessages) throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess.createConsumer(dest);
+
+ // Consume the messages
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(2000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + (i + start), tm.getText());
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
// Inner classes -------------------------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java 2007-01-24 16:35:14 UTC (rev 2034)
@@ -21,25 +21,7 @@
*/
package org.jboss.test.messaging.jms.bridge;
-import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Set;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.InitialContext;
-
import org.jboss.jms.server.bridge.Bridge;
-import org.jboss.jms.server.bridge.ConnectionFactoryFactory;
-import org.jboss.jms.server.bridge.JNDIConnectionFactoryFactory;
import org.jboss.logging.Logger;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
@@ -57,56 +39,23 @@
public class ReconnectTest extends BridgeTestBase
{
private static final Logger log = Logger.getLogger(ReconnectTest.class);
+
- protected ConnectionFactoryFactory cff0, cff1;
-
- protected ConnectionFactory cf0, cf1;
-
- protected Destination sourceQueue, destQueue;
-
-
public ReconnectTest(String name)
{
super(name);
}
protected void setUp() throws Exception
- {
- super.setUp();
+ {
+ useArjuna = true;
- if (ServerManagement.isRemote())
- {
- ServerManagement.deployQueue("sourceQueue", 0);
-
- ServerManagement.deployQueue("destQueue", 1);
- }
-
+ super.setUp();
}
protected void tearDown() throws Exception
{
super.tearDown();
-
- if (ServerManagement.isRemote())
- {
- try
- {
- ServerManagement.undeployQueue("sourceQueue", 0);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
-
- try
- {
- ServerManagement.undeployQueue("destQueue", 1);
- }
- catch (Exception e)
- {
- log.error("Failed to undeploy", e);
- }
- }
}
// Crash and reconnect
@@ -214,44 +163,8 @@
testCrashAndReconnectDestCrashOnCommit(false);
}
- private void setUpAdministeredObjects() throws Exception
- {
- InitialContext ic0 = null, ic1 = null;
- try
- {
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
-
- cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
-
- ic0 = new InitialContext(props0);
-
- ic1 = new InitialContext(props1);
-
- cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- destQueue = (Queue)ic1.lookup("/queue/destQueue");
- }
- finally
- {
- if (ic0 != null)
- {
- ic0.close();
- }
- if (ic1 != null)
- {
- ic1.close();
- }
- }
- }
+
/*
* Send some messages
* Crash the destination server
@@ -532,137 +445,8 @@
}
}
- private void sendMessages(ConnectionFactory cf, Destination dest, int start, int numMessages, boolean persistent)
- throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(dest);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = start; i < start + numMessages; i++)
- {
- TextMessage tm = sess.createTextMessage("message" + i);
-
- prod.send(tm);
- }
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
- private void checkNoneReceived(ConnectionFactory cf, Destination dest) throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- conn.start();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sess.createConsumer(dest);
-
- Message m = cons.receive(2000);
-
- assertNull(m);
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
- private void checkMessagesReceived(ConnectionFactory cf, Destination dest, int qosMode, int numMessages) throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- conn.start();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sess.createConsumer(dest);
-
- // Consume the messages
-
- Set msgs = new HashSet();
-
- log.info("checkMessagesReceived");
-
- int count = 0;
-
- while (true)
- {
- TextMessage tm = (TextMessage)cons.receive(2000);
-
- if (tm == null)
- {
- break;
- }
-
- log.info("got message:" + tm.getJMSMessageID());
-
- msgs.add(tm.getText());
-
- count++;
-
- }
-
- log.info("message received " + count);
-
- if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE || qosMode == Bridge.QOS_DUPLICATES_OK)
- {
- //All the messages should be received
-
- for (int i = 0; i < numMessages; i++)
- {
- assertTrue(msgs.contains("message" + i));
- }
-
- //Should be no more
- if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE)
- {
- assertEquals(numMessages, msgs.size());
- }
- }
- else if (qosMode == Bridge.QOS_AT_MOST_ONCE)
- {
- //No *guarantee* that any messages will be received
- //but you still might get some depending on how/where the crash occurred
- }
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
// Inner classes -------------------------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java 2007-01-24 13:49:49 UTC (rev 2033)
+++ trunk/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java 2007-01-24 16:35:14 UTC (rev 2034)
@@ -43,6 +43,7 @@
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
import org.jboss.tm.TransactionManagerLocator;
/**
@@ -66,6 +67,8 @@
protected JBossConnectionFactory cf;
protected Queue queue;
protected Topic topic;
+
+ protected ServiceContainer sc;
// Constructors --------------------------------------------------
@@ -97,11 +100,24 @@
topic = (Topic)initialContext.lookup("/topic/Topic");
+ if (ServerManagement.isRemote())
+ {
+ //We need to start a service container otherwise transaction manager jndi lookup
+ //will fail
+ sc = new ServiceContainer("transaction");
+
+ sc.start(false);
+ }
}
public void tearDown() throws Exception
{
super.tearDown();
+
+ if (ServerManagement.isRemote())
+ {
+ sc.stop();
+ }
}
public void testSimpleJMSXDeliveryCount() throws Exception
More information about the jboss-cvs-commits
mailing list