[jboss-cvs] JBoss Messaging SVN: r1962 - in trunk: tests/src/org/jboss/test/messaging/jms/bridge and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 11 16:02:36 EST 2007
Author: timfox
Date: 2007-01-11 16:02:31 -0500 (Thu, 11 Jan 2007)
New Revision: 1962
Modified:
trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
Log:
More bridge work
Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-11 18:13:23 UTC (rev 1961)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-11 21:02:31 UTC (rev 1962)
@@ -21,10 +21,8 @@
*/
package org.jboss.jms.server.bridge;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
-import java.util.List;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -61,14 +59,6 @@
private static boolean trace;
- //Transaction modes
-
-// public static final int TRANSACTIONS_NONE = 0;
-//
-// public static final int TRANSACTIONS_LOCAL = 1;
-//
-// public static final int TRANSACTIONS_XA = 2;
-
//Quality of service modes
public static final int QOS_AT_MOST_ONCE = 0;
@@ -78,7 +68,12 @@
public static final int QOS_ONCE_AND_ONLY_ONCE = 2;
/*
+ *
+ * Quality of service (QoS) levels
+ * ===============================
+ *
* QOS_AT_MOST_ONCE
+ * ----------------
*
* With this QoS mode messages will reach the destination from the source at most once.
* The messages are consumed from the source and acknowledged
@@ -88,6 +83,7 @@
* This mode is available for both persistent and non persistent messages.
*
* QOS_DUPLICATES_OK
+ * -----------------
*
* With this QoS mode, the messages are consumed from the source and then acknowledged
* after they have been successfully sent to the destination. Therefore there is a possibility that if
@@ -96,6 +92,7 @@
* This mode is available for both persistent and non persistent messages.
*
* QOS_ONCE_AND_ONLY_ONCE
+ * ----------------------
*
* This QoS mode ensures messages will reach the destination from the source once and only once.
* (Sometimes this mode is known as "exactly once").
@@ -118,7 +115,6 @@
* may be a good choice depending on your specific application.
*
*
- *
*/
static
@@ -142,6 +138,8 @@
private long failureRetryInterval;
+ private int maxRetries;
+
private int qualityOfServiceMode;
private int maxBatchSize;
@@ -158,9 +156,9 @@
private Object lock;
- private ConnectionFactory cfSource;
+ private ConnectionFactoryFactory sourceCfFactory;
- private ConnectionFactory cfDest;
+ private ConnectionFactoryFactory destCfFactory;
private Connection connSource;
@@ -191,23 +189,27 @@
private boolean manualAck;
private boolean manualCommit;
-
- public Bridge(ConnectionFactory cfSource, ConnectionFactory cfDest,
+
+ /*
+ * This constructor is used when source and destination are on different servers
+ */
+ public Bridge(ConnectionFactoryFactory sourceCfFactory, ConnectionFactoryFactory destCfFactory,
Destination destSource, Destination destDest,
String sourceUsername, String sourcePassword,
String destUsername, String destPassword,
- String selector, long failureRetryInterval,
+ String selector, long failureRetryInterval,
+ int maxRetries,
int qosMode,
int maxBatchSize, long maxBatchTime,
String subName, String clientID)
{
- if (cfSource == null)
+ if (sourceCfFactory == null)
{
- throw new IllegalArgumentException("cfSource cannot be null");
+ throw new IllegalArgumentException("sourceCfFactory cannot be null");
}
- if (cfDest == null)
+ if (destCfFactory == null)
{
- throw new IllegalArgumentException("cfDest cannot be null");
+ throw new IllegalArgumentException("destCfFactory cannot be null");
}
if (destSource == null)
{
@@ -221,26 +223,30 @@
{
throw new IllegalArgumentException("failureRetryInterval must be > 0 or -1 to represent no retry");
}
- if (maxBatchSize < 1 && maxBatchSize != -1)
+ if (maxRetries < 0)
{
- throw new IllegalArgumentException("maxBatchSize must be >= 1 or -1 to represent unlimited batch size");
+ throw new IllegalArgumentException("maxRetries must be >= 0");
}
+ if (failureRetryInterval == -1 && maxRetries > 0)
+ {
+ throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be 0");
+ }
+ if (maxBatchSize < 1)
+ {
+ throw new IllegalArgumentException("maxBatchSize must be >= 1");
+ }
if (maxBatchTime < 1 && maxBatchTime != -1)
{
throw new IllegalArgumentException("maxBatchTime must be >= 1 or -1 to represent unlimited batch time");
}
- if (maxBatchTime == -1 && maxBatchSize == -1)
- {
- throw new IllegalArgumentException("Cannot have unlimited batch size and unlimited batch time");
- }
if (qosMode != QOS_AT_MOST_ONCE && qosMode != QOS_DUPLICATES_OK && qosMode != QOS_ONCE_AND_ONLY_ONCE)
{
throw new IllegalArgumentException("Invalid QoS mode " + qosMode);
}
- this.cfSource = cfSource;
+ this.sourceCfFactory = sourceCfFactory;
- this.cfDest = cfDest;
+ this.destCfFactory = destCfFactory;
this.destSource = destSource;
@@ -258,6 +264,8 @@
this.failureRetryInterval = failureRetryInterval;
+ this.maxRetries = maxRetries;
+
this.qualityOfServiceMode = qosMode;
this.maxBatchSize = maxBatchSize;
@@ -278,9 +286,9 @@
}
}
-
+
// MessagingComponent overrides --------------------------------------------------
-
+
public synchronized void start() throws Exception
{
if (started)
@@ -291,139 +299,33 @@
if (trace) { log.trace("Starting " + this); }
- connSource = createConnection(sourceUsername, sourcePassword, cfSource);
+ boolean ok = setupJMSObjectsWithRetry();
- connDest = createConnection(destUsername, destPassword, cfDest);
-
- if (clientID != null)
- {
- connSource.setClientID(clientID);
- }
-
- Session sess;
-
- if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
- {
- //Create an XASession for consuming from the source
- if (trace) { log.trace("Creating XA source session"); }
- sessSource = ((XAConnection)connSource).createXASession();
+ if (ok)
+ {
+ started = true;
- sess = ((XASession)sessSource).getSession();
- }
- else
- {
- if (trace) { log.trace("Creating non XA source session"); }
-
- //Create a standard session for consuming from the source
-
- //If the QoS is at_most_once, and max batch size is 1 then we use AUTO_ACKNOWLEDGE
- //If the QoS is at_most_once, and max batch size > 1 or -1, then we use CLIENT_ACKNOWLEDGE
- //We could use CLIENT_ACKNOWLEDGE for both the above but AUTO_ACKNOWLEGE may be slightly more
- //performant in some implementations that manually acking every time but it really depends
- //on the implementation.
- //We could also use local transacted for both the above but don't for the same reasons.
-
- //If the QoS is duplicates_ok, we use CLIENT_ACKNOWLEDGE
- //We could use local transacted, whether one is faster than the other probably depends on the
- //messaging implementation but there's probably not much in it
-
- int ackMode;
- if (qualityOfServiceMode == QOS_AT_MOST_ONCE && maxBatchSize == 1)
+ if (maxBatchTime != -1)
{
- ackMode = Session.AUTO_ACKNOWLEDGE;
- }
- else
- {
- ackMode = Session.CLIENT_ACKNOWLEDGE;
+ if (trace) { log.trace("Starting time checker thread"); }
+
+ timeChecker = new BatchTimeChecker();
- manualAck = true;
- }
+ checkerThread = new Thread(timeChecker);
+
+ batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+
+ checkerThread.start();
+
+ if (trace) { log.trace("Started time checker thread"); }
+ }
- sessSource = connSource.createSession(false, ackMode);
-
- sess = sessSource;
+ if (trace) { log.trace("Started " + this); }
}
-
- if (subName == null)
- {
- if (selector == null)
- {
- consumer = sess.createConsumer(destSource);
- }
- else
- {
- consumer = sess.createConsumer(destSource, selector, false);
- }
- }
else
{
- //Durable subscription
- if (selector == null)
- {
- consumer = sess.createDurableSubscriber((Topic)destSource, subName);
- }
- else
- {
- consumer = sess.createDurableSubscriber((Topic)destSource, subName, selector, false);
- }
+ log.warn("Failed to start bridge");
}
-
- if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
- {
- if (trace) { log.trace("Creating XA dest session"); }
-
- //Create an XA sesion for sending to the destination
-
- sessDest = ((XAConnection)connDest).createXASession();
-
- sess = ((XASession)sessDest).getSession();
- }
- else
- {
- if (trace) { log.trace("Creating non XA dest session"); }
-
- //Create a standard session for sending to the destination
-
- //If maxBatchSize == 1 we just create a non transacted session, otherwise we
- //create a transacted session for the send, since sending the batch in a transaction
- //is likely to be more efficient than sending messages individually
-
- manualCommit = maxBatchSize == 1;
-
- sessDest = connDest.createSession(manualCommit, manualCommit ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-
- sess = sessDest;
- }
-
- if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
- {
- if (trace) { log.trace("Starting JTA transaction"); }
-
- tx = startTx();
-
- enlistResources(tx);
- }
-
- producer = sess.createProducer(destDest);
-
- consumer.setMessageListener(new SourceListener());
-
- connSource.start();
-
- started = true;
-
- if (maxBatchTime != -1)
- {
- timeChecker = new BatchTimeChecker();
-
- checkerThread = new Thread(timeChecker);
-
- batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-
- checkerThread.start();
- }
-
- if (trace) { log.trace("Started " + this); }
}
public synchronized void stop() throws Exception
@@ -450,7 +352,11 @@
//This must be outside sync block
if (checkerThread != null)
{
+ if (trace) { log.trace("Waiting for checker thread to finish");}
+
checkerThread.join();
+
+ if (trace) { log.trace("Checker thread has finished"); }
}
connSource.close();
@@ -460,7 +366,11 @@
if (tx != null)
{
//Terminate any transaction
+ if (trace) { log.trace("Rolling back remaining tx"); }
+
tx.rollback();
+
+ if (trace) { log.trace("Rolled back remaining tx"); }
}
if (trace) { log.trace("Stopped " + this); }
@@ -565,11 +475,19 @@
return tm;
}
- private Connection createConnection(String username, String password, ConnectionFactory cf)
+ private Connection createConnection(String username, String password, ConnectionFactoryFactory cff)
throws Exception
{
Connection conn;
+ ConnectionFactory cf = cff.createConnectionFactory();
+
+ if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE &&
+ !(cf instanceof XAConnectionFactory))
+ {
+ throw new IllegalArgumentException("Connection factory must be XAConnectionFactory");
+ }
+
if (username == null)
{
if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
@@ -598,106 +516,296 @@
}
return conn;
}
-
- private void sendBatch()
+
+   /*
+    * Creates every JMS object the bridge needs and starts consuming from the source:
+    * the source and destination connections, their sessions (XA sessions when the QoS is
+    * QOS_ONCE_AND_ONLY_ONCE), the source consumer (a durable subscriber when subName is set),
+    * the destination producer and, for QOS_ONCE_AND_ONLY_ONCE, a JTA transaction with the
+    * resources enlisted.
+    *
+    * Returns true if everything was created and the source connection was started.
+    * If any Exception is thrown it is logged, cleanup() is called and false is returned.
+    */
+   private boolean setupJMSObjects()
    {
- if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
-
- synchronized (lock)
+      try
       {
- if (paused)
+         connSource = createConnection(sourceUsername, sourcePassword, sourceCfFactory);
+
+         connDest = createConnection(destUsername, destPassword, destCfFactory);
+
+         if (clientID != null)
          {
- //Don't send now
- if (trace) { log.trace("Paused, so not sending now"); }
-
- return;
+            connSource.setClientID(clientID);
          }
+
+         Session sess;
- if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
+         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
          {
- //We ack before we send
- if (manualAck)
+            //Create an XASession for consuming from the source
+            if (trace) { log.trace("Creating XA source session"); }
+            sessSource = ((XAConnection)connSource).createXASession();
+
+            sess = ((XASession)sessSource).getSession();
+         }
+         else
+         {
+            if (trace) { log.trace("Creating non XA source session"); }
+
+            //Create a standard session for consuming from the source
+
+            //If the QoS is at_most_once, and max batch size is 1 then we use AUTO_ACKNOWLEDGE
+            //If the QoS is at_most_once, and max batch size > 1, then we use CLIENT_ACKNOWLEDGE
+            //We could use CLIENT_ACKNOWLEDGE for both the above but AUTO_ACKNOWLEDGE may be slightly more
+            //performant in some implementations than manually acking every time but it really depends
+            //on the implementation.
+            //We could also use local transacted for both the above but don't for the same reasons.
+
+            //If the QoS is duplicates_ok, we use CLIENT_ACKNOWLEDGE
+            //We could use local transacted, whether one is faster than the other probably depends on the
+            //messaging implementation but there's probably not much in it
+
+            int ackMode;
+            if (qualityOfServiceMode == QOS_AT_MOST_ONCE && maxBatchSize == 1)
             {
- //Ack on the last message
- try
- {
- ((Message)messages.getLast()).acknowledge();
- }
- catch (Throwable t)
- {
- //Deal with this
- t.printStackTrace();
- }
+               ackMode = Session.AUTO_ACKNOWLEDGE;
             }
+            else
+            {
+               ackMode = Session.CLIENT_ACKNOWLEDGE;
+
+               manualAck = true;
+            }
+
+            sessSource = connSource.createSession(false, ackMode);
+
+            sess = sessSource;
          }
+
+         if (subName == null)
+         {
+            if (selector == null)
+            {
+               consumer = sess.createConsumer(destSource);
+            }
+            else
+            {
+               consumer = sess.createConsumer(destSource, selector, false);
+            }
+         }
+         else
+         {
+            //Durable subscription
+            if (selector == null)
+            {
+               consumer = sess.createDurableSubscriber((Topic)destSource, subName);
+            }
+            else
+            {
+               consumer = sess.createDurableSubscriber((Topic)destSource, subName, selector, false);
+            }
+         }
- //Now send the message(s)
+         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+         {
+            if (trace) { log.trace("Creating XA dest session"); }
- Iterator iter = messages.iterator();
+            //Create an XA session for sending to the destination
+
+            sessDest = ((XAConnection)connDest).createXASession();
+
+            sess = ((XASession)sessDest).getSession();
+         }
+         else
+         {
+            if (trace) { log.trace("Creating non XA dest session"); }
+
+            //Create a standard session for sending to the destination
+
+            //If maxBatchSize == 1 we just create a non transacted session, otherwise we
+            //create a transacted session for the send, since sending the batch in a transaction
+            //is likely to be more efficient than sending messages individually
+
+            //Only a batch of more than one message needs the transacted session and the
+            //explicit commit that sendBatch() performs when manualCommit is set
+            manualCommit = maxBatchSize > 1;
+
+            sessDest = connDest.createSession(manualCommit, manualCommit ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+
+            sess = sessDest;
+         }
- Message msg = null;
+         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+         {
+            if (trace) { log.trace("Starting JTA transaction"); }
+
+            tx = startTx();
+
+            enlistResources(tx);
+         }
- while (iter.hasNext())
+         producer = sess.createProducer(destDest);
+
+         consumer.setMessageListener(new SourceListener());
+
+         connSource.start();
+
+         return true;
+      }
+      catch (Exception e)
+      {
+         log.warn("Failed to set up connections", e);
+
+         //If this fails we should attempt to cleanup or we might end up in some weird state
+
+         cleanup();
+
+         return false;
+      }
+   }
+
+   /*
+    * Best-effort teardown of the current transaction and both connections.
+    * Every step is attempted independently and anything thrown by a step is discarded,
+    * so one failing step never prevents the following ones from running.
+    */
+   private void cleanup()
+   {
+      if (tx != null)
+      {
+         //Take the resources out of the tx first ...
+         try
+         {
+            delistResources(tx);
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         //... then terminate the tx
+         try
+         {
+            tx.rollback();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+      //Close the old objects
+      closeConnectionQuietly(connSource);
+
+      closeConnectionQuietly(connDest);
+   }
+
+   /*
+    * Closes the given connection, discarding anything thrown while doing so
+    * (including the NullPointerException raised when the connection was never created).
+    */
+   private void closeConnectionQuietly(Connection conn)
+   {
+      try
+      {
+         conn.close();
+      }
+      catch (Throwable ignored)
+      {
+      }
+   }
+
+   /*
+    * Blocks the calling thread until at least interval milliseconds have elapsed.
+    * A non-positive interval (e.g. -1, meaning "no retry") returns immediately.
+    *
+    * The wait is driven by the interval argument, not by the failureRetryInterval field.
+    * If the sleep is interrupted we only sleep for the time still remaining, and the
+    * interrupt status is restored before returning so the caller can still observe it.
+    */
+   private void pause(long interval)
+   {
+      long deadline = System.currentTimeMillis() + interval;
+
+      boolean interrupted = false;
+
+      long remaining;
+
+      while ((remaining = deadline - System.currentTimeMillis()) > 0)
+      {
+         try
+         {
+            Thread.sleep(remaining);
+         }
+         catch (InterruptedException e)
+         {
+            //Remember the interrupt but keep waiting for the rest of the interval.
+            //The flag must not be re-set here or the next sleep would throw immediately.
+            interrupted = true;
+         }
+      }
+
+      if (interrupted)
+      {
+         Thread.currentThread().interrupt();
+      }
+   }
+
+   /*
+    * Calls setupJMSObjects() until it succeeds or the attempt limit is reached, pausing for
+    * failureRetryInterval between attempts.
+    *
+    * Returns true as soon as an attempt succeeds, false once maxRetries attempts have failed.
+    * A maxRetries of -1 is treated as "no limit".
+    *
+    * The limit is tested with >= rather than ==: the counter is already 1 when it is first
+    * compared, so with maxRetries == 0 (which the constructor allows) an equality test is
+    * never satisfied and a failing setup would be retried forever.
+    */
+   private boolean setupJMSObjectsWithRetry()
+   {
+      if (trace) { log.trace("Setting up connections"); }
+
+      int count = 0;
+
+      while (true)
+      {
+         boolean ok = setupJMSObjects();
+
+         if (ok)
+         {
+            return true;
+         }
+
+         count++;
+
+         if (maxRetries != -1 && count >= maxRetries)
+         {
+            //We have used up all the attempts we are allowed
+            return false;
+         }
+
+         log.warn("Failed to set up connections, will retry after a pause of " + failureRetryInterval);
+
+         pause(failureRetryInterval);
+      }
+   }
+
+ /*
+ * If one of the JMS operations fail, then we try and lookup the connection factories, create
+ * the connections and retry, up to a certain number of times
+ */
+ private void sendBatch()
+ {
+ if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
+
+ synchronized (lock)
+ {
+ try
+ {
+ if (paused)
+ {
+ //Don't send now
+ if (trace) { log.trace("Paused, so not sending now"); }
+
+ return;
+ }
- try
+ if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
{
+ //We ack before we send
+ if (manualAck)
+ {
+ //Ack on the last message
+ ((Message)messages.getLast()).acknowledge();
+ }
+ }
+
+ //Now send the message(s)
+
+ Iterator iter = messages.iterator();
+
+ Message msg = null;
+
+ while (iter.hasNext())
+ {
+ msg = (Message)iter.next();
+
if (trace) { log.trace("Sending message " + msg); }
producer.send(msg);
- if (trace) { log.trace("Sent message " + msg); }
+ if (trace) { log.trace("Sent message " + msg); }
}
- catch (Throwable t)
+
+ if (qualityOfServiceMode == QOS_DUPLICATES_OK)
{
- //Failed to send - deal with retries
- t.printStackTrace();
- }
- }
-
- if (qualityOfServiceMode == QOS_DUPLICATES_OK)
- {
- //We ack the source message(s) after sending
-
- if (manualAck)
- {
- try
- {
+ //We ack the source message(s) after sending
+
+ if (manualAck)
+ {
//Ack on the last message
((Message)messages.getLast()).acknowledge();
- }
- catch (Throwable t)
- {
- //Deal with this
- t.printStackTrace();
- }
- }
- }
-
- //Now we commit the sending session if necessary
- if (manualCommit)
- {
- try
+ }
+ }
+
+ //Now we commit the sending session if necessary
+ if (manualCommit)
{
sessDest.commit();
}
- catch (Throwable t)
+
+ if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
{
- //Deal with this
- t.printStackTrace();
- }
- }
-
- messages.clear();
-
- if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
- {
- //Commit the JTA transaction and start another
-
- //XA
- try
- {
+ //Commit the JTA transaction and start another
+
delistResources(tx);
if (trace) { log.trace("Committing JTA transaction"); }
@@ -710,13 +818,61 @@
enlistResources(tx);
}
- catch (Throwable t)
+
+ //Clear the messages
+ messages.clear();
+ }
+ catch (Exception e)
+ {
+ log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
+
+ /*
+ * If an Exception occurs in attempting to send / ack the batch, this might be due
+ * to a network problem on either the source or destination connection.
+ * If it was on the source connection then the server has probably NACKed the unacked
+ * messages back to the destination anyway.
+ * If the failure occurred during 2PC commit protocol then the participants may or may not
+ * have reached the prepared state, if they do then the tx will commit at some time during
+ * recovery.
+ *
+ * So we can safely close the dead connections, without fear of stepping outside our
+ * QoS guarantee.
+ *
+ *
+ */
+
+ //Clear the messages
+ messages.clear();
+
+ cleanup();
+
+ boolean ok = false;
+
+ if (maxRetries > 0 || maxRetries == -1)
{
- //Deal with this
- t.printStackTrace();
+ log.warn("Will try and re-set up connections after a pause of " + failureRetryInterval);
+
+ pause(this.failureRetryInterval);
+
+ //Now we try
+ ok = setupJMSObjectsWithRetry();
}
- }
-
+
+ if (!ok)
+ {
+ //We haven't managed to recreate connections or maxRetries = 0
+ log.warn("Unable to set up connections, bridge will be stopped");
+
+ try
+ {
+ stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ }
}
}
@@ -781,10 +937,12 @@
messages.add(msg);
- batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+ batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
+ if (trace) { log.trace("max Batch Size is " + maxBatchSize); }
+
if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
{
if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-11 18:13:23 UTC (rev 1961)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-11 21:02:31 UTC (rev 1962)
@@ -25,6 +25,7 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -34,6 +35,9 @@
import javax.naming.InitialContext;
import org.jboss.jms.server.bridge.Bridge;
+import org.jboss.jms.server.bridge.ConnectionFactoryFactory;
+import org.jboss.jms.server.bridge.JNDIConnectionFactoryFactory;
+import org.jboss.logging.Logger;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -48,6 +52,8 @@
*/
public class BridgeTest extends MessagingTestCase
{
+ private static final Logger log = Logger.getLogger(BridgeTest.class);
+
public BridgeTest(String name)
{
super(name);
@@ -63,68 +69,219 @@
super.tearDown();
}
- public void testMaxBatchSizeNoMaxBatchTime_AtMostOnce() throws Exception
+ // MaxBatchSize but no MaxBatchTime
+
+ public void testNoMaxBatchTime_AtMostOnce_P() throws Exception
{
if (!ServerManagement.isRemote())
{
return;
}
- testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE);
+ testNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, true);
}
- public void testMaxBatchSizeNoMaxBatchTime_DuplicatesOk() throws Exception
+ public void testNoMaxBatchTime_DuplicatesOk_P() throws Exception
{
if (!ServerManagement.isRemote())
{
return;
}
- testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK);
+ testNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK, true);
}
- public void testMaxBatchSizeNoMaxBatchTime_OnceAndOnlyOnce() throws Exception
+ public void testNoMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
{
if (!ServerManagement.isRemote())
{
return;
}
- testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE);
+ testNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
}
+   /*
+    * No max batch time, at-most-once QoS, non persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testNoMaxBatchTime_AtMostOnce_NP() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, false);
+      }
+   }
- public void testMaxBatchTimeNoMaxBatchSize_AtMostOnce() throws Exception
+ public void testNoMaxBatchTime_DuplicatesOk_NP() throws Exception
{
if (!ServerManagement.isRemote())
{
return;
}
- this.testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_AT_MOST_ONCE);
+ testNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK, false);
}
- public void testMaxBatchTimeNoMaxBatchSize_DuplicatesOk() throws Exception
+ public void testNoMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
{
if (!ServerManagement.isRemote())
{
return;
}
- this.testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_DUPLICATES_OK);
+ testNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
}
- public void testMaxBatchTimeNoMaxBatchSize_OnceAndOnlyOnce() throws Exception
+
+ // MaxBatchTime but no MaxBatchSize
+
+ public void testMaxBatchTime_AtMostOnce_P() throws Exception
{
if (!ServerManagement.isRemote())
{
return;
}
- testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_ONCE_AND_ONLY_ONCE);
+ this.testMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, true);
}
+
+   /*
+    * Max batch time, duplicates-ok QoS, persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testMaxBatchTime_DuplicatesOk_P() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testMaxBatchTime(Bridge.QOS_DUPLICATES_OK, true);
+      }
+   }
+
+   /*
+    * Max batch time, once-and-only-once QoS, persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+      }
+   }
+
+   /*
+    * Max batch time, at-most-once QoS, non persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testMaxBatchTime_AtMostOnce_NP() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, false);
+      }
+   }
+
+   /*
+    * Max batch time, duplicates-ok QoS, non persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testMaxBatchTime_DuplicatesOk_NP() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testMaxBatchTime(Bridge.QOS_DUPLICATES_OK, false);
+      }
+   }
+
+   /*
+    * Max batch time, once-and-only-once QoS, non persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+      }
+   }
+
+
+ // Stress
+
+   /*
+    * Stress run, at-most-once QoS, persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testStress_AtMostOnce_P() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testStress(Bridge.QOS_AT_MOST_ONCE, true);
+      }
+   }
+
+   /*
+    * Stress run, duplicates-ok QoS, persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testStress_DuplicatesOk_P() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testStress(Bridge.QOS_DUPLICATES_OK, true);
+      }
+   }
+
+   /*
+    * Stress run, once-and-only-once QoS, persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testStress_OnceAndOnlyOnce_P() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+      }
+   }
+
+   /*
+    * Stress run, at-most-once QoS, non persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testStress_AtMostOnce_NP() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testStress(Bridge.QOS_AT_MOST_ONCE, false);
+      }
+   }
+
+   /*
+    * Stress run, duplicates-ok QoS, non persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testStress_DuplicatesOk_NP() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testStress(Bridge.QOS_DUPLICATES_OK, false);
+      }
+   }
+
+   /*
+    * Stress run, once-and-only-once QoS, non persistent messages.
+    * Does nothing unless ServerManagement.isRemote() reports true.
+    */
+   public void testStress_OnceAndOnlyOnce_NP() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+      }
+   }
+
+
+
+ private static class Sender implements Runnable
+ {
+ int numMessages;
- private void testMaxBatchSizeNoMaxBatchTime(int qosMode) throws Exception
+ Session sess;
+
+ MessageProducer prod;
+
+ Exception ex;
+
+ public void run()
+ {
+ try
+ {
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message" + i);
+
+ prod.send(tm);
+
+ log.trace("Sent message " + i);
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to send", e);
+ ex = e;
+ }
+ }
+
+ }
+
+ private void testStress(int qosMode, boolean persistent) throws Exception
{
Connection connSource = null;
Connection connDest = null;
Bridge bridge = null;
+
+ Thread t = null;
try
{
@@ -139,7 +296,11 @@
Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-
+
+ ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
+
+ ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
+
InitialContext ic0 = new InitialContext(props0);
InitialContext ic1 = new InitialContext(props1);
@@ -152,12 +313,169 @@
Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+ final int BATCH_SIZE = 50;
+
+ bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+ null, null, null, null,
+ null, 5000, 10, qosMode,
+ BATCH_SIZE, -1,
+ null, null);
+
+ bridge.start();
+
+ connSource = cf0.createConnection();
+
+ connDest = cf1.createConnection();
+
+ Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+ final int NUM_MESSAGES = 2000;
+
+ Sender sender = new Sender();
+ sender.sess = sessSend;
+ sender.prod = prod;
+ sender.numMessages = NUM_MESSAGES;
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sessRec.createConsumer(destQueue);
+
+ connDest.start();
+
+ t = new Thread(sender);
+
+ t.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ Message m = cons.receive(1000);
+
+ assertNull(m);
+
+ t.join();
+
+ if (sender.ex != null)
+ {
+ //An error occurred during the send
+ throw sender.ex;
+ }
+
+ }
+ finally
+ {
+ if (t != null)
+ {
+ t.join(10000);
+ }
+
+ if (connSource != null)
+ {
+ try
+ {
+ connSource.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+
+ if (connDest != null)
+ {
+ try
+ {
+ connDest.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+
+ if (bridge != null)
+ {
+ bridge.stop();
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("sourceQueue", 0);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("destQueue", 1);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ ServerManagement.stop(0);
+
+ ServerManagement.stop(1);
+ }
+ }
+
+
+ private void testNoMaxBatchTime(int qosMode, boolean persistent) throws Exception
+ {
+ Connection connSource = null;
+
+ Connection connDest = null;
+
+ Bridge bridge = null;
+
+ try
+ {
+ ServerManagement.start(0, "all", null, true);
+
+ ServerManagement.start(1, "all", null, false);
+
+ ServerManagement.deployQueue("sourceQueue", 0);
+
+ ServerManagement.deployQueue("destQueue", 1);
+
+ Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
+
+ Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+
+ ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
+
+ ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
+
+ InitialContext ic0 = new InitialContext(props0);
+
+ InitialContext ic1 = new InitialContext(props1);
+
+ ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+ ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
+
+ Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+
final int BATCH_SIZE = 10;
- bridge = new Bridge(cf0, cf1, sourceQueue, destQueue,
+ bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
null, null, null, null,
- null, 0, qosMode,
- 10, -1,
+ null, 5000, 10, qosMode,
+ BATCH_SIZE, -1,
null, null);
bridge.start();
@@ -170,6 +488,8 @@
MessageProducer prod = sessSend.createProducer(sourceQueue);
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
//Send half the messges
for (int i = 0; i < BATCH_SIZE / 2; i++)
@@ -276,12 +596,7 @@
m = cons2.receive(1000);
- assertNull(m);
-
- connSource.close();
-
- connDest.close();
-
+ assertNull(m);
}
finally
{
@@ -338,7 +653,7 @@
}
}
- private void testMaxBatchTimeNoMaxBatchSize(int qosMode) throws Exception
+ private void testMaxBatchTime(int qosMode, boolean persistent) throws Exception
{
Connection connSource = null;
@@ -359,6 +674,10 @@
Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+
+ ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
+
+ ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
InitialContext ic0 = new InitialContext(props0);
@@ -374,10 +693,12 @@
final long MAX_BATCH_TIME = 3000;
- bridge = new Bridge(cf0, cf1, sourceQueue, destQueue,
+ final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
+
+ bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
null, null, null, null,
- null, 0, qosMode,
- -1, MAX_BATCH_TIME,
+ null, 5000, 10, qosMode,
+ MAX_BATCH_SIZE, MAX_BATCH_TIME,
null, null);
bridge.start();
@@ -390,6 +711,8 @@
MessageProducer prod = sessSend.createProducer(sourceQueue);
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
final int NUM_MESSAGES = 10;
//Send some message
@@ -442,10 +765,6 @@
assertNull(m);
- connSource.close();
-
- connDest.close();
-
}
finally
{
More information about the jboss-cvs-commits
mailing list