[jboss-cvs] JBoss Messaging SVN: r1952 - in trunk: tests/src/org/jboss/test/messaging/jms/bridge and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 10 15:51:06 EST 2007
Author: timfox
Date: 2007-01-10 15:51:02 -0500 (Wed, 10 Jan 2007)
New Revision: 1952
Modified:
trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
Log:
More bridge work
Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-10 20:03:51 UTC (rev 1951)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-10 20:51:02 UTC (rev 1952)
@@ -625,7 +625,7 @@
if (messages.size() >= maxBatchSize)
{
- if (trace) { log.trace(this + " maxBatchSizew has been reached so sending batch"); }
+ if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
sendBatch();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-10 20:03:51 UTC (rev 1951)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-10 20:51:02 UTC (rev 1952)
@@ -64,95 +64,234 @@
super.tearDown();
}
- public void testBridge() throws Exception
+ public void testMaxBatchSizeNoMaxBatchTimeTransacted() throws Exception
{
- ServerManagement.start(0, "all", null, true);
-
- ServerManagement.start(1, "all", null, false);
-
- Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-
- Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
- ServerManagement.deployQueue("sourceQueue", 0);
-
- ServerManagement.deployQueue("destQueue", 1);
-
- InitialContext ic0 = new InitialContext(props0);
-
- InitialContext ic1 = new InitialContext(props1);
-
- ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-
- ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-
- Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-
- Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-
-
- Bridge bridge = new Bridge(props0, props1, "/ConnectionFactory", "/ConnectionFactory",
- "/queue/sourceQueue", "/queue/destQueue", null, null, null, null,
- null, 0, false,
- false, 10, -1,
- null, null,
- false, false);
-
- bridge.start();
-
-
- Connection connSource = cf0.createConnection();
-
- Connection connDest = cf1.createConnection();
-
- Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sessSend.createProducer(sourceQueue);
-
- final int NUM_MESSAGES = 10;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
+ if (!ServerManagement.isRemote())
{
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod.send(tm);
+ return;
}
+ testMaxBatchSizeNoMaxBatchTime(true);
+ }
+
+ public void testMaxBatchSizeNoMaxBatchTimeNonTransacted() throws Exception
+ {
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+ testMaxBatchSizeNoMaxBatchTime(false);
+ }
+
+ private void testMaxBatchSizeNoMaxBatchTime(boolean transacted) throws Exception
+ {
+ Connection connSource = null;
- Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Connection connDest = null;
- MessageConsumer cons = sessRec.createConsumer(destQueue);
-
- connDest.start();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
+ Bridge bridge = null;
+
+ try
{
+ ServerManagement.start(0, "all", null, true);
+
+ ServerManagement.start(1, "all", null, false);
+
+ ServerManagement.deployQueue("sourceQueue", 0);
+
+ ServerManagement.deployQueue("destQueue", 1);
+
+ Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
+
+ Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+
+ InitialContext ic0 = new InitialContext(props0);
+
+ InitialContext ic1 = new InitialContext(props1);
+
+ ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+ ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
+
+ Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+
+ final int BATCH_SIZE = 10;
+
+ bridge = new Bridge(props0, props1, "/ConnectionFactory", "/ConnectionFactory",
+ "/queue/sourceQueue", "/queue/destQueue", null, null, null, null,
+ null, 0, false,
+ false, 10, -1,
+ null, null,
+ false, false);
+
+ bridge.start();
+
+ connSource = cf0.createConnection();
+
+ connDest = cf1.createConnection();
+
+ Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+ //Send half the messages
+
+ for (int i = 0; i < BATCH_SIZE / 2; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sessRec.createConsumer(destQueue);
+
+ connDest.start();
+
+ //Verify none are received
+
+ Message m = cons.receive(2000);
+
+ assertNull(m);
+
+ //Send the other half
+
+ for (int i = BATCH_SIZE / 2; i < BATCH_SIZE; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ //This should now be receivable
+
+ for (int i = 0; i < BATCH_SIZE; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Send another batch with one more than batch size
+
+ for (int i = 0; i < BATCH_SIZE + 1; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ //Make sure only batch size are received
+
+ for (int i = 0; i < BATCH_SIZE; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons.receive(2000);
+
+ assertNull(m);
+
+ //Final batch
+
+ for (int i = 0; i < BATCH_SIZE - 1; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
TextMessage tm = (TextMessage)cons.receive(1000);
assertNotNull(tm);
- assertEquals("message" + i, tm.getText());
+ assertEquals("message" + BATCH_SIZE, tm.getText());
+
+ for (int i = 0; i < BATCH_SIZE - 1; i++)
+ {
+ tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+
+ //Make sure no messages are left in the source dest
+
+ MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
+
+ connSource.start();
+
+ m = cons2.receive(1000);
+
+ assertNull(m);
+
+ connSource.close();
+
+ connDest.close();
+
}
-
- //Make sure no messages are left in the source dest
-
- MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
-
- connSource.start();
-
- Message m = cons2.receive(1000);
-
- assertNull(m);
-
- connSource.close();
-
- connDest.close();
-
- bridge.stop();
-
- ServerManagement.stop(0);
-
- ServerManagement.stop(1);
-
+ finally
+ {
+ if (connSource != null)
+ {
+ try
+ {
+ connSource.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+
+ if (connDest != null)
+ {
+ try
+ {
+ connDest.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+
+ if (bridge != null)
+ {
+ bridge.stop();
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("sourceQueue", 0);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("destQueue", 1);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ ServerManagement.stop(0);
+
+ ServerManagement.stop(1);
+ }
}
-
}
More information about the jboss-cvs-commits
mailing list