[jboss-cvs] JBoss Messaging SVN: r1957 - in trunk: tests/src/org/jboss/test/messaging/jms/bridge and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 11 07:55:53 EST 2007
Author: timfox
Date: 2007-01-11 07:55:47 -0500 (Thu, 11 Jan 2007)
New Revision: 1957
Modified:
trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
Log:
More bridge work
Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-11 03:56:53 UTC (rev 1956)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-01-11 12:55:47 UTC (rev 1957)
@@ -22,8 +22,8 @@
package org.jboss.jms.server.bridge;
import java.util.ArrayList;
-import java.util.Hashtable;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import javax.jms.Connection;
@@ -33,19 +33,17 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.naming.InitialContext;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.contract.MessagingComponent;
-import org.jboss.tm.TransactionManagerLocator;
/**
*
@@ -63,25 +61,73 @@
private static boolean trace;
+ //Transaction modes
+
+// public static final int TRANSACTIONS_NONE = 0;
+//
+// public static final int TRANSACTIONS_LOCAL = 1;
+//
+// public static final int TRANSACTIONS_XA = 2;
+
+ //Quality of service modes
+
+ public static final int QOS_AT_MOST_ONCE = 0;
+
+ public static final int QOS_DUPLICATES_OK = 1;
+
+ public static final int QOS_ONCE_AND_ONLY_ONCE = 2;
+
+ /*
+ * QOS_AT_MOST_ONCE
+ *
+ * With this QoS mode messages will reach the destination from the source at most once.
+ * The messages are consumed from the source and acknowledged
+ * before sending to the destination. Therefore there is a possibility that if failure occurs between
+ * removing them from the source and them arriving at the destination they could be lost. Hence delivery
+ * will occur at most once.
+ * This mode is available for both persistent and non persistent messages.
+ *
+ * QOS_DUPLICATES_OK
+ *
+ * With this QoS mode, the messages are consumed from the source and then acknowledged
+ * after they have been successfully sent to the destination. Therefore there is a possibility that if
+ * failure occurs after sending to the destination but before acknowledging them, they could be sent again
+ * when the system recovers. I.e. the destination might receive duplicates after a failure.
+ * This mode is available for both persistent and non persistent messages.
+ *
+ * QOS_ONCE_AND_ONLY_ONCE
+ *
+ * This QoS mode ensures messages will reach the destination from the source once and only once.
+ * (Sometimes this mode is known as "exactly once").
+ * If both the source and the destination are on the same JBoss Messaging server instance then this can
+ * be achieved by sending and acknowledging the messages in the same local transaction.
+ * If the source and destination are on different servers this is achieved by enlisting the sending and consuming
+ * sessions in a JTA transaction. The JTA transaction is controlled by JBoss Transactions JTA implementation which
+ * is a fully recovering transaction manager, thus providing a very high degree of durability.
+ * If JTA is required then both supplied connection factories need to be XAConnectionFactory implementations.
+ * This mode is only available for persistent messages.
+ * This is likely to be the slowest mode since it requires extra persistence for the transaction logging.
+ *
+ * Note:
+ * For a specific application it may be possible to provide once and only once semantics without using the
+ * QOS_ONCE_AND_ONLY_ONCE QoS level. This can be done by using the QOS_DUPLICATES_OK mode and then checking for
+ * duplicates at the destination and discarding them. Some JMS servers provide automatic duplicate message detection
+ * functionality, or this may be possible to implement on the application level by maintaining a cache of received
+ * message ids on disk and comparing received messages to them. The cache would only be valid
+ * for a certain period of time so this approach is not as watertight as using QOS_ONCE_AND_ONLY_ONCE but
+ * may be a good choice depending on your specific application.
+ *
+ *
+ *
+ */
+
static
{
log = Logger.getLogger(Bridge.class);
trace = log.isTraceEnabled();
}
-
- private Hashtable sourceJNDIProperties;
-
- private Hashtable destJNDIProperties;
-
- private String sourceConnectionFactoryLookup;
-
- private String destConnectionFactoryLookup;
-
- private String sourceDestinationLookup;
-
- private String destDestinationLookup;
-
+
private String sourceUsername;
private String sourcePassword;
@@ -96,10 +142,8 @@
private long failureRetryInterval;
- private boolean transactional;
+ private int qualityOfServiceMode;
- private boolean XA;
-
private int maxBatchSize;
private long maxBatchTime;
@@ -110,19 +154,22 @@
private boolean started;
- private List messages;
+ private LinkedList messages;
private Object lock;
- //Needed since we use the 1.0.2 JMS API so we can work with 1.0.2 providers
- private boolean sourceIsTopic;
+ private ConnectionFactory cfSource;
- private boolean destIsTopic;
+ private ConnectionFactory cfDest;
private Connection connSource;
private Connection connDest;
+ private Destination destSource;
+
+ private Destination destDest;
+
private Session sessSource;
private Session sessDest;
@@ -139,45 +186,37 @@
private boolean paused;
- private InitialContext icSource;
+ private Transaction tx;
- private InitialContext icDest;
+ private boolean manualAck;
+ private boolean manualCommit;
- public Bridge(Hashtable sourceJNDIProperties, Hashtable destJNDIProperties,
- String sourceConnectionFactoryLookup, String destConnectionFactoryLookup,
- String sourceDestinationLookup, String destDestinationLookup,
+ public Bridge(ConnectionFactory cfSource, ConnectionFactory cfDest,
+ Destination destSource, Destination destDest,
String sourceUsername, String sourcePassword,
String destUsername, String destPassword,
- String selector, long failureRetryInterval, boolean transactional,
- boolean XA, int maxBatchSize, long maxBatchTime,
- String subName, String clientID,
- boolean sourceIsTopic, boolean destIsTopic)
+ String selector, long failureRetryInterval,
+ int qosMode,
+ int maxBatchSize, long maxBatchTime,
+ String subName, String clientID)
{
- if (sourceJNDIProperties == null)
+ if (cfSource == null)
{
- throw new IllegalArgumentException("sourceJNDIProperties cannot be null");
+ throw new IllegalArgumentException("cfSource cannot be null");
}
- if (destJNDIProperties == null)
+ if (cfDest == null)
{
- throw new IllegalArgumentException("destJNDIProperties cannot be null");
+ throw new IllegalArgumentException("cfDest cannot be null");
}
- if (sourceConnectionFactoryLookup == null)
+ if (destSource == null)
{
- throw new IllegalArgumentException("sourceConnectionFactoryLookup cannot be null");
+ throw new IllegalArgumentException("destSource cannot be null");
}
- if (destConnectionFactoryLookup == null)
+ if (destDest == null)
{
- throw new IllegalArgumentException("destConnectionFactoryLookup cannot be null");
+ throw new IllegalArgumentException("destDest cannot be null");
}
- if (sourceDestinationLookup == null)
- {
- throw new IllegalArgumentException("sourceDestinationLookup cannot be null");
- }
- if (destDestinationLookup == null)
- {
- throw new IllegalArgumentException("destDestinationLookup cannot be null");
- }
if (failureRetryInterval < 0 && failureRetryInterval != -1)
{
throw new IllegalArgumentException("failureRetryInterval must be > 0 or -1 to represent no retry");
@@ -194,19 +233,19 @@
{
throw new IllegalArgumentException("Cannot have unlimited batch size and unlimited batch time");
}
+ if (qosMode != QOS_AT_MOST_ONCE && qosMode != QOS_DUPLICATES_OK && qosMode != QOS_ONCE_AND_ONLY_ONCE)
+ {
+ throw new IllegalArgumentException("Invalid QoS mode " + qosMode);
+ }
- this.sourceJNDIProperties = sourceJNDIProperties;
+ this.cfSource = cfSource;
- this.destJNDIProperties = destJNDIProperties;
+ this.cfDest = cfDest;
- this.sourceConnectionFactoryLookup = sourceConnectionFactoryLookup;
+ this.destSource = destSource;
- this.destConnectionFactoryLookup = destConnectionFactoryLookup;
+ this.destDest = destDest;
- this.sourceDestinationLookup = sourceDestinationLookup;
-
- this.destDestinationLookup = destDestinationLookup;
-
this.sourceUsername = sourceUsername;
this.sourcePassword = sourcePassword;
@@ -219,24 +258,18 @@
this.failureRetryInterval = failureRetryInterval;
- this.transactional = transactional;
+ this.qualityOfServiceMode = qosMode;
- this.XA = XA;
-
this.maxBatchSize = maxBatchSize;
this.maxBatchTime = maxBatchTime;
-
+
this.subName = subName;
this.clientID = clientID;
- this.sourceIsTopic = sourceIsTopic;
+ this.messages = new LinkedList();
- this.destIsTopic = destIsTopic;
-
- this.messages = new ArrayList(maxBatchSize);
-
this.lock = new Object();
if (trace)
@@ -245,23 +278,9 @@
}
}
- private TransactionManager getTm()
- {
- if (tm == null)
- {
- tm = TransactionManagerLocator.getInstance().locate();
-
- if (tm == null)
- {
- throw new IllegalStateException("Cannot locate a transaction manager");
- }
- }
-
- return tm;
- }
// MessagingComponent overrides --------------------------------------------------
-
+
public synchronized void start() throws Exception
{
if (started)
@@ -270,117 +289,128 @@
return;
}
- if (trace) { log.trace("Starting " + this); }
+ if (trace) { log.trace("Starting " + this); }
- icSource = new InitialContext(sourceJNDIProperties);
+ connSource = createConnection(sourceUsername, sourcePassword, cfSource);
- icDest = new InitialContext(destJNDIProperties);
+ connDest = createConnection(destUsername, destPassword, cfDest);
- ConnectionFactory cfSource = (ConnectionFactory)icSource.lookup(sourceConnectionFactoryLookup);
-
- ConnectionFactory cfDest = (ConnectionFactory)icDest.lookup(destConnectionFactoryLookup);
-
- Destination sourceDest = (Destination)icSource.lookup(sourceDestinationLookup);
-
- Destination destDest = (Destination)icDest.lookup(destDestinationLookup);
-
- if (sourceUsername == null)
+ if (clientID != null)
{
- connSource = cfSource.createConnection();
+ connSource.setClientID(clientID);
}
- else
- {
- connSource = cfSource.createConnection(sourceUsername, sourcePassword);
- }
+
+ Session sess;
- if (destUsername == null)
+ if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
{
- connDest = cfDest.createConnection();
+ //Create an XASession for consuming from the source
+ if (trace) { log.trace("Creating XA source session"); }
+ sessSource = ((XAConnection)connSource).createXASession();
+
+ sess = ((XASession)sessSource).getSession();
}
else
{
- connDest = cfDest.createConnection(destUsername, destPassword);
+ if (trace) { log.trace("Creating non XA source session"); }
+
+ //Create a standard session for consuming from the source
+
+ //If the QoS is at_most_once, and max batch size is 1 then we use AUTO_ACKNOWLEDGE
+ //If the QoS is at_most_once, and max batch size > 1 or -1, then we use CLIENT_ACKNOWLEDGE
+ //We could use CLIENT_ACKNOWLEDGE for both the above but AUTO_ACKNOWLEDGE may be slightly more
+ //performant in some implementations than manually acking every time but it really depends
+ //on the implementation.
+ //We could also use local transacted for both the above but don't for the same reasons.
+
+ //If the QoS is duplicates_ok, we use CLIENT_ACKNOWLEDGE
+ //We could use local transacted, whether one is faster than the other probably depends on the
+ //messaging implementation but there's probably not much in it
+
+ int ackMode;
+ if (qualityOfServiceMode == QOS_AT_MOST_ONCE && maxBatchSize == 1)
+ {
+ ackMode = Session.AUTO_ACKNOWLEDGE;
+ }
+ else
+ {
+ ackMode = Session.CLIENT_ACKNOWLEDGE;
+
+ manualAck = true;
+ }
+
+ sessSource = connSource.createSession(false, ackMode);
+
+ sess = sessSource;
}
-
- if (clientID != null)
+
+ if (subName == null)
{
- connDest.setClientID(clientID);
- }
-
- //Note we use the JMS 1.0.2 API so we can interoperate with JMS providers that don't support
- //JMS 1.1
- if (sourceIsTopic)
- {
- sessSource =
- ((TopicConnection)connSource).createTopicSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
-
- if (subName == null)
+ if (selector == null)
{
- //Non durable
- if (selector == null)
- {
- consumer = ((TopicSession)sessSource).createSubscriber((Topic)sourceDest);
- }
- else
- {
- consumer = ((TopicSession)sessSource).createSubscriber((Topic)sourceDest, selector, false);
- }
+ consumer = sess.createConsumer(destSource);
}
else
{
- //Durable
- if (selector == null)
- {
- consumer = ((TopicSession)sessSource).createDurableSubscriber((Topic)sourceDest, subName);
- }
- else
- {
- consumer = ((TopicSession)sessSource).createDurableSubscriber((Topic)sourceDest, subName, selector, false);
- }
+ consumer = sess.createConsumer(destSource, selector, false);
}
}
else
{
- sessSource =
- ((QueueConnection)connSource).createQueueSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
-
- if (subName == null)
+ //Durable subscription
+ if (selector == null)
{
- if (selector == null)
- {
- consumer = ((QueueSession)sessSource).createReceiver((Queue)sourceDest);
- }
- else
- {
- consumer = ((QueueSession)sessSource).createReceiver((Queue)sourceDest, selector);
- }
+ consumer = sess.createDurableSubscriber((Topic)destSource, subName);
}
else
{
- //Shouldn't specify sub name for source quuee
- throw new IllegalArgumentException("subName should not be specified if the source destination is a queue");
+ consumer = sess.createDurableSubscriber((Topic)destSource, subName, selector, false);
}
}
- if (destIsTopic)
+ if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
{
- sessDest =
- ((TopicConnection)connDest).createTopicSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ if (trace) { log.trace("Creating XA dest session"); }
- producer = ((TopicSession)sessDest).createProducer((Topic)destDest);
+ //Create an XA session for sending to the destination
+
+ sessDest = ((XAConnection)connDest).createXASession();
+
+ sess = ((XASession)sessDest).getSession();
}
else
{
- sessDest =
- ((QueueConnection)connDest).createQueueSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-
- producer = ((QueueSession)sessDest).createSender((Queue)destDest);
+ if (trace) { log.trace("Creating non XA dest session"); }
+ //Create a standard session for sending to the destination
+
+ //If maxBatchSize == 1 we just create a non transacted session, otherwise we
+ //create a transacted session for the send, since sending the batch in a transaction
+ //is likely to be more efficient than sending messages individually
+
+ manualCommit = maxBatchSize == 1;
+
+ sessDest = connDest.createSession(manualCommit, manualCommit ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+
+ sess = sessDest;
}
+ if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+ {
+ if (trace) { log.trace("Starting JTA transaction"); }
+
+ tx = startTx();
+
+ enlistResources(tx);
+ }
+
+ producer = sess.createProducer(destDest);
+
consumer.setMessageListener(new SourceListener());
connSource.start();
+
+ started = true;
if (maxBatchTime != -1)
{
@@ -391,11 +421,11 @@
batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
checkerThread.start();
- }
+ }
if (trace) { log.trace("Started " + this); }
}
-
+
public synchronized void stop() throws Exception
{
if (!started)
@@ -427,10 +457,12 @@
connDest.close();
- icSource.close();
+ if (tx != null)
+ {
+ //Terminate any transaction
+ tx.rollback();
+ }
- icDest.close();
-
if (trace) { log.trace("Stopped " + this); }
}
@@ -465,11 +497,112 @@
}
// Private -------------------------------------------------------------------
-
- private void sendBatch()
+
+ private void enlistResources(Transaction tx) throws Exception
{
+ if (trace) { log.trace("Enlisting resources in tx"); }
+
+ XAResource resSource = ((XASession)sessSource).getXAResource();
+
+ tx.enlistResource(resSource);
+
+ XAResource resDest = ((XASession)sessDest).getXAResource();
+
+ tx.enlistResource(resDest);
+
+ if (trace) { log.trace("Enlisted resources in tx"); }
+ }
+
+ private void delistResources(Transaction tx) throws Exception
+ {
+ if (trace) { log.trace("Delisting resources from tx"); }
+
+ XAResource resSource = ((XASession)sessSource).getXAResource();
+
+ tx.delistResource(resSource, XAResource.TMSUCCESS);
+
+ XAResource resDest = ((XASession)sessDest).getXAResource();
+
+ tx.delistResource(resDest, XAResource.TMSUCCESS);
+
+ if (trace) { log.trace("Delisted resources from tx"); }
+ }
+
+ private Transaction startTx() throws Exception
+ {
+ if (trace) { log.trace("Starting JTA transaction"); }
+
+ TransactionManager tm = getTm();
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ //Remove the association between the transaction and the current thread - we don't want it
+ //we will be committing /rolling back directly on the transaction object
+
+ tm.suspend();
+
+ if (trace) { log.trace("Started JTA transaction"); }
+
+ return tx;
+ }
+
+ private TransactionManager getTm()
+ {
+ if (tm == null)
+ {
+// tm = TransactionManagerLocator.getInstance().locate();
+//
+// if (tm == null)
+// {
+// throw new IllegalStateException("Cannot locate a transaction manager");
+// }
+
+ tm = com.arjuna.ats.jta.TransactionManager.transactionManager();
+ }
+
+ return tm;
+ }
+
+ private Connection createConnection(String username, String password, ConnectionFactory cf)
+ throws Exception
+ {
+ Connection conn;
+
+ if (username == null)
+ {
+ if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+ {
+ if (trace) { log.trace("Creating an XA connection"); }
+ conn = ((XAConnectionFactory)cf).createXAConnection();
+ }
+ else
+ {
+ if (trace) { log.trace("Creating a non XA connection"); }
+ conn = cf.createConnection();
+ }
+ }
+ else
+ {
+ if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+ {
+ if (trace) { log.trace("Creating an XA connection"); }
+ conn = ((XAConnectionFactory)cf).createXAConnection(username, password);
+ }
+ else
+ {
+ if (trace) { log.trace("Creating a non XA connection"); }
+ conn = cf.createConnection(username, password);
+ }
+ }
+ return conn;
+ }
+
+ private void sendBatch()
+ {
if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
-
+
synchronized (lock)
{
if (paused)
@@ -480,6 +613,26 @@
return;
}
+ if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
+ {
+ //We ack before we send
+ if (manualAck)
+ {
+ //Ack on the last message
+ try
+ {
+ ((Message)messages.getLast()).acknowledge();
+ }
+ catch (Throwable t)
+ {
+ //Deal with this
+ t.printStackTrace();
+ }
+ }
+ }
+
+ //Now send the message(s)
+
Iterator iter = messages.iterator();
Message msg = null;
@@ -503,58 +656,67 @@
}
}
- if (transactional)
+ if (qualityOfServiceMode == QOS_DUPLICATES_OK)
{
- try
- {
- if (trace) { log.trace("Committing local sending tx"); }
-
- sessDest.commit();
-
- if (trace) { log.trace("Committed local sending tx"); }
- }
- catch (Throwable t)
- {
- //Deal with this
- t.printStackTrace();
- }
+ //We ack the source message(s) after sending
+
+ if (manualAck)
+ {
+ try
+ {
+ //Ack on the last message
+ ((Message)messages.getLast()).acknowledge();
+ }
+ catch (Throwable t)
+ {
+ //Deal with this
+ t.printStackTrace();
+ }
+ }
}
- messages.clear();
-
- if (transactional)
+ //Now we commit the sending session if necessary
+ if (manualCommit)
{
try
{
- if (trace) { log.trace("Committing local consuming tx"); }
-
- sessSource.commit();
-
- if (trace) { log.trace("Committed local consuming tx"); }
+ sessDest.commit();
}
catch (Throwable t)
{
//Deal with this
t.printStackTrace();
- }
+ }
}
- else
+
+ messages.clear();
+
+ if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
{
+ //Commit the JTA transaction and start another
+
+ //XA
try
{
- if (trace) { log.trace("Acknowledging session"); }
+ delistResources(tx);
+
+ if (trace) { log.trace("Committing JTA transaction"); }
- msg.acknowledge();
+ tx.commit();
- if (trace) { log.trace("Acknowledged session"); }
+ if (trace) { log.trace("Committed JTA transaction"); }
+
+ tx = startTx();
+
+ enlistResources(tx);
}
catch (Throwable t)
{
//Deal with this
t.printStackTrace();
}
+ }
- }
}
}
@@ -623,7 +785,7 @@
if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
- if (messages.size() >= maxBatchSize)
+ if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
{
if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-11 03:56:53 UTC (rev 1956)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-01-11 12:55:47 UTC (rev 1957)
@@ -48,7 +48,6 @@
*/
public class BridgeTest extends MessagingTestCase
{
-
public BridgeTest(String name)
{
super(name);
@@ -64,26 +63,63 @@
super.tearDown();
}
- public void testMaxBatchSizeNoMaxBatchTimeTransacted() throws Exception
+ public void testMaxBatchSizeNoMaxBatchTime_AtMostOnce() throws Exception
{
if (!ServerManagement.isRemote())
{
return;
}
- testMaxBatchSizeNoMaxBatchTime(true);
+ testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE);
}
- public void testMaxBatchSizeNoMaxBatchTimeNonTransacted() throws Exception
+ public void testMaxBatchSizeNoMaxBatchTime_DuplicatesOk() throws Exception
{
if (!ServerManagement.isRemote())
{
return;
}
- testMaxBatchSizeNoMaxBatchTime(false);
+ testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK);
}
- private void testMaxBatchSizeNoMaxBatchTime(boolean transacted) throws Exception
+ public void testMaxBatchSizeNoMaxBatchTime_OnceAndOnlyOnce() throws Exception
{
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+ testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE);
+ }
+
+
+ public void testMaxBatchTimeNoMaxBatchSize_AtMostOnce() throws Exception
+ {
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+ this.testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_AT_MOST_ONCE);
+ }
+
+ public void testMaxBatchTimeNoMaxBatchSize_DuplicatesOk() throws Exception
+ {
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+ this.testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_DUPLICATES_OK);
+ }
+
+ public void testMaxBatchTimeNoMaxBatchSize_OnceAndOnlyOnce() throws Exception
+ {
+ if (!ServerManagement.isRemote())
+ {
+ return;
+ }
+ testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_ONCE_AND_ONLY_ONCE);
+ }
+
+ private void testMaxBatchSizeNoMaxBatchTime(int qosMode) throws Exception
+ {
Connection connSource = null;
Connection connDest = null;
@@ -118,12 +154,11 @@
final int BATCH_SIZE = 10;
- bridge = new Bridge(props0, props1, "/ConnectionFactory", "/ConnectionFactory",
- "/queue/sourceQueue", "/queue/destQueue", null, null, null, null,
- null, 0, false,
- false, 10, -1,
- null, null,
- false, false);
+ bridge = new Bridge(cf0, cf1, sourceQueue, destQueue,
+ null, null, null, null,
+ null, 0, qosMode,
+ 10, -1,
+ null, null);
bridge.start();
@@ -176,6 +211,10 @@
assertEquals("message" + i, tm.getText());
}
+ m = cons.receive(1000);
+
+ assertNull(m);
+
//Send another batch with one more than batch size
for (int i = 0; i < BATCH_SIZE + 1; i++)
@@ -224,7 +263,11 @@
assertEquals("message" + i, tm.getText());
}
+ m = cons.receive(1000);
+ assertNull(m);
+
+
//Make sure no messages are left in the source dest
MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
@@ -294,4 +337,168 @@
ServerManagement.stop(1);
}
}
+
+ private void testMaxBatchTimeNoMaxBatchSize(int qosMode) throws Exception
+ {
+ Connection connSource = null;
+
+ Connection connDest = null;
+
+ Bridge bridge = null;
+
+ try
+ {
+ ServerManagement.start(0, "all", null, true);
+
+ ServerManagement.start(1, "all", null, false);
+
+ ServerManagement.deployQueue("sourceQueue", 0);
+
+ ServerManagement.deployQueue("destQueue", 1);
+
+ Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
+
+ Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+
+ InitialContext ic0 = new InitialContext(props0);
+
+ InitialContext ic1 = new InitialContext(props1);
+
+ ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+
+ ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+
+ Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
+
+ Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+
+ final long MAX_BATCH_TIME = 3000;
+
+ bridge = new Bridge(cf0, cf1, sourceQueue, destQueue,
+ null, null, null, null,
+ null, 0, qosMode,
+ -1, MAX_BATCH_TIME,
+ null, null);
+
+ bridge.start();
+
+ connSource = cf0.createConnection();
+
+ connDest = cf1.createConnection();
+
+ Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+ final int NUM_MESSAGES = 10;
+
+ //Send some messages
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sessRec.createConsumer(destQueue);
+
+ connDest.start();
+
+ //Verify none are received
+
+ Message m = cons.receive(2000);
+
+ assertNull(m);
+
+ //Wait a bit longer
+
+ Thread.sleep(1500);
+
+ //Messages should now be receivable
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons.receive(1000);
+
+ assertNull(m);
+
+ //Make sure no messages are left in the source dest
+
+ MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
+
+ connSource.start();
+
+ m = cons2.receive(1000);
+
+ assertNull(m);
+
+ connSource.close();
+
+ connDest.close();
+
+ }
+ finally
+ {
+ if (connSource != null)
+ {
+ try
+ {
+ connSource.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+
+ if (connDest != null)
+ {
+ try
+ {
+ connDest.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close connection", e);
+ }
+ }
+
+ if (bridge != null)
+ {
+ bridge.stop();
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("sourceQueue", 0);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ try
+ {
+ ServerManagement.undeployQueue("destQueue", 1);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to undeploy", e);
+ }
+
+ ServerManagement.stop(0);
+
+ ServerManagement.stop(1);
+ }
+ }
}
More information about the jboss-cvs-commits
mailing list