[jboss-cvs] JBoss Messaging SVN: r4576 - in trunk: src/main/org/jboss/messaging/jms/bridge/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 25 09:00:39 EDT 2008
Author: jmesnil
Date: 2008-06-25 09:00:39 -0400 (Wed, 25 Jun 2008)
New Revision: 4576
Added:
trunk/src/main/org/jboss/messaging/jms/bridge/impl/
trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIConnectionFactoryFactory.java
trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIDestinationFactory.java
trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIFactorySupport.java
Removed:
trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java
trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java
trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java
Modified:
trunk/src/main/org/jboss/messaging/jms/bridge/Bridge.java
trunk/src/main/org/jboss/messaging/jms/bridge/BridgeService.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
Log:
renamed Bridge class into o.j.message.jms.bridge.impl.BridgeImpl & added Bridge interface
added unit tests + refactoring of BridgeImpl (WIP)
Modified: trunk/src/main/org/jboss/messaging/jms/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/Bridge.java 2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/Bridge.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -1,6 +1,6 @@
/*
* JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
@@ -18,1633 +18,94 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.jms.bridge;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
-import javax.jms.XASession;
-import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
-import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.jms.client.JBossMessage;
-import org.jboss.messaging.jms.client.JBossSession;
-import org.jboss.tm.TransactionManagerLocator;
/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
- * A Bridge
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
- *
- * $Id$
- *
+ *
*/
-public class Bridge implements MessagingComponent
+public interface Bridge extends MessagingComponent
{
- private static final Logger log;
-
- private static boolean trace;
-
- //Quality of service modes
-
- public static final int QOS_AT_MOST_ONCE = 0;
-
- public static final int QOS_DUPLICATES_OK = 1;
-
- public static final int QOS_ONCE_AND_ONLY_ONCE = 2;
-
- /*
- *
- * Quality of service (QoS) levels
- * ===============================
- *
- * QOS_AT_MOST_ONCE
- * ----------------
- *
- * With this QoS mode messages will reach the destination from the source at most once.
- * The messages are consumed from the source and acknowledged
- * before sending to the destination. Therefore there is a possibility that if failure occurs between
- * removing them from the source and them arriving at the destination they could be lost. Hence delivery
- * will occur at most once.
- * This mode is avilable for both persistent and non persistent messages.
- *
- * QOS_DUPLICATES_OK
- * -----------------
- *
- * With this QoS mode, the messages are consumed from the source and then acknowledged
- * after they have been successfully sent to the destination. Therefore there is a possibility that if
- * failure occurs after sending to the destination but before acknowledging them, they could be sent again
- * when the system recovers. I.e. the destination might receive duplicates after a failure.
- * This mode is available for both persistent and non persistent messages.
- *
- * QOS_ONCE_AND_ONLY_ONCE
- * ----------------------
- *
- * This QoS mode ensures messages will reach the destination from the source once and only once.
- * (Sometimes this mode is known as "exactly once").
- * If both the source and the destination are on the same JBoss Messaging server instance then this can
- * be achieved by sending and acknowledging the messages in the same local transaction.
- * If the source and destination are on different servers this is achieved by enlisting the sending and consuming
- * sessions in a JTA transaction. The JTA transaction is controlled by JBoss Transactions JTA implementation which
- * is a fully recovering transaction manager, thus providing a very high degree of durability.
- * If JTA is required then both supplied connection factories need to be XAConnectionFactory implementations.
- * This mode is only available for persistent messages.
- * This is likely to be the slowest mode since it requires extra persistence for the transaction logging.
- *
- * Note:
- * For a specific application it may possible to provide once and only once semantics without using the
- * QOS_ONCE_AND_ONLY_ONCE QoS level. This can be done by using the QOS_DUPLICATES_OK mode and then checking for
- * duplicates at the destination and discarding them. Some JMS servers provide automatic duplicate message detection
- * functionality, or this may be possible to implement on the application level by maintaining a cache of received
- * message ids on disk and comparing received messages to them. The cache would only be valid
- * for a certain period of time so this approach is not as watertight as using QOS_ONCE_AND_ONLY_ONCE but
- * may be a good choice depending on your specific application.
- *
- *
- */
-
- static
- {
- log = Logger.getLogger(Bridge.class);
-
- trace = log.isTraceEnabled();
- }
+ void pause() throws Exception;
- private String sourceUsername;
-
- private String sourcePassword;
-
- private String targetUsername;
-
- private String targetPassword;
-
- private TransactionManager tm;
-
- private String selector;
-
- private long failureRetryInterval;
-
- private int maxRetries;
-
- private int qualityOfServiceMode;
-
- private int maxBatchSize;
-
- private long maxBatchTime;
-
- private String subName;
-
- private String clientID;
-
- private volatile boolean addMessageIDInHeader;
-
-
-
- private boolean started;
-
- private LinkedList<Message> messages;
-
- private Object lock;
-
- private ConnectionFactoryFactory sourceCff;
-
- private ConnectionFactoryFactory targetCff;
-
- private DestinationFactory sourceDestinationFactory;
-
- private DestinationFactory targetDestinationFactory;
-
- private Connection sourceConn;
-
- private Connection targetConn;
-
- private Destination sourceDestination;
-
- private Destination targetDestination;
-
- private Session sourceSession;
-
- private Session targetSession;
-
- private MessageConsumer consumer;
-
- private MessageProducer producer;
-
- private BatchTimeChecker timeChecker;
-
- private Thread checkerThread;
-
- private long batchExpiryTime;
-
- private boolean paused;
-
- private Transaction tx;
-
- private boolean failed;
-
- private int forwardMode;
-
- private static final int FORWARD_MODE_XA = 0;
-
- private static final int FORWARD_MODE_LOCALTX = 1;
-
- private static final int FORWARD_MODE_NONTX = 2;
-
- /*
- * Constructor for MBean
- */
- public Bridge()
- {
- this.messages = new LinkedList<Message>();
-
- this.lock = new Object();
- }
-
- public Bridge(ConnectionFactoryFactory sourceCff, ConnectionFactoryFactory destCff,
- DestinationFactory sourceDestinationFactory, DestinationFactory targetDestinationFactory,
- String sourceUsername, String sourcePassword,
- String targetUsername, String targetPassword,
- String selector, long failureRetryInterval,
- int maxRetries,
- int qosMode,
- int maxBatchSize, long maxBatchTime,
- String subName, String clientID,
- boolean addMessageIDInHeader)
- {
- this();
-
- this.sourceCff = sourceCff;
-
- this.targetCff = destCff;
-
- this.sourceDestinationFactory = sourceDestinationFactory;
-
- this.targetDestinationFactory = targetDestinationFactory;
-
- this.sourceUsername = sourceUsername;
-
- this.sourcePassword = sourcePassword;
-
- this.targetUsername = targetUsername;
-
- this.targetPassword = targetPassword;
-
- this.selector = selector;
-
- this.failureRetryInterval = failureRetryInterval;
-
- this.maxRetries = maxRetries;
-
- this.qualityOfServiceMode = qosMode;
-
- this.maxBatchSize = maxBatchSize;
-
- this.maxBatchTime = maxBatchTime;
-
- this.subName = subName;
-
- this.clientID = clientID;
-
- this.addMessageIDInHeader = addMessageIDInHeader;
-
- if (trace)
- {
- log.trace("Created " + this);
- }
- }
-
-
- // MessagingComponent overrides --------------------------------------------------
-
- public synchronized void start() throws Exception
- {
- if (started)
- {
- log.warn("Attempt to start, but is already started");
- return;
- }
-
- if (trace) { log.trace("Starting " + this); }
-
- checkParams();
-
- TransactionManager tm = getTm();
-
- //There may already be a JTA transaction associated to the thread
-
- boolean ok;
-
- Transaction toResume = null;
- try
- {
- toResume = tm.suspend();
-
- ok = setupJMSObjects();
- }
- finally
- {
- if (toResume != null)
- {
- tm.resume(toResume);
- }
- }
-
- if (ok)
- {
- //start the source connection
-
- sourceConn.start();
-
- started = true;
-
- if (maxBatchTime != -1)
- {
- if (trace) { log.trace("Starting time checker thread"); }
-
- timeChecker = new BatchTimeChecker();
-
- checkerThread = new Thread(timeChecker);
-
- batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-
- checkerThread.start();
-
- if (trace) { log.trace("Started time checker thread"); }
- }
-
- if (trace) { log.trace("Started " + this); }
- }
- else
- {
- log.warn("Failed to start bridge");
- handleFailureOnStartup();
- }
- }
-
- public synchronized void stop() throws Exception
- {
- if (!started)
- {
- log.warn("Attempt to stop, but is already stopped");
- return;
- }
-
- if (trace) { log.trace("Stopping " + this); }
-
- synchronized (lock)
- {
- started = false;
-
- //This must be inside sync block
- if (checkerThread != null)
- {
- checkerThread.interrupt();
- }
- }
-
- //This must be outside sync block
- if (checkerThread != null)
- {
- if (trace) { log.trace("Waiting for checker thread to finish");}
-
- checkerThread.join();
-
- if (trace) { log.trace("Checker thread has finished"); }
- }
-
- if (tx != null)
- {
- //Terminate any transaction
- if (trace) { log.trace("Rolling back remaining tx"); }
-
- try
- {
- tx.rollback();
- }
- catch (Exception ignore)
- {
- if (trace) { log.trace("Failed to rollback", ignore); }
- }
-
- if (trace) { log.trace("Rolled back remaining tx"); }
- }
-
- try
- {
- sourceConn.close();
- }
- catch (Exception ignore)
- {
- if (trace) { log.trace("Failed to close source conn", ignore); }
- }
-
- if (targetConn != null)
- {
- try
- {
- targetConn.close();
- }
- catch (Exception ignore)
- {
- if (trace) { log.trace("Failed to close target conn", ignore); }
- }
- }
-
- if (trace) { log.trace("Stopped " + this); }
- }
-
- // Public ---------------------------------------------------------------------------
-
- public synchronized void pause() throws Exception
- {
- if (trace) { log.trace("Pausing " + this); }
-
- synchronized (lock)
- {
- paused = true;
-
- sourceConn.stop();
- }
-
- if (trace) { log.trace("Paused " + this); }
- }
-
- public synchronized void resume() throws Exception
- {
- if (trace) { log.trace("Resuming " + this); }
-
- synchronized (lock)
- {
- paused = false;
-
- sourceConn.start();
- }
-
- if (trace) { log.trace("Resumed " + this); }
- }
-
- public DestinationFactory getSourceDestinationFactory()
- {
- return sourceDestinationFactory;
- }
+ void resume() throws Exception;
- public void setSourceDestinationFactory(DestinationFactory dest)
- {
- if (started)
- {
- log.warn("Cannot set SourceDestinationFactory while bridge is started");
- return;
- }
- sourceDestinationFactory = dest;
- }
-
- public DestinationFactory getTargetDestinationFactory()
- {
- return targetDestinationFactory;
- }
+ DestinationFactory getSourceDestinationFactory();
- public void setTargetDestinationFactory(DestinationFactory dest)
- {
- if (started)
- {
- log.warn("Cannot set TargetDestinationFactory while bridge is started");
- return;
- }
- targetDestinationFactory = dest;
- }
-
- public String getSourceUsername()
- {
- return sourceUsername;
- }
-
- public synchronized void setSourceUsername(String name)
- {
- if (started)
- {
- log.warn("Cannot set SourceUsername while bridge is started");
- return;
- }
- sourceUsername = name;
- }
-
- public synchronized String getSourcePassword()
- {
- return sourcePassword;
- }
-
- public synchronized void setSourcePassword(String pwd)
- {
- if (started)
- {
- log.warn("Cannot set SourcePassword while bridge is started");
- return;
- }
- sourcePassword = pwd;
- }
-
- public synchronized String getDestUsername()
- {
- return targetUsername;
- }
-
- public synchronized void setDestUserName(String name)
- {
- if (started)
- {
- log.warn("Cannot set DestUserName while bridge is started");
- return;
- }
- this.targetUsername = name;
- }
-
- public synchronized String getDestPassword()
- {
- return this.targetPassword;
- }
-
- public synchronized void setDestPassword(String pwd)
- {
- if (started)
- {
- log.warn("Cannot set DestPassword while bridge is started");
- return;
- }
- this.targetPassword = pwd;
- }
-
- public synchronized String getSelector()
- {
- return selector;
- }
-
- public synchronized void setSelector(String selector)
- {
- if (started)
- {
- log.warn("Cannot set Selector while bridge is started");
- return;
- }
- this.selector = selector;
- }
-
- public synchronized long getFailureRetryInterval()
- {
- return failureRetryInterval;
- }
-
- public synchronized void setFailureRetryInterval(long interval)
- {
- if (started)
- {
- log.warn("Cannot set FailureRetryInterval while bridge is started");
- return;
- }
-
- this.failureRetryInterval = interval;
- }
-
- public synchronized int getMaxRetries()
- {
- return maxRetries;
- }
-
- public synchronized void setMaxRetries(int retries)
- {
- if (started)
- {
- log.warn("Cannot set MaxRetries while bridge is started");
- return;
- }
-
- this.maxRetries = retries;
- }
-
- public synchronized int getQualityOfServiceMode()
- {
- return qualityOfServiceMode;
- }
-
- public synchronized void setQualityOfServiceMode(int mode)
- {
- if (started)
- {
- log.warn("Cannot set QualityOfServiceMode while bridge is started");
- return;
- }
-
- qualityOfServiceMode = mode;
- }
-
- public synchronized int getMaxBatchSize()
- {
- return maxBatchSize;
- }
-
- public synchronized void setMaxBatchSize(int size)
- {
- if (started)
- {
- log.warn("Cannot set MaxBatchSize while bridge is started");
- return;
- }
-
- maxBatchSize = size;
- }
-
- public synchronized long getMaxBatchTime()
- {
- return maxBatchTime;
- }
-
- public synchronized void setMaxBatchTime(long time)
- {
- if (started)
- {
- log.warn("Cannot set MaxBatchTime while bridge is started");
- return;
- }
-
- maxBatchTime = time;
- }
-
- public synchronized String getSubName()
- {
- return this.subName;
- }
-
- public synchronized void setSubName(String subname)
- {
- if (started)
- {
- log.warn("Cannot set SubName while bridge is started");
- return;
- }
-
- this.subName = subname;
- }
-
- public synchronized String getClientID()
- {
- return clientID;
- }
-
- public synchronized void setClientID(String clientID)
- {
- if (started)
- {
- log.warn("Cannot set ClientID while bridge is started");
- return;
- }
-
- this.clientID = clientID;
- }
-
- public boolean isAddMessageIDInHeader()
- {
- return this.addMessageIDInHeader;
- }
-
- public void setAddMessageIDInHeader(boolean value)
- {
- this.addMessageIDInHeader = value;
- }
-
- public synchronized boolean isPaused()
- {
- return paused;
- }
-
- public synchronized boolean isFailed()
- {
- return failed;
- }
-
- public synchronized boolean isStarted()
- {
- return started;
- }
-
- public synchronized void setSourceConnectionFactoryFactory(ConnectionFactoryFactory cff)
- {
- if (started)
- {
- log.warn("Cannot set SourceConnectionFactoryFactory while bridge is started");
- return;
- }
- this.sourceCff = cff;
- }
-
- public synchronized void setDestConnectionFactoryFactory(ConnectionFactoryFactory cff)
- {
- if (started)
- {
- log.warn("Cannot set DestConnectionFactoryFactory while bridge is started");
- return;
- }
- this.targetCff = cff;
- }
-
- // Private -------------------------------------------------------------------
-
- private void checkParams()
- {
- if (sourceCff == null)
- {
- throw new IllegalArgumentException("sourceCff cannot be null");
- }
- if (targetCff == null)
- {
- throw new IllegalArgumentException("targetCff cannot be null");
- }
- if (sourceDestinationFactory == null)
- {
- throw new IllegalArgumentException("sourceDestinationFactory cannot be null");
- }
- if (targetDestinationFactory == null)
- {
- throw new IllegalArgumentException("targetDestinationFactory cannot be null");
- }
- if (failureRetryInterval < 0 && failureRetryInterval != -1)
- {
- throw new IllegalArgumentException("failureRetryInterval must be > 0 or -1 to represent no retry");
- }
- if (maxRetries < 0 && maxRetries != -1)
- {
- throw new IllegalArgumentException("maxRetries must be >= 0 or -1 to represent infinite retries");
- }
- if (failureRetryInterval == -1 && maxRetries > 0)
- {
- throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be 0");
- }
- if (!(maxBatchSize >= 1))
- {
- throw new IllegalArgumentException("maxBatchSize must be >= 1");
- }
- if (!(maxBatchTime == -1 || maxBatchTime >=1))
- {
- throw new IllegalArgumentException("maxBatchTime must be >= 1 or -1 to represent unlimited batch time");
- }
- if (qualityOfServiceMode != QOS_AT_MOST_ONCE && qualityOfServiceMode != QOS_DUPLICATES_OK && qualityOfServiceMode != QOS_ONCE_AND_ONLY_ONCE)
- {
- throw new IllegalArgumentException("Invalid quality of service mode " + qualityOfServiceMode);
- }
- }
-
- private void enlistResources(Transaction tx) throws Exception
- {
- if (trace) { log.trace("Enlisting resources in tx"); }
-
- XAResource resSource = ((XASession)sourceSession).getXAResource();
-
- tx.enlistResource(resSource);
-
- XAResource resDest = ((XASession)targetSession).getXAResource();
-
- tx.enlistResource(resDest);
-
- if (trace) { log.trace("Enlisted resources in tx"); }
- }
-
- private void delistResources(Transaction tx) throws Exception
- {
- if (trace) { log.trace("Delisting resources from tx"); }
-
- XAResource resSource = ((XASession)sourceSession).getXAResource();
-
- tx.delistResource(resSource, XAResource.TMSUCCESS);
-
- XAResource resDest = ((XASession)targetSession).getXAResource();
-
- tx.delistResource(resDest, XAResource.TMSUCCESS);
-
- if (trace) { log.trace("Delisted resources from tx"); }
- }
-
- private Transaction startTx() throws Exception
- {
- if (trace) { log.trace("Starting JTA transaction"); }
-
- TransactionManager tm = getTm();
-
- //Set timeout to a large value since we do not want to time out while waiting for messages
- //to arrive - 10 years should be enough
- tm.setTransactionTimeout(60 * 60 * 24 * 365 * 10);
-
- tm.begin();
-
- Transaction tx = tm.getTransaction();
-
- //Remove the association between current thread - we don't want it
- //we will be committing /rolling back directly on the transaction object
-
- tm.suspend();
-
- if (trace) { log.trace("Started JTA transaction"); }
-
- return tx;
- }
-
- private TransactionManager getTm()
- {
- if (tm == null)
- {
- tm = TransactionManagerLocator.getInstance().locate();
-
- if (tm == null)
- {
- throw new IllegalStateException("Cannot locate a transaction manager");
- }
- }
-
- return tm;
- }
-
- private Connection createConnection(String username, String password, ConnectionFactoryFactory cff)
- throws Exception
- {
- Connection conn;
-
- ConnectionFactory cf = cff.createConnectionFactory();
-
- if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE &&
- !(cf instanceof XAConnectionFactory))
- {
- throw new IllegalArgumentException("Connection factory must be XAConnectionFactory");
- }
-
- if (username == null)
- {
- if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
- {
- if (trace) { log.trace("Creating an XA connection"); }
- conn = ((XAConnectionFactory)cf).createXAConnection();
- }
- else
- {
- if (trace) { log.trace("Creating a non XA connection"); }
- conn = cf.createConnection();
- }
- }
- else
- {
- if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
- {
- if (trace) { log.trace("Creating an XA connection"); }
- conn = ((XAConnectionFactory)cf).createXAConnection(username, password);
- }
- else
- {
- if (trace) { log.trace("Creating a non XA connection"); }
- conn = cf.createConnection(username, password);
- }
- }
-
- return conn;
- }
-
- /*
- * Source and target on same server
- * --------------------------------
- * If the source and target destinations are on the same server (same resource manager) then,
- * in order to get QOS_ONCE_AND_ONLY_ONCE, we simply need to consuming and send in a single
- * local JMS transaction.
- *
- * We actually use a single local transacted session for the other QoS modes too since this
- * is more performant than using DUPS_OK_ACKNOWLEDGE or AUTO_ACKNOWLEDGE session ack modes, so effectively
- * the QoS is upgraded.
- *
- * Source and target on different server
- * -------------------------------------
- * If the source and target destinations are on a different servers (different resource managers) then:
- *
- * If desired QoS is QOS_ONCE_AND_ONLY_ONCE, then we start a JTA transaction and enlist the consuming and sending
- * XAResources in that.
- *
- * If desired QoS is QOS_DUPLICATES_OK then, we use CLIENT_ACKNOWLEDGE for the consuming session and
- * AUTO_ACKNOWLEDGE (this is ignored) for the sending session if the maxBatchSize == 1, otherwise we
- * use a local transacted session for the sending session where maxBatchSize > 1, since this is more performant
- * When bridging a batch, we make sure to manually acknowledge the consuming session, if it is CLIENT_ACKNOWLEDGE
- * *after* the batch has been sent
- *
- * If desired QoS is QOS_AT_MOST_ONCE then, if maxBatchSize == 1, we use AUTO_ACKNOWLEDGE for the consuming session,
- * and AUTO_ACKNOWLEDGE for the sending session.
- * If maxBatchSize > 1, we use CLIENT_ACKNOWLEDGE for the consuming session and a local transacted session for the
- * sending session.
- *
- * When bridging a batch, we make sure to manually acknowledge the consuming session, if it is CLIENT_ACKNOWLEDGE
- * *before* the batch has been sent
- *
- */
- private boolean setupJMSObjects()
- {
- try
- {
- //Lookup the destinations
- sourceDestination = sourceDestinationFactory.createDestination();
-
- targetDestination = targetDestinationFactory.createDestination();
-
- if (sourceCff == targetCff)
- {
- //Source and target destinations are on the server - we can get once and only once
- //just using a local transacted session
- //everything becomes once and only once
-
- forwardMode = FORWARD_MODE_LOCALTX;
- }
- else
- {
- //Different servers
- if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
- {
- //Use XA
-
- forwardMode = FORWARD_MODE_XA;
- }
- else
- {
- forwardMode = FORWARD_MODE_NONTX;
- }
- }
-
- //Lookup the destinations
- sourceDestination = sourceDestinationFactory.createDestination();
-
- targetDestination = targetDestinationFactory.createDestination();
-
- sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff);
-
- if (forwardMode != FORWARD_MODE_LOCALTX)
- {
- targetConn = createConnection(targetUsername, targetPassword, targetCff);
-
- targetConn.setExceptionListener(new BridgeExceptionListener());
- }
-
- if (clientID != null)
- {
- sourceConn.setClientID(clientID);
- }
-
- sourceConn.setExceptionListener(new BridgeExceptionListener());
-
- Session sess;
-
- if (forwardMode == FORWARD_MODE_LOCALTX)
- {
- //We simply use a single local transacted session for consuming and sending
-
- sourceSession = sourceConn.createSession(true, Session.SESSION_TRANSACTED);
-
- sess = sourceSession;
- }
- else
- {
- if (forwardMode == FORWARD_MODE_XA)
- {
- //Create an XASession for consuming from the source
- if (trace) { log.trace("Creating XA source session"); }
-
- sourceSession = ((XAConnection)sourceConn).createXASession();
-
- sess = ((XASession)sourceSession).getSession();
- }
- else
- {
- if (trace) { log.trace("Creating non XA source session"); }
-
- //Create a standard session for consuming from the source
-
- //We use ack mode client ack
-
- sourceSession = sourceConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- sess = sourceSession;
- }
- }
-
- if (forwardMode == FORWARD_MODE_XA && sourceSession instanceof JBossSession)
- {
- JBossSession jsession = (JBossSession)sourceSession;
+ void setSourceDestinationFactory(DestinationFactory dest);
- org.jboss.messaging.core.client.ClientSession session = jsession.getDelegate();
-
- //session.setTreatAsNonTransactedWhenNotEnlisted(false);
- }
-
- if (subName == null)
- {
- if (selector == null)
- {
- consumer = sess.createConsumer(sourceDestination);
- }
- else
- {
- consumer = sess.createConsumer(sourceDestination, selector, false);
- }
- }
- else
- {
- //Durable subscription
- if (selector == null)
- {
- consumer = sess.createDurableSubscriber((Topic)sourceDestination, subName);
- }
- else
- {
- consumer = sess.createDurableSubscriber((Topic)sourceDestination, subName, selector, false);
- }
- }
-
- //Now the sending session
-
-
- if (forwardMode != FORWARD_MODE_LOCALTX)
- {
- if (forwardMode == FORWARD_MODE_XA)
- {
- if (trace) { log.trace("Creating XA dest session"); }
-
- //Create an XA sesion for sending to the destination
-
- targetSession = ((XAConnection)targetConn).createXASession();
-
- sess = ((XASession)targetSession).getSession();
- }
- else
- {
- if (trace) { log.trace("Creating non XA dest session"); }
-
- //Create a standard session for sending to the target
-
- //If batch size > 1 we use a transacted session since is more efficient
-
- boolean transacted = maxBatchSize > 1;
-
- targetSession = targetConn.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-
- sess = targetSession;
- }
- }
-
- if (forwardMode == FORWARD_MODE_XA)
- {
- if (trace) { log.trace("Starting JTA transaction"); }
-
- tx = startTx();
-
- enlistResources(tx);
- }
-
- producer = sess.createProducer(null);
-
- consumer.setMessageListener(new SourceListener());
-
- return true;
- }
- catch (Exception e)
- {
- log.warn("Failed to set up connections", e);
-
- //If this fails we should attempt to cleanup or we might end up in some weird state
-
- cleanup();
-
- return false;
- }
- }
-
- private void cleanup()
- {
-
- //Close the old objects
- try
- {
- sourceConn.close();
- }
- catch (Throwable ignore)
- {
- if (trace) { log.trace("Failed to close source connection", ignore); }
- }
- try
- {
- if (targetConn != null)
- {
- targetConn.close();
- }
- }
- catch (Throwable ignore)
- {
- if (trace) { log.trace("Failed to close target connection", ignore); }
- }
+ DestinationFactory getTargetDestinationFactory();
-
- if (tx != null)
- {
- try
- {
- delistResources(tx);
- }
- catch (Throwable ignore)
- {
- if (trace) { log.trace("Failed to delist resources", ignore); }
- }
- try
- {
- //Terminate the tx
- tx.rollback();
- }
- catch (Throwable ignore)
- {
- if (trace) { log.trace("Failed to rollback", ignore); }
- }
- }
- }
-
- private void pause(long interval)
- {
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < failureRetryInterval)
- {
- try
- {
- Thread.sleep(failureRetryInterval);
- }
- catch (InterruptedException ex)
- {
- }
- }
- }
-
- private boolean setupJMSObjectsWithRetry()
- {
- if (trace) { log.trace("Setting up connections"); }
-
- int count = 0;
-
- while (true)
- {
- boolean ok = setupJMSObjects();
-
- if (ok)
- {
- return true;
- }
-
- count++;
-
- if (maxRetries != -1 && count == maxRetries)
- {
- break;
- }
-
- log.warn("Failed to set up connections, will retry after a pause of " + failureRetryInterval + " ms");
-
- pause(failureRetryInterval);
- }
-
- //If we get here then we exceed maxRetries
- return false;
- }
-
- private void sendBatchNonTransacted()
- {
- try
- {
- if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
- {
- //We client ack before sending
-
- if (trace) { log.trace("Client acking source session"); }
-
- ((Message)messages.getLast()).acknowledge();
-
- if (trace) { log.trace("Client acked source session"); }
- }
-
- sendMessages();
-
- if (maxBatchSize > 1)
- {
- //The sending session is transacted - we need to commit it
-
- if (trace) { log.trace("Committing target session"); }
-
- targetSession.commit();
-
- if (trace) { log.trace("Committed source session"); }
- }
-
- if (qualityOfServiceMode == QOS_DUPLICATES_OK)
- {
- //We client ack after sending
-
- //Note we could actually use Session.DUPS_OK_ACKNOWLEDGE here
- //For a slightly less strong delivery guarantee
-
- if (trace) { log.trace("Client acking source session"); }
-
- ((Message)messages.getLast()).acknowledge();
-
- if (trace) { log.trace("Client acked source session"); }
- }
-
- //Clear the messages
- messages.clear();
- }
- catch (Exception e)
- {
- log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
-
- handleFailureOnSend();
- }
- }
-
- private void sendBatchXA()
- {
- try
- {
- sendMessages();
-
- //Commit the JTA transaction and start another
-
- delistResources(tx);
-
- if (trace) { log.trace("Committing JTA transaction"); }
-
- tx.commit();
+ void setTargetDestinationFactory(DestinationFactory dest);
- if (trace) { log.trace("Committed JTA transaction"); }
-
- tx = startTx();
-
- enlistResources(tx);
-
- //Clear the messages
- messages.clear();
- }
- catch (Exception e)
- {
- log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
-
- handleFailureOnSend();
- }
- }
-
- private void sendBatchLocalTx()
- {
- try
- {
- sendMessages();
-
- if (trace) { log.trace("Committing source session"); }
-
- sourceSession.commit();
-
- if (trace) { log.trace("Committed source session"); }
-
- //Clear the messages
- messages.clear();
- }
- catch (Exception e)
- {
- log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
-
- handleFailureOnSend();
- }
- }
-
- private void sendBatch()
- {
- if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
-
- if (paused)
- {
- //Don't send now
- if (trace) { log.trace("Paused, so not sending now"); }
-
- return;
- }
-
- if (forwardMode == FORWARD_MODE_LOCALTX)
- {
- sendBatchLocalTx();
- }
- else if (forwardMode == FORWARD_MODE_XA)
- {
- sendBatchXA();
- }
- else
- {
- sendBatchNonTransacted();
- }
- }
-
- private void sendMessages() throws Exception
- {
- Iterator iter = messages.iterator();
-
- Message msg = null;
-
- while (iter.hasNext())
- {
- msg = (Message)iter.next();
-
- if (addMessageIDInHeader)
- {
- addMessageIDInHeader(msg);
- }
-
- if (trace) { log.trace("Sending message " + msg); }
-
- //Make sure the correct time to live gets propagated
-
- long timeToLive = msg.getJMSExpiration();
-
- if (timeToLive != 0)
- {
- timeToLive -= System.currentTimeMillis();
-
- if (timeToLive <= 0)
- {
- timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
- }
- }
-
- producer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
-
- if (trace) { log.trace("Sent message " + msg); }
- }
- }
-
- private void handleFailureOnSend()
- {
- handleFailure(new FailureHandler());
- }
-
- private void handleFailureOnStartup()
- {
- handleFailure(new StartupFailureHandler());
- }
-
- private void handleFailure(Runnable failureHandler)
- {
- failed = true;
+ String getSourceUsername();
- //Failure must be handled on a separate thread to the calling thread (either onMessage or start).
- //In the case of onMessage we can't close the connection from inside the onMessage method
- //since it will block waiting for onMessage to complete. In the case of start we want to return
- //from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
- Thread t = new Thread(failureHandler);
-
- t.start();
- }
-
- private void addMessageIDInHeader(Message msg) throws Exception
- {
- //We concatenate the old message id as a header in the message
- //This allows the target to then use this as the JMSCorrelationID of any response message
- //thus enabling a distributed request-response pattern.
- //Each bridge (if there are more than one) in the chain can concatenate the message id
- //So in the case of multiple bridges having routed the message this can be used in a multi-hop
- //distributed request/response
- if (trace) { log.trace("Adding old message id in Message header"); }
-
- //Now JMS is really dumb and does not let you add a property on received message without first
- //calling clearProperties, so we need to save and re-add all the old properties so we
- //don't lose them!!
-
- Enumeration en = msg.getPropertyNames();
-
- Map<String, Object> oldProps = null;
-
- while (en.hasMoreElements())
- {
- String propName = (String)en.nextElement();
-
- if (oldProps == null)
- {
- oldProps = new HashMap<String, Object>();
- }
-
- oldProps.put(propName, msg.getObjectProperty(propName));
- }
-
- msg.clearProperties();
-
- if (oldProps != null)
- {
- Iterator iter2 = oldProps.entrySet().iterator();
-
- while (iter2.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter2.next();
+ void setSourceUsername(String name);
- String propName = (String)entry.getKey();
+ String getSourcePassword();
- msg.setObjectProperty(propName, entry.getValue());
- }
- }
+ void setSourcePassword(String pwd);
- String val = null;
-
- val = msg.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
-
- if (val == null)
- {
- val = msg.getJMSMessageID();
- }
- else
- {
- StringBuffer sb = new StringBuffer(val);
-
- sb.append(",").append(msg.getJMSMessageID());
-
- val = sb.toString();
- }
-
- msg.setStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST, val);
- }
-
- // Inner classes ---------------------------------------------------------------
-
- private class FailureHandler implements Runnable
- {
- /**
- * Start the source connection - note the source connection must not be started before
- * otherwise messages will be received and ignored
- */
- protected void startSourceConnection()
- {
- try
- {
- sourceConn.start();
- }
- catch (JMSException e)
- {
- log.error("Failed to start source connection", e);
- }
- }
+ String getTargetUsername();
- protected void succeeded()
- {
- log.info("Succeeded in reconnecting to servers");
-
- synchronized (lock)
- {
- failed = false;
+ void setTargetUsername(String name);
- startSourceConnection();
- }
- }
-
- protected void failed()
- {
- //We haven't managed to recreate connections or maxRetries = 0
- log.warn("Unable to set up connections, bridge will be stopped");
-
- try
- {
- stop();
- }
- catch (Exception ignore)
- {
- }
- }
+ String getTargetPassword();
- public void run()
- {
- if (trace) { log.trace("Failure handler running"); }
-
- // Clear the messages
- messages.clear();
+ void setTargetPassword(String pwd);
- cleanup();
-
- boolean ok = false;
-
- if (maxRetries > 0 || maxRetries == -1)
- {
- log.warn("Will retry after a pause of " + failureRetryInterval + " ms");
-
- pause(failureRetryInterval);
-
- //Now we try
- ok = setupJMSObjectsWithRetry();
- }
-
- if (!ok)
- {
- failed();
- }
- else
- {
- succeeded();
- }
- }
- }
-
- private class StartupFailureHandler extends FailureHandler
- {
- protected void failed()
- {
- // Don't call super
- log.warn("Unable to set up connections, bridge will not be started");
- }
-
- protected void succeeded()
- {
- // Don't call super - a bit ugly in this case but better than taking the lock twice.
- log.info("Succeeded in connecting to servers");
-
- synchronized (lock)
- {
- failed = false;
- started = true;
-
- //Start the source connection - note the source connection must not be started before
- //otherwise messages will be received and ignored
-
- try
- {
- sourceConn.start();
- }
- catch (JMSException e)
- {
- log.error("Failed to start source connection", e);
- }
- }
- }
- }
-
- private class BatchTimeChecker implements Runnable
- {
- public void run()
- {
- if (trace) { log.trace(this + " running"); }
-
- synchronized (lock)
- {
- while (started)
- {
- long toWait = batchExpiryTime - System.currentTimeMillis();
-
- if (toWait <= 0)
- {
- if (trace) { log.trace(this + " waited enough"); }
-
- synchronized (lock)
- {
- if (!failed && !messages.isEmpty())
- {
- if (trace) { log.trace(this + " got some messages so sending batch"); }
-
- sendBatch();
-
- if (trace) { log.trace(this + " sent batch"); }
- }
- }
-
- batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
- }
- else
- {
- try
- {
- if (trace) { log.trace(this + " waiting for " + toWait); }
-
- lock.wait(toWait);
-
- if (trace) { log.trace(this + " woke up"); }
- }
- catch (InterruptedException e)
- {
- //Ignore
- if (trace) { log.trace(this + " thread was interrupted"); }
- }
-
- }
- }
- }
- }
- }
-
- private class SourceListener implements MessageListener
- {
- public void onMessage(Message msg)
- {
- synchronized (lock)
- {
- if (failed)
- {
- //Ignore the message
- if (trace) { log.trace("Bridge has failed so ignoring message"); }
-
- return;
- }
-
- if (trace) { log.trace(this + " received message " + msg); }
-
- messages.add(msg);
-
- batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-
- if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
-
- if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
- {
- if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
-
- sendBatch();
-
- if (trace) { log.trace(this + " sent batch"); }
- }
- }
- }
- }
-
- private class BridgeExceptionListener implements ExceptionListener
- {
- public void onException(JMSException e)
- {
- log.warn("Detected failure on connection", e);
-
- synchronized (lock)
- {
- if (failed)
- {
- //The failure has already been detected and is being handled
- if (trace) { log.trace("Failure recovery already in progress"); }
- }
- else
- {
- handleFailure(new FailureHandler());
- }
- }
- }
- }
-}
+ String getSelector();
+
+ void setSelector(String selector);
+
+ long getFailureRetryInterval();
+
+ void setFailureRetryInterval(long interval);
+
+ int getMaxRetries();
+
+ void setMaxRetries(int retries);
+
+ QualityOfServiceMode getQualityOfServiceMode();
+
+ void setQualityOfServiceMode(QualityOfServiceMode mode);
+
+ int getMaxBatchSize();
+
+ void setMaxBatchSize(int size);
+
+ long getMaxBatchTime();
+
+ void setMaxBatchTime(long time);
+
+ String getSubscriptionName();
+
+ void setSubscriptionName(String subname);
+
+ String getClientID();
+
+ void setClientID(String clientID);
+
+ boolean isAddMessageIDInHeader();
+
+ void setAddMessageIDInHeader(boolean value);
+
+ boolean isPaused();
+
+ boolean isFailed();
+
+ void setSourceConnectionFactoryFactory(ConnectionFactoryFactory cff);
+
+ void setTargetConnectionFactoryFactory(ConnectionFactoryFactory cff);
+
+ void setTransactionManager(TransactionManager tm);
+
+}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/jms/bridge/BridgeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/BridgeService.java 2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/BridgeService.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.jms.bridge.impl.BridgeImpl;
/**
* A BridgeService
@@ -52,7 +53,7 @@
public BridgeService()
{
- bridge = new Bridge();
+ bridge = new BridgeImpl();
}
// JMX attributes ----------------------------------------------------------------
@@ -139,32 +140,32 @@
public String getTargetUsername()
{
- return bridge.getDestUsername();
+ return bridge.getTargetUsername();
}
public String getTargetPassword()
{
- return bridge.getDestPassword();
+ return bridge.getTargetPassword();
}
public void setTargetUsername(String name)
{
- bridge.setDestUserName(name);
+ bridge.setTargetUsername(name);
}
public void setTargetPassword(String pwd)
{
- bridge.setDestPassword(pwd);
+ bridge.setTargetPassword(pwd);
}
public int getQualityOfServiceMode()
{
- return bridge.getQualityOfServiceMode();
+ return bridge.getQualityOfServiceMode().intValue();
}
public void setQualityOfServiceMode(int mode)
{
- bridge.setQualityOfServiceMode(mode);
+ bridge.setQualityOfServiceMode(QualityOfServiceMode.valueOf(mode));
}
public String getSelector()
@@ -199,12 +200,12 @@
public String getSubName()
{
- return bridge.getSubName();
+ return bridge.getSubscriptionName();
}
public void setSubName(String subname)
{
- bridge.setSubName(subname);
+ bridge.setSubscriptionName(subname);
}
public String getClientID()
Deleted: trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java 2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -1,50 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.jms.bridge;
-
-import java.util.Hashtable;
-
-import javax.jms.ConnectionFactory;
-
-/**
- * A JNDIConnectionFactoryFactory
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class JNDIConnectionFactoryFactory extends JNDIFactorySupport implements ConnectionFactoryFactory
-{
- public JNDIConnectionFactoryFactory(Hashtable jndiProperties, String lookup)
- {
- super(jndiProperties, lookup);
- }
-
- public ConnectionFactory createConnectionFactory() throws Exception
- {
- return (ConnectionFactory)createObject();
- }
-
-}
Deleted: trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java 2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -1,48 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.jms.bridge;
-
-import java.util.Hashtable;
-
-import javax.jms.Destination;
-
-/**
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>10 Oct 2007
- *
- * $Id: $
- *
- */
-public class JNDIDestinationFactory extends JNDIFactorySupport implements DestinationFactory
-{
- public JNDIDestinationFactory(Hashtable jndiProperties, String lookup)
- {
- super(jndiProperties, lookup);
- }
-
- public Destination createDestination() throws Exception
- {
- return (Destination)createObject();
- }
-}
Deleted: trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java 2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -1,78 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.jms.bridge;
-
-import java.util.Hashtable;
-
-import javax.naming.InitialContext;
-
-/**
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>10 Oct 2007
- *
- * $Id: $
- *
- */
-public abstract class JNDIFactorySupport
-{
- protected Hashtable jndiProperties;
-
- protected String lookup;
-
- protected JNDIFactorySupport(Hashtable jndiProperties, String lookup)
- {
- this.jndiProperties = jndiProperties;
-
- this.lookup = lookup;
- }
-
- protected Object createObject() throws Exception
- {
- InitialContext ic = null;
-
- Object obj = null;
-
- try
- {
- if (jndiProperties == null)
- {
- ic = new InitialContext();
- }
- else
- {
- ic = new InitialContext(jndiProperties);
- }
-
- obj = ic.lookup(lookup);
- }
- finally
- {
- if (ic != null)
- {
- ic.close();
- }
- }
- return obj;
- }
-}
Added: trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -0,0 +1,1579 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.jms.bridge.impl;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.ConnectionFactoryFactory;
+import org.jboss.messaging.jms.bridge.DestinationFactory;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.tm.TransactionManagerLocator;
+
+/**
+ *
+ * A Bridge
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision:4566 $</tt>
+ *
+ * $Id:Bridge.java 4566 2008-06-24 08:01:35Z jmesnil $
+ *
+ */
+public class BridgeImpl implements MessagingComponent, Bridge
+{
+ private static final Logger log;
+
+ private static boolean trace;
+
+ static
+ {
+ log = Logger.getLogger(BridgeImpl.class);
+
+ trace = log.isTraceEnabled();
+ }
+
+ private static final int TEN_YEARS = 60 * 60 * 24 * 365 * 10; // in ms
+
+ private final Object lock = new Object();
+
+ private String sourceUsername;
+
+ private String sourcePassword;
+
+ private String targetUsername;
+
+ private String targetPassword;
+
+ private TransactionManager tm;
+
+ private String selector;
+
+ private long failureRetryInterval;
+
+ private int maxRetries;
+
+ private QualityOfServiceMode qualityOfServiceMode;
+
+ private int maxBatchSize;
+
+ private long maxBatchTime;
+
+ private String subName;
+
+ private String clientID;
+
+ private volatile boolean addMessageIDInHeader;
+
+ private boolean started;
+
+ private LinkedList<Message> messages;
+
+ private ConnectionFactoryFactory sourceCff;
+
+ private ConnectionFactoryFactory targetCff;
+
+ private DestinationFactory sourceDestinationFactory;
+
+ private DestinationFactory targetDestinationFactory;
+
+ private Connection sourceConn;
+
+ private Connection targetConn;
+
+ private Destination sourceDestination;
+
+ private Destination targetDestination;
+
+ private Session sourceSession;
+
+ private Session targetSession;
+
+ private MessageConsumer sourceConsumer;
+
+ private MessageProducer targetProducer;
+
+ private BatchTimeChecker timeChecker;
+
+ private Thread checkerThread;
+
+ private long batchExpiryTime;
+
+ private boolean paused;
+
+ private Transaction tx;
+
+ private boolean failed;
+
+ private int forwardMode;
+
+ private static final int FORWARD_MODE_XA = 0;
+
+ private static final int FORWARD_MODE_LOCALTX = 1;
+
+ private static final int FORWARD_MODE_NONTX = 2;
+
+ /*
+ * Constructor for MBean
+ */
+ public BridgeImpl()
+ {
+ this.messages = new LinkedList<Message>();
+ }
+
+ public BridgeImpl(ConnectionFactoryFactory sourceCff, ConnectionFactoryFactory targetCff,
+ DestinationFactory sourceDestinationFactory, DestinationFactory targetDestinationFactory,
+ String sourceUsername, String sourcePassword,
+ String targetUsername, String targetPassword,
+ String selector, long failureRetryInterval,
+ int maxRetries,
+ QualityOfServiceMode qosMode,
+ int maxBatchSize, long maxBatchTime,
+ String subName, String clientID,
+ boolean addMessageIDInHeader)
+ {
+ this();
+
+ this.sourceCff = sourceCff;
+
+ this.targetCff = targetCff;
+
+ this.sourceDestinationFactory = sourceDestinationFactory;
+
+ this.targetDestinationFactory = targetDestinationFactory;
+
+ this.sourceUsername = sourceUsername;
+
+ this.sourcePassword = sourcePassword;
+
+ this.targetUsername = targetUsername;
+
+ this.targetPassword = targetPassword;
+
+ this.selector = selector;
+
+ this.failureRetryInterval = failureRetryInterval;
+
+ this.maxRetries = maxRetries;
+
+ this.qualityOfServiceMode = qosMode;
+
+ this.maxBatchSize = maxBatchSize;
+
+ this.maxBatchTime = maxBatchTime;
+
+ this.subName = subName;
+
+ this.clientID = clientID;
+
+ this.addMessageIDInHeader = addMessageIDInHeader;
+
+ checkParams();
+
+ if (trace)
+ {
+ log.trace("Created " + this);
+ }
+ }
+
+ // MessagingComponent overrides --------------------------------------------------
+
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ log.warn("Attempt to start, but is already started");
+ return;
+ }
+
+ if (trace) { log.trace("Starting " + this); }
+
+ checkParams();
+
+ TransactionManager tm = getTm();
+
+ //There may already be a JTA transaction associated to the thread
+
+ boolean ok;
+
+ Transaction toResume = null;
+ try
+ {
+ toResume = tm.suspend();
+
+ ok = setupJMSObjects();
+ }
+ finally
+ {
+ if (toResume != null)
+ {
+ tm.resume(toResume);
+ }
+ }
+
+ if (ok)
+ {
+ //start the source connection
+
+ sourceConn.start();
+
+ started = true;
+
+ if (maxBatchTime != -1)
+ {
+ if (trace) { log.trace("Starting time checker thread"); }
+
+ timeChecker = new BatchTimeChecker();
+
+ checkerThread = new Thread(timeChecker);
+
+ batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+
+ checkerThread.start();
+
+ if (trace) { log.trace("Started time checker thread"); }
+ }
+
+ if (trace) { log.trace("Started " + this); }
+ }
+ else
+ {
+ log.warn("Failed to start bridge");
+ handleFailureOnStartup();
+ }
+ }
+
+ public synchronized void stop() throws Exception
+ {
+ if (!started)
+ {
+ log.warn("Attempt to stop, but is already stopped");
+ return;
+ }
+
+ if (trace) { log.trace("Stopping " + this); }
+
+ synchronized (lock)
+ {
+ started = false;
+
+ //This must be inside sync block
+ if (checkerThread != null)
+ {
+ checkerThread.interrupt();
+ }
+ }
+
+ //This must be outside sync block
+ if (checkerThread != null)
+ {
+ if (trace) { log.trace("Waiting for checker thread to finish");}
+
+ checkerThread.join();
+
+ if (trace) { log.trace("Checker thread has finished"); }
+ }
+
+ if (tx != null)
+ {
+ //Terminate any transaction
+ if (trace) { log.trace("Rolling back remaining tx"); }
+
+ try
+ {
+ tx.rollback();
+ }
+ catch (Exception ignore)
+ {
+ if (trace) { log.trace("Failed to rollback", ignore); }
+ }
+
+ if (trace) { log.trace("Rolled back remaining tx"); }
+ }
+
+ try
+ {
+ sourceConn.close();
+ }
+ catch (Exception ignore)
+ {
+ if (trace) { log.trace("Failed to close source conn", ignore); }
+ }
+
+ if (targetConn != null)
+ {
+ try
+ {
+ targetConn.close();
+ }
+ catch (Exception ignore)
+ {
+ if (trace) { log.trace("Failed to close target conn", ignore); }
+ }
+ }
+
+ if (trace) { log.trace("Stopped " + this); }
+ }
+
+ public synchronized boolean isStarted()
+ {
+ return started;
+ }
+
+ // Bridge implementation ------------------------------------------------------------
+
+ public synchronized void pause() throws Exception
+ {
+ if (trace) { log.trace("Pausing " + this); }
+
+ synchronized (lock)
+ {
+ paused = true;
+
+ sourceConn.stop();
+ }
+
+ if (trace) { log.trace("Paused " + this); }
+ }
+
+ public synchronized void resume() throws Exception
+ {
+ if (trace) { log.trace("Resuming " + this); }
+
+ synchronized (lock)
+ {
+ paused = false;
+
+ sourceConn.start();
+ }
+
+ if (trace) { log.trace("Resumed " + this); }
+ }
+
+ public DestinationFactory getSourceDestinationFactory()
+ {
+ return sourceDestinationFactory;
+ }
+
+ public void setSourceDestinationFactory(DestinationFactory dest)
+ {
+ checkBridgeNotStarted();
+ checkNotNull(dest, "TargetDestinationFactory");
+
+ sourceDestinationFactory = dest;
+ }
+
+ public DestinationFactory getTargetDestinationFactory()
+ {
+ return targetDestinationFactory;
+ }
+
+ public void setTargetDestinationFactory(DestinationFactory dest)
+ {
+ checkBridgeNotStarted();
+ checkNotNull(dest, "TargetDestinationFactory");
+
+ targetDestinationFactory = dest;
+ }
+
+ public String getSourceUsername()
+ {
+ return sourceUsername;
+ }
+
+ public synchronized void setSourceUsername(String name)
+ {
+ checkBridgeNotStarted();
+
+ sourceUsername = name;
+ }
+
+ public synchronized String getSourcePassword()
+ {
+ return sourcePassword;
+ }
+
+ public synchronized void setSourcePassword(String pwd)
+ {
+ checkBridgeNotStarted();
+
+ sourcePassword = pwd;
+ }
+
+ public synchronized String getTargetUsername()
+ {
+ return targetUsername;
+ }
+
+ public synchronized void setTargetUsername(String name)
+ {
+ checkBridgeNotStarted();
+
+ this.targetUsername = name;
+ }
+
+ public synchronized String getTargetPassword()
+ {
+ return this.targetPassword;
+ }
+
+ public synchronized void setTargetPassword(String pwd)
+ {
+ checkBridgeNotStarted();
+
+ this.targetPassword = pwd;
+ }
+
+ public synchronized String getSelector()
+ {
+ return selector;
+ }
+
+ public synchronized void setSelector(String selector)
+ {
+ checkBridgeNotStarted();
+
+ this.selector = selector;
+ }
+
+ public synchronized long getFailureRetryInterval()
+ {
+ return failureRetryInterval;
+ }
+
+ public synchronized void setFailureRetryInterval(long interval)
+ {
+ checkBridgeNotStarted();
+ checkValidValue(interval, "FailureRetryInterval");
+
+ this.failureRetryInterval = interval;
+ }
+
+ public synchronized int getMaxRetries()
+ {
+ return maxRetries;
+ }
+
+ public synchronized void setMaxRetries(int retries)
+ {
+ checkBridgeNotStarted();
+ checkValidValue(retries, "MaxRetries");
+
+ this.maxRetries = retries;
+ }
+
+ public synchronized QualityOfServiceMode getQualityOfServiceMode()
+ {
+ return qualityOfServiceMode;
+ }
+
+ public synchronized void setQualityOfServiceMode(QualityOfServiceMode mode)
+ {
+ checkBridgeNotStarted();
+ checkNotNull(mode, "QualityOfServiceMode");
+
+ qualityOfServiceMode = mode;
+ }
+
+ public synchronized int getMaxBatchSize()
+ {
+ return maxBatchSize;
+ }
+
+ public synchronized void setMaxBatchSize(int size)
+ {
+ checkBridgeNotStarted();
+ checkMaxBatchSize(size);
+
+ maxBatchSize = size;
+ }
+
+ public synchronized long getMaxBatchTime()
+ {
+ return maxBatchTime;
+ }
+
+ public synchronized void setMaxBatchTime(long time)
+ {
+ checkBridgeNotStarted();
+ checkValidValue(time, "MaxBatchTime");
+
+ maxBatchTime = time;
+ }
+
+ public synchronized String getSubscriptionName()
+ {
+ return this.subName;
+ }
+
+ public synchronized void setSubscriptionName(String subname)
+ {
+ checkBridgeNotStarted();
+ this.subName = subname;
+ }
+
+ public synchronized String getClientID()
+ {
+ return clientID;
+ }
+
+ public synchronized void setClientID(String clientID)
+ {
+ checkBridgeNotStarted();
+
+ this.clientID = clientID;
+ }
+
+ public boolean isAddMessageIDInHeader()
+ {
+ return this.addMessageIDInHeader;
+ }
+
+ public void setAddMessageIDInHeader(boolean value)
+ {
+ this.addMessageIDInHeader = value;
+ }
+
+ public synchronized boolean isPaused()
+ {
+ return paused;
+ }
+
+ public synchronized boolean isFailed()
+ {
+ return failed;
+ }
+
+ public synchronized void setSourceConnectionFactoryFactory(ConnectionFactoryFactory cff)
+ {
+ checkBridgeNotStarted();
+ checkNotNull(cff, "SourceConnectionFactoryFactory");
+
+ this.sourceCff = cff;
+ }
+
+ public synchronized void setTargetConnectionFactoryFactory(ConnectionFactoryFactory cff)
+ {
+ checkBridgeNotStarted();
+ checkNotNull(cff, "TargetConnectionFactoryFactory");
+
+ this.targetCff = cff;
+ }
+
+ public void setTransactionManager(TransactionManager tm)
+ {
+ this.tm = tm;
+ }
+
+ // Public ---------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------
+
+ private void checkParams()
+ {
+ checkNotNull(sourceCff, "sourceCff");
+ checkNotNull(targetCff, "targetCff");
+ checkNotNull(sourceDestinationFactory, "sourceDestinationFactory");
+ checkNotNull(targetDestinationFactory, "targetDestinationFactory");
+ checkValidValue(failureRetryInterval, "failureRetryInterval");
+ checkValidValue(maxRetries, "maxRetries");
+ if (failureRetryInterval == -1 && maxRetries > 0)
+ {
+ throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be 0");
+ }
+ checkMaxBatchSize(maxBatchSize);
+ checkValidValue(maxBatchTime, "maxBatchTime");
+ checkNotNull(qualityOfServiceMode, "qualityOfServiceMode");
+ }
+
+ /**
+ * Check the object is not null
+ *
+ * @throws IllegalArgumentException if the object is null
+ */
+ private static void checkNotNull(Object obj, String name)
+ {
+ if (obj == null)
+ {
+ throw new IllegalArgumentException(name + " cannot be null");
+ }
+ }
+
+ /**
+ * Check the bridge is not started
+ *
+ * @throws IllegalStateException if the bridge is started
+ */
+ private void checkBridgeNotStarted()
+ {
+ if (started)
+ {
+ throw new IllegalStateException("Cannot set bridge attributes while it is started");
+ }
+ }
+
+ /**
+ * Check that value is either equals to -1 or greater than 0
+ *
+ * @throws IllegalArgumentException if the value is not valid
+ */
+ private static void checkValidValue(long value, String name)
+ {
+ if (!(value == -1 || value > 0))
+ {
+ throw new IllegalArgumentException(name + " must be > 0 or -1");
+ }
+ }
+
+ private static void checkMaxBatchSize(int size)
+ {
+ if (!(size >= 1))
+ {
+ throw new IllegalArgumentException("maxBatchSize must be >= 1");
+ }
+ }
+
+ private void enlistResources(Transaction tx) throws Exception
+ {
+ if (trace) { log.trace("Enlisting resources in tx"); }
+
+ XAResource resSource = ((XASession)sourceSession).getXAResource();
+
+ tx.enlistResource(resSource);
+
+ XAResource resDest = ((XASession)targetSession).getXAResource();
+
+ tx.enlistResource(resDest);
+
+ if (trace) { log.trace("Enlisted resources in tx"); }
+ }
+
+ private void delistResources(Transaction tx) throws Exception
+ {
+ if (trace) { log.trace("Delisting resources from tx"); }
+
+ XAResource resSource = ((XASession)sourceSession).getXAResource();
+
+ tx.delistResource(resSource, XAResource.TMSUCCESS);
+
+ XAResource resDest = ((XASession)targetSession).getXAResource();
+
+ tx.delistResource(resDest, XAResource.TMSUCCESS);
+
+ if (trace) { log.trace("Delisted resources from tx"); }
+ }
+
+ private Transaction startTx() throws Exception
+ {
+ if (trace) { log.trace("Starting JTA transaction"); }
+
+ TransactionManager tm = getTm();
+
+ //Set timeout to a large value since we do not want to time out while waiting for messages
+ //to arrive - 10 years should be enough
+ tm.setTransactionTimeout(TEN_YEARS);
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ //Remove the association between current thread - we don't want it
+ //we will be committing /rolling back directly on the transaction object
+
+ tm.suspend();
+
+ if (trace) { log.trace("Started JTA transaction"); }
+
+ return tx;
+ }
+
+ private TransactionManager getTm()
+ {
+ if (tm == null)
+ {
+ tm = TransactionManagerLocator.getInstance().locate();
+
+ if (tm == null)
+ {
+ throw new IllegalStateException("Cannot locate a transaction manager");
+ }
+ }
+
+ return tm;
+ }
+
+ private Connection createConnection(String username, String password, ConnectionFactoryFactory cff)
+ throws Exception
+ {
+ Connection conn;
+
+ ConnectionFactory cf = cff.createConnectionFactory();
+
+ if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE &&
+ !(cf instanceof XAConnectionFactory))
+ {
+ throw new IllegalArgumentException("Connection factory must be XAConnectionFactory");
+ }
+
+ if (username == null)
+ {
+ if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
+ {
+ if (trace) { log.trace("Creating an XA connection"); }
+ conn = ((XAConnectionFactory)cf).createXAConnection();
+ }
+ else
+ {
+ if (trace) { log.trace("Creating a non XA connection"); }
+ conn = cf.createConnection();
+ }
+ }
+ else
+ {
+ if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
+ {
+ if (trace) { log.trace("Creating an XA connection"); }
+ conn = ((XAConnectionFactory)cf).createXAConnection(username, password);
+ }
+ else
+ {
+ if (trace) { log.trace("Creating a non XA connection"); }
+ conn = cf.createConnection(username, password);
+ }
+ }
+
+ return conn;
+ }
+
+ /*
+ * Source and target on same server
+ * --------------------------------
+ * If the source and target destinations are on the same server (same resource manager) then,
+ * in order to get ONCE_AND_ONLY_ONCE, we simply need to consuming and send in a single
+ * local JMS transaction.
+ *
+ * We actually use a single local transacted session for the other QoS modes too since this
+ * is more performant than using DUPS_OK_ACKNOWLEDGE or AUTO_ACKNOWLEDGE session ack modes, so effectively
+ * the QoS is upgraded.
+ *
+ * Source and target on different server
+ * -------------------------------------
+ * If the source and target destinations are on a different servers (different resource managers) then:
+ *
+ * If desired QoS is ONCE_AND_ONLY_ONCE, then we start a JTA transaction and enlist the consuming and sending
+ * XAResources in that.
+ *
+ * If desired QoS is DUPLICATES_OK then, we use CLIENT_ACKNOWLEDGE for the consuming session and
+ * AUTO_ACKNOWLEDGE (this is ignored) for the sending session if the maxBatchSize == 1, otherwise we
+ * use a local transacted session for the sending session where maxBatchSize > 1, since this is more performant
+ * When bridging a batch, we make sure to manually acknowledge the consuming session, if it is CLIENT_ACKNOWLEDGE
+ * *after* the batch has been sent
+ *
+ * If desired QoS is AT_MOST_ONCE then, if maxBatchSize == 1, we use AUTO_ACKNOWLEDGE for the consuming session,
+ * and AUTO_ACKNOWLEDGE for the sending session.
+ * If maxBatchSize > 1, we use CLIENT_ACKNOWLEDGE for the consuming session and a local transacted session for the
+ * sending session.
+ *
+ * When bridging a batch, we make sure to manually acknowledge the consuming session, if it is CLIENT_ACKNOWLEDGE
+ * *before* the batch has been sent
+ *
+ */
+ private boolean setupJMSObjects()
+ {
+ try
+ {
+ if (sourceCff == targetCff)
+ {
+ //Source and target destinations are on the server - we can get once and only once
+ //just using a local transacted session
+ //everything becomes once and only once
+
+ forwardMode = FORWARD_MODE_LOCALTX;
+ }
+ else
+ {
+ //Different servers
+ if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
+ {
+ //Use XA
+
+ forwardMode = FORWARD_MODE_XA;
+ }
+ else
+ {
+ forwardMode = FORWARD_MODE_NONTX;
+ }
+ }
+
+ //Lookup the destinations
+ sourceDestination = sourceDestinationFactory.createDestination();
+
+ targetDestination = targetDestinationFactory.createDestination();
+
+ sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff);
+
+ if (forwardMode != FORWARD_MODE_LOCALTX)
+ {
+ targetConn = createConnection(targetUsername, targetPassword, targetCff);
+
+ targetConn.setExceptionListener(new BridgeExceptionListener());
+ }
+
+ if (clientID != null)
+ {
+ sourceConn.setClientID(clientID);
+ }
+
+ sourceConn.setExceptionListener(new BridgeExceptionListener());
+
+ Session sess;
+
+ if (forwardMode == FORWARD_MODE_LOCALTX)
+ {
+ //We simply use a single local transacted session for consuming and sending
+
+ sourceSession = sourceConn.createSession(true, Session.SESSION_TRANSACTED);
+
+ sess = sourceSession;
+ }
+ else
+ {
+ if (forwardMode == FORWARD_MODE_XA)
+ {
+ //Create an XASession for consuming from the source
+ if (trace) { log.trace("Creating XA source session"); }
+
+ sourceSession = ((XAConnection)sourceConn).createXASession();
+
+ sess = ((XASession)sourceSession).getSession();
+ }
+ else
+ {
+ if (trace) { log.trace("Creating non XA source session"); }
+
+ //Create a standard session for consuming from the source
+
+ //We use ack mode client ack
+
+ sourceSession = sourceConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ sess = sourceSession;
+ }
+ }
+
+ if (forwardMode == FORWARD_MODE_XA && sourceSession instanceof JBossSession)
+ {
+ JBossSession jsession = (JBossSession)sourceSession;
+
+ ClientSession clientSession = jsession.getDelegate();
+
+ //clientSession.setTreatAsNonTransactedWhenNotEnlisted(false);
+ }
+
+ if (subName == null)
+ {
+ if (selector == null)
+ {
+ sourceConsumer = sess.createConsumer(sourceDestination);
+ }
+ else
+ {
+ sourceConsumer = sess.createConsumer(sourceDestination, selector, false);
+ }
+ }
+ else
+ {
+ //Durable subscription
+ if (selector == null)
+ {
+ sourceConsumer = sess.createDurableSubscriber((Topic)sourceDestination, subName);
+ }
+ else
+ {
+ sourceConsumer = sess.createDurableSubscriber((Topic)sourceDestination, subName, selector, false);
+ }
+ }
+
+ //Now the sending session
+
+
+ if (forwardMode != FORWARD_MODE_LOCALTX)
+ {
+ if (forwardMode == FORWARD_MODE_XA)
+ {
+ if (trace) { log.trace("Creating XA dest session"); }
+
+ //Create an XA sesion for sending to the destination
+
+ targetSession = ((XAConnection)targetConn).createXASession();
+
+ sess = ((XASession)targetSession).getSession();
+ }
+ else
+ {
+ if (trace) { log.trace("Creating non XA dest session"); }
+
+ //Create a standard session for sending to the target
+
+ //If batch size > 1 we use a transacted session since is more efficient
+
+ boolean transacted = maxBatchSize > 1;
+
+ targetSession = targetConn.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+
+ sess = targetSession;
+ }
+ }
+
+ if (forwardMode == FORWARD_MODE_XA)
+ {
+ if (trace) { log.trace("Starting JTA transaction"); }
+
+ tx = startTx();
+
+ enlistResources(tx);
+ }
+
+ targetProducer = sess.createProducer(null);
+
+ sourceConsumer.setMessageListener(new SourceListener());
+
+ return true;
+ }
+ catch (Exception e)
+ {
+ log.warn("Failed to set up connections", e);
+
+ //If this fails we should attempt to cleanup or we might end up in some weird state
+
+ cleanup();
+
+ return false;
+ }
+ }
+
+ private void cleanup()
+ {
+
+ //Close the old objects
+ try
+ {
+ sourceConn.close();
+ }
+ catch (Throwable ignore)
+ {
+ if (trace) { log.trace("Failed to close source connection", ignore); }
+ }
+ try
+ {
+ if (targetConn != null)
+ {
+ targetConn.close();
+ }
+ }
+ catch (Throwable ignore)
+ {
+ if (trace) { log.trace("Failed to close target connection", ignore); }
+ }
+
+
+ if (tx != null)
+ {
+ try
+ {
+ delistResources(tx);
+ }
+ catch (Throwable ignore)
+ {
+ if (trace) { log.trace("Failed to delist resources", ignore); }
+ }
+ try
+ {
+ //Terminate the tx
+ tx.rollback();
+ }
+ catch (Throwable ignore)
+ {
+ if (trace) { log.trace("Failed to rollback", ignore); }
+ }
+ }
+ }
+
+ private void pause(long interval)
+ {
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < failureRetryInterval)
+ {
+ try
+ {
+ Thread.sleep(failureRetryInterval);
+ }
+ catch (InterruptedException ex)
+ {
+ }
+ }
+ }
+
+ private boolean setupJMSObjectsWithRetry()
+ {
+ if (trace) { log.trace("Setting up connections"); }
+
+ int count = 0;
+
+ while (true)
+ {
+ boolean ok = setupJMSObjects();
+
+ if (ok)
+ {
+ return true;
+ }
+
+ count++;
+
+ if (maxRetries != -1 && count == maxRetries)
+ {
+ break;
+ }
+
+ log.warn("Failed to set up connections, will retry after a pause of " + failureRetryInterval + " ms");
+
+ pause(failureRetryInterval);
+ }
+
+ //If we get here then we exceed maxRetries
+ return false;
+ }
+
+
+ private void sendBatch()
+ {
+ if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
+
+ if (paused)
+ {
+ //Don't send now
+ if (trace) { log.trace("Paused, so not sending now"); }
+
+ return;
+ }
+
+ if (forwardMode == FORWARD_MODE_LOCALTX)
+ {
+ sendBatchLocalTx();
+ }
+ else if (forwardMode == FORWARD_MODE_XA)
+ {
+ sendBatchXA();
+ }
+ else
+ {
+ sendBatchNonTransacted();
+ }
+ }
+
+ private void sendBatchNonTransacted()
+ {
+ try
+ {
+ if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
+ {
+ //We client ack before sending
+
+ if (trace) { log.trace("Client acking source session"); }
+
+ ((Message)messages.getLast()).acknowledge();
+
+ if (trace) { log.trace("Client acked source session"); }
+ }
+
+ sendMessages();
+
+ if (maxBatchSize > 1)
+ {
+ //The sending session is transacted - we need to commit it
+
+ if (trace) { log.trace("Committing target session"); }
+
+ targetSession.commit();
+
+ if (trace) { log.trace("Committed source session"); }
+ }
+
+ if (qualityOfServiceMode == QualityOfServiceMode.DUPLICATES_OK)
+ {
+ //We client ack after sending
+
+ //Note we could actually use Session.DUPS_OK_ACKNOWLEDGE here
+ //For a slightly less strong delivery guarantee
+
+ if (trace) { log.trace("Client acking source session"); }
+
+ messages.getLast().acknowledge();
+
+ if (trace) { log.trace("Client acked source session"); }
+ }
+
+ //Clear the messages
+ messages.clear();
+ }
+ catch (Exception e)
+ {
+ log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
+
+ handleFailureOnSend();
+ }
+ }
+
+ private void sendBatchXA()
+ {
+ try
+ {
+ sendMessages();
+
+ //Commit the JTA transaction and start another
+
+ delistResources(tx);
+
+ if (trace) { log.trace("Committing JTA transaction"); }
+
+ tx.commit();
+
+ if (trace) { log.trace("Committed JTA transaction"); }
+
+ tx = startTx();
+
+ enlistResources(tx);
+
+ //Clear the messages
+ messages.clear();
+ }
+ catch (Exception e)
+ {
+ log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
+
+ handleFailureOnSend();
+ }
+ }
+
+ private void sendBatchLocalTx()
+ {
+ try
+ {
+ sendMessages();
+
+ if (trace) { log.trace("Committing source session"); }
+
+ sourceSession.commit();
+
+ if (trace) { log.trace("Committed source session"); }
+
+ //Clear the messages
+ messages.clear();
+ }
+ catch (Exception e)
+ {
+ log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
+
+ handleFailureOnSend();
+ }
+ }
+
+ private void sendMessages() throws Exception
+ {
+ Iterator iter = messages.iterator();
+
+ Message msg = null;
+
+ while (iter.hasNext())
+ {
+ msg = (Message)iter.next();
+
+ if (addMessageIDInHeader)
+ {
+ addMessageIDInHeader(msg);
+ }
+
+ if (trace) { log.trace("Sending message " + msg); }
+
+ //Make sure the correct time to live gets propagated
+
+ long timeToLive = msg.getJMSExpiration();
+
+ if (timeToLive != 0)
+ {
+ timeToLive -= System.currentTimeMillis();
+
+ if (timeToLive <= 0)
+ {
+ timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
+ }
+ }
+
+ targetProducer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
+
+ if (trace) { log.trace("Sent message " + msg); }
+ }
+ }
+
+ private void handleFailureOnSend()
+ {
+ handleFailure(new FailureHandler());
+ }
+
+ private void handleFailureOnStartup()
+ {
+ handleFailure(new StartupFailureHandler());
+ }
+
+ private void handleFailure(Runnable failureHandler)
+ {
+ failed = true;
+
+ //Failure must be handled on a separate thread to the calling thread (either onMessage or start).
+ //In the case of onMessage we can't close the connection from inside the onMessage method
+ //since it will block waiting for onMessage to complete. In the case of start we want to return
+ //from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
+ Thread t = new Thread(failureHandler);
+
+ t.start();
+ }
+
+ private void addMessageIDInHeader(Message msg) throws Exception
+ {
+ //We concatenate the old message id as a header in the message
+ //This allows the target to then use this as the JMSCorrelationID of any response message
+ //thus enabling a distributed request-response pattern.
+ //Each bridge (if there are more than one) in the chain can concatenate the message id
+ //So in the case of multiple bridges having routed the message this can be used in a multi-hop
+ //distributed request/response
+ if (trace) { log.trace("Adding old message id in Message header"); }
+
+ copyProperties(msg);
+
+ String val = null;
+
+ val = msg.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+
+ if (val == null)
+ {
+ val = msg.getJMSMessageID();
+ }
+ else
+ {
+ StringBuffer sb = new StringBuffer(val);
+
+ sb.append(",").append(msg.getJMSMessageID());
+
+ val = sb.toString();
+ }
+
+ msg.setStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST, val);
+ }
+
+
+ /*
+ * JMS does not let you add a property on received message without first
+ * calling clearProperties, so we need to save and re-add all the old properties so we
+ * don't lose them!!
+ */
+ private static void copyProperties(Message msg) throws JMSException
+ {
+ Enumeration en = msg.getPropertyNames();
+
+ Map<String, Object> oldProps = null;
+
+ while (en.hasMoreElements())
+ {
+ String propName = (String)en.nextElement();
+
+ if (oldProps == null)
+ {
+ oldProps = new HashMap<String, Object>();
+ }
+
+ oldProps.put(propName, msg.getObjectProperty(propName));
+ }
+
+ msg.clearProperties();
+
+ if (oldProps != null)
+ {
+ Iterator oldPropsIter = oldProps.entrySet().iterator();
+
+ while (oldPropsIter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)oldPropsIter.next();
+
+ String propName = (String)entry.getKey();
+
+ msg.setObjectProperty(propName, entry.getValue());
+ }
+ }
+ }
+
+ // Inner classes ---------------------------------------------------------------
+
+ private class FailureHandler implements Runnable
+ {
+ /**
+ * Start the source connection - note the source connection must not be started before
+ * otherwise messages will be received and ignored
+ */
+ protected void startSourceConnection()
+ {
+ try
+ {
+ sourceConn.start();
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to start source connection", e);
+ }
+ }
+
+ protected void succeeded()
+ {
+ log.info("Succeeded in reconnecting to servers");
+
+ synchronized (lock)
+ {
+ failed = false;
+
+ startSourceConnection();
+ }
+ }
+
+ protected void failed()
+ {
+ //We haven't managed to recreate connections or maxRetries = 0
+ log.warn("Unable to set up connections, bridge will be stopped");
+
+ try
+ {
+ stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ public void run()
+ {
+ if (trace) { log.trace("Failure handler running"); }
+
+ // Clear the messages
+ messages.clear();
+
+ cleanup();
+
+ boolean ok = false;
+
+ if (maxRetries > 0 || maxRetries == -1)
+ {
+ log.warn("Will retry after a pause of " + failureRetryInterval + " ms");
+
+ pause(failureRetryInterval);
+
+ //Now we try
+ ok = setupJMSObjectsWithRetry();
+ }
+
+ if (!ok)
+ {
+ failed();
+ }
+ else
+ {
+ succeeded();
+ }
+ }
+ }
+
+ private class StartupFailureHandler extends FailureHandler
+ {
+ protected void failed()
+ {
+ // Don't call super
+ log.warn("Unable to set up connections, bridge will not be started");
+ }
+
+ protected void succeeded()
+ {
+ // Don't call super - a bit ugly in this case but better than taking the lock twice.
+ log.info("Succeeded in connecting to servers");
+
+ synchronized (lock)
+ {
+ failed = false;
+ started = true;
+
+ //Start the source connection - note the source connection must not be started before
+ //otherwise messages will be received and ignored
+
+ try
+ {
+ sourceConn.start();
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to start source connection", e);
+ }
+ }
+ }
+ }
+
+ private class BatchTimeChecker implements Runnable
+ {
+ public void run()
+ {
+ if (trace) { log.trace(this + " running"); }
+
+ synchronized (lock)
+ {
+ while (started)
+ {
+ long toWait = batchExpiryTime - System.currentTimeMillis();
+
+ if (toWait <= 0)
+ {
+ if (trace) { log.trace(this + " waited enough"); }
+
+ synchronized (lock)
+ {
+ if (!failed && !messages.isEmpty())
+ {
+ if (trace) { log.trace(this + " got some messages so sending batch"); }
+
+ sendBatch();
+
+ if (trace) { log.trace(this + " sent batch"); }
+ }
+ }
+
+ batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+ }
+ else
+ {
+ try
+ {
+ if (trace) { log.trace(this + " waiting for " + toWait); }
+
+ lock.wait(toWait);
+
+ if (trace) { log.trace(this + " woke up"); }
+ }
+ catch (InterruptedException e)
+ {
+ //Ignore
+ if (trace) { log.trace(this + " thread was interrupted"); }
+ }
+
+ }
+ }
+ }
+ }
+ }
+
+ private class SourceListener implements MessageListener
+ {
+ public void onMessage(Message msg)
+ {
+ synchronized (lock)
+ {
+ if (failed)
+ {
+ //Ignore the message
+ if (trace) { log.trace("Bridge has failed so ignoring message"); }
+
+ return;
+ }
+
+ if (trace) { log.trace(this + " received message " + msg); }
+
+ messages.add(msg);
+
+ batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+
+ if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
+
+ if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
+ {
+ if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
+
+ sendBatch();
+
+ if (trace) { log.trace(this + " sent batch"); }
+ }
+ }
+ }
+ }
+
+ private class BridgeExceptionListener implements ExceptionListener
+ {
+ public void onException(JMSException e)
+ {
+ log.warn("Detected failure on connection", e);
+
+ synchronized (lock)
+ {
+ if (failed)
+ {
+ //The failure has already been detected and is being handled
+ if (trace) { log.trace("Failure recovery already in progress"); }
+ }
+ else
+ {
+ handleFailure(new FailureHandler());
+ }
+ }
+ }
+ }
+}
Copied: trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIConnectionFactoryFactory.java (from rev 4566, trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIConnectionFactoryFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIConnectionFactoryFactory.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.jms.bridge.impl;
+
+import java.util.Hashtable;
+
+import javax.jms.ConnectionFactory;
+
+import org.jboss.messaging.jms.bridge.ConnectionFactoryFactory;
+
+/**
+ * A JNDIConnectionFactoryFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision:4566 $</tt>
+ *
+ * $Id:JNDIConnectionFactoryFactory.java 4566 2008-06-24 08:01:35Z jmesnil $
+ *
+ */
+public class JNDIConnectionFactoryFactory extends JNDIFactorySupport implements ConnectionFactoryFactory
+{
+ public JNDIConnectionFactoryFactory(Hashtable jndiProperties, String lookup)
+ {
+ super(jndiProperties, lookup);
+ }
+
+ public ConnectionFactory createConnectionFactory() throws Exception
+ {
+ return (ConnectionFactory)createObject();
+ }
+
+}
Copied: trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIDestinationFactory.java (from rev 4566, trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIDestinationFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIDestinationFactory.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.jms.bridge.impl;
+
+import java.util.Hashtable;
+
+import javax.jms.Destination;
+
+import org.jboss.messaging.jms.bridge.DestinationFactory;
+
+/**
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>10 Oct 2007
+ *
+ * $Id: $
+ *
+ */
+public class JNDIDestinationFactory extends JNDIFactorySupport implements DestinationFactory
+{
+ public JNDIDestinationFactory(Hashtable jndiProperties, String lookup)
+ {
+ super(jndiProperties, lookup);
+ }
+
+ public Destination createDestination() throws Exception
+ {
+ return (Destination)createObject();
+ }
+}
Copied: trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIFactorySupport.java (from rev 4566, trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIFactorySupport.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIFactorySupport.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.jms.bridge.impl;
+
+import java.util.Hashtable;
+
+import javax.naming.InitialContext;
+
+/**
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>10 Oct 2007
+ *
+ * $Id: $
+ *
+ */
+public abstract class JNDIFactorySupport
+{
+ protected Hashtable jndiProperties;
+
+ protected String lookup;
+
+ protected JNDIFactorySupport(Hashtable jndiProperties, String lookup)
+ {
+ this.jndiProperties = jndiProperties;
+
+ this.lookup = lookup;
+ }
+
+ protected Object createObject() throws Exception
+ {
+ InitialContext ic = null;
+
+ Object obj = null;
+
+ try
+ {
+ if (jndiProperties == null)
+ {
+ ic = new InitialContext();
+ }
+ else
+ {
+ ic = new InitialContext(jndiProperties);
+ }
+
+ obj = ic.lookup(lookup);
+ }
+ finally
+ {
+ if (ic != null)
+ {
+ ic.close();
+ }
+ }
+ return obj;
+ }
+}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java 2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -37,6 +37,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
import org.jboss.test.messaging.tools.ServerManagement;
/**
@@ -103,7 +104,7 @@
on = deployBridge(0, "Bridge1", sourceProviderLoader, targetProviderLoader,
"/queue/sourceQueue", "/queue/targetQueue",
null, null, null, null,
- Bridge.QOS_AT_MOST_ONCE, null, 1,
+ QualityOfServiceMode.AT_MOST_ONCE, null, 1,
-1, null, null, 5000, -1, false);
log.info("Deployed bridge");
@@ -267,7 +268,7 @@
on = deployBridge(0, "Bridge2", sourceProviderLoader, targetProviderLoader,
"/queue/sourceQueue", "/queue/targetQueue",
null, null, null, null,
- Bridge.QOS_ONCE_AND_ONLY_ONCE, null, 1,
+ QualityOfServiceMode.ONCE_AND_ONLY_ONCE, null, 1,
-1, null, null, 5000, -1, false);
log.trace("Constructed bridge");
@@ -351,11 +352,11 @@
{
Integer qos = (Integer)ServerManagement.getAttribute(on, "QualityOfServiceMode");
- assertEquals(Bridge.QOS_ONCE_AND_ONLY_ONCE, qos.intValue());
- ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(Bridge.QOS_AT_MOST_ONCE));
+ assertEquals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE.intValue(), qos.intValue());
+ ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(QualityOfServiceMode.AT_MOST_ONCE.intValue()));
qos = (Integer)ServerManagement.getAttribute(on, "QualityOfServiceMode");
- assertEquals(new Integer(Bridge.QOS_AT_MOST_ONCE), qos);
- ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(Bridge.QOS_ONCE_AND_ONLY_ONCE));
+ assertEquals(new Integer(QualityOfServiceMode.AT_MOST_ONCE.intValue()), qos);
+ ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(QualityOfServiceMode.ONCE_AND_ONLY_ONCE.intValue()));
}
{
@@ -492,10 +493,10 @@
{
Integer qos = (Integer)ServerManagement.getAttribute(on, "QualityOfServiceMode");
- assertEquals(Bridge.QOS_ONCE_AND_ONLY_ONCE, qos.intValue());
- ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(Bridge.QOS_AT_MOST_ONCE));
+ assertEquals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE.intValue(), qos.intValue());
+ ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(QualityOfServiceMode.AT_MOST_ONCE.intValue()));
qos = (Integer)ServerManagement.getAttribute(on, "QualityOfServiceMode");
- assertEquals(new Integer(Bridge.QOS_ONCE_AND_ONLY_ONCE), qos);
+ assertEquals(new Integer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE.intValue()), qos);
}
{
@@ -593,7 +594,7 @@
String sourceDestLookup, String targetDestLookup,
String sourceUsername, String sourcePassword,
String targetUsername, String targetPassword,
- int qos, String selector, int maxBatchSize,
+ QualityOfServiceMode qos, String selector, int maxBatchSize,
long maxBatchTime, String subName, String clientID,
long failureRetryInterval, int maxRetries, boolean addMessageIDInHeader) throws Exception
{
@@ -621,7 +622,7 @@
{
config += "<attribute name=\"TargetPassword\">" + targetPassword + "</attribute>";
}
- config += "<attribute name=\"QualityOfServiceMode\">" + qos +"</attribute>";
+ config += "<attribute name=\"QualityOfServiceMode\">" + qos.intValue() +"</attribute>";
if (selector != null)
{
config += "<attribute name=\"Selector\">" + selector + "</attribute>";
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -37,6 +37,8 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
+import org.jboss.messaging.jms.bridge.impl.BridgeImpl;
import org.jboss.messaging.jms.client.JBossMessage;
import org.jboss.tm.TransactionManagerLocator;
@@ -62,32 +64,32 @@
public void testNoMaxBatchTime_AtMostOnce_P() throws Exception
{
- testNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, true);
+ testNoMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, true);
}
public void testNoMaxBatchTime_DuplicatesOk_P() throws Exception
{
- testNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK, true);
+ testNoMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, true);
}
public void testNoMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
{
- testNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+ testNoMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
}
public void testNoMaxBatchTime_AtMostOnce_NP() throws Exception
{
- testNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, false);
+ testNoMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, false);
}
public void testNoMaxBatchTime_DuplicatesOk_NP() throws Exception
{
- testNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK, false);
+ testNoMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, false);
}
public void testNoMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
{
- testNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+ testNoMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
}
//Same server
@@ -96,32 +98,32 @@
public void testNoMaxBatchTimeSameServer_AtMostOnce_P() throws Exception
{
- testNoMaxBatchTimeSameServer(Bridge.QOS_AT_MOST_ONCE, true);
+ testNoMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, true);
}
public void testNoMaxBatchTimeSameServer_DuplicatesOk_P() throws Exception
{
- testNoMaxBatchTimeSameServer(Bridge.QOS_DUPLICATES_OK, true);
+ testNoMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, true);
}
public void testNoMaxBatchTimeSameServer_OnceAndOnlyOnce_P() throws Exception
{
- testNoMaxBatchTimeSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+ testNoMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
}
public void testNoMaxBatchTimeSameServer_AtMostOnce_NP() throws Exception
{
- testNoMaxBatchTimeSameServer(Bridge.QOS_AT_MOST_ONCE, false);
+ testNoMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, false);
}
public void testNoMaxBatchTimeSameServer_DuplicatesOk_NP() throws Exception
{
- testNoMaxBatchTimeSameServer(Bridge.QOS_DUPLICATES_OK, false);
+ testNoMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, false);
}
public void testNoMaxBatchTimeSameServer_OnceAndOnlyOnce_NP() throws Exception
{
- testNoMaxBatchTimeSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+ testNoMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
}
@@ -129,32 +131,32 @@
public void testMaxBatchTime_AtMostOnce_P() throws Exception
{
- this.testMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, true);
+ this.testMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, true);
}
public void testMaxBatchTime_DuplicatesOk_P() throws Exception
{
- this.testMaxBatchTime(Bridge.QOS_DUPLICATES_OK, true);
+ this.testMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, true);
}
public void testMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
{
- testMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+ testMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
}
public void testMaxBatchTime_AtMostOnce_NP() throws Exception
{
- this.testMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, false);
+ this.testMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, false);
}
public void testMaxBatchTime_DuplicatesOk_NP() throws Exception
{
- this.testMaxBatchTime(Bridge.QOS_DUPLICATES_OK, false);
+ this.testMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, false);
}
public void testMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
{
- testMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+ testMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
}
// Same server
@@ -163,108 +165,108 @@
public void testMaxBatchTimeSameServer_AtMostOnce_P() throws Exception
{
- this.testMaxBatchTimeSameServer(Bridge.QOS_AT_MOST_ONCE, true);
+ this.testMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, true);
}
public void testMaxBatchTimeSameServer_DuplicatesOk_P() throws Exception
{
- this.testMaxBatchTimeSameServer(Bridge.QOS_DUPLICATES_OK, true);
+ this.testMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, true);
}
public void testMaxBatchTimeSameServer_OnceAndOnlyOnce_P() throws Exception
{
- testMaxBatchTimeSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+ testMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
}
public void testMaxBatchTimeSameServer_AtMostOnce_NP() throws Exception
{
- this.testMaxBatchTimeSameServer(Bridge.QOS_AT_MOST_ONCE, false);
+ this.testMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, false);
}
public void testMaxBatchTimeSameServer_DuplicatesOk_NP() throws Exception
{
- this.testMaxBatchTimeSameServer(Bridge.QOS_DUPLICATES_OK, false);
+ this.testMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, false);
}
public void testMaxBatchTimeSameServer_OnceAndOnlyOnce_NP() throws Exception
{
- testMaxBatchTimeSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+ testMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
}
// Stress with batch size of 50
public void testStress_AtMostOnce_P_50() throws Exception
{
- testStress(Bridge.QOS_AT_MOST_ONCE, true, 50);
+ testStress(QualityOfServiceMode.AT_MOST_ONCE, true, 50);
}
public void testStress_DuplicatesOk_P_50() throws Exception
{
- testStress(Bridge.QOS_DUPLICATES_OK, true, 50);
+ testStress(QualityOfServiceMode.DUPLICATES_OK, true, 50);
}
public void testStress_OnceAndOnlyOnce_P_50() throws Exception
{
- testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 50);
+ testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 50);
}
public void testStress_AtMostOnce_NP_50() throws Exception
{
- testStress(Bridge.QOS_AT_MOST_ONCE, false, 50);
+ testStress(QualityOfServiceMode.AT_MOST_ONCE, false, 50);
}
public void testStress_DuplicatesOk_NP_50() throws Exception
{
- testStress(Bridge.QOS_DUPLICATES_OK, false, 50);
+ testStress(QualityOfServiceMode.DUPLICATES_OK, false, 50);
}
public void testStress_OnceAndOnlyOnce_NP_50() throws Exception
{
- testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 50);
+ testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 50);
}
// Stress with batch size of 1
public void testStress_AtMostOnce_P_1() throws Exception
{
- testStress(Bridge.QOS_AT_MOST_ONCE, true, 1);
+ testStress(QualityOfServiceMode.AT_MOST_ONCE, true, 1);
}
public void testStress_DuplicatesOk_P_1() throws Exception
{
- testStress(Bridge.QOS_DUPLICATES_OK, true, 1);
+ testStress(QualityOfServiceMode.DUPLICATES_OK, true, 1);
}
public void testStress_OnceAndOnlyOnce_P_1() throws Exception
{
- testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 1);
+ testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 1);
}
public void testStress_AtMostOnce_NP_1() throws Exception
{
- testStress(Bridge.QOS_AT_MOST_ONCE, false, 1);
+ testStress(QualityOfServiceMode.AT_MOST_ONCE, false, 1);
}
public void testStress_DuplicatesOk_NP_1() throws Exception
{
- testStress(Bridge.QOS_DUPLICATES_OK, false, 1);
+ testStress(QualityOfServiceMode.DUPLICATES_OK, false, 1);
}
public void testStress_OnceAndOnlyOnce_NP_1() throws Exception
{
- testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 1);
+ testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 1);
}
// Max batch time
public void testStressMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
{
- this.testStressBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 200);
+ this.testStressBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 200);
}
public void testStressMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
{
- this.testStressBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 200);
+ this.testStressBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 200);
}
@@ -274,73 +276,73 @@
public void testStressSameServer_AtMostOnce_P_50() throws Exception
{
- testStressSameServer(Bridge.QOS_AT_MOST_ONCE, true, 50);
+ testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, true, 50);
}
public void testStressSameServer_DuplicatesOk_P_50() throws Exception
{
- testStressSameServer(Bridge.QOS_DUPLICATES_OK, true, 50);
+ testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, true, 50);
}
public void testStressSameServer_OnceAndOnlyOnce_P_50() throws Exception
{
- testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 50);
+ testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 50);
}
public void testStressSameServer_AtMostOnce_NP_50() throws Exception
{
- testStressSameServer(Bridge.QOS_AT_MOST_ONCE, false, 50);
+ testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, false, 50);
}
public void testStressSameServer_DuplicatesOk_NP_50() throws Exception
{
- testStressSameServer(Bridge.QOS_DUPLICATES_OK, false, 50);
+ testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, false, 50);
}
public void testStressSameServer_OnceAndOnlyOnce_NP_50() throws Exception
{
- testStressSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 50);
+ testStressSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 50);
}
// Stress with batch size of 1
public void testStressSameServer_AtMostOnce_P_1() throws Exception
{
- testStressSameServer(Bridge.QOS_AT_MOST_ONCE, true, 1);
+ testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, true, 1);
}
public void testStressSameServer_DuplicatesOk_P_1() throws Exception
{
- testStressSameServer(Bridge.QOS_DUPLICATES_OK, true, 1);
+ testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, true, 1);
}
public void testStressSameServer_OnceAndOnlyOnce_P_1() throws Exception
{
- testStressSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 1);
+ testStressSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 1);
}
public void testStressSameServer_AtMostOnce_NP_1() throws Exception
{
- testStressSameServer(Bridge.QOS_AT_MOST_ONCE, false, 1);
+ testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, false, 1);
}
public void testStressSameServer_DuplicatesOk_NP_1() throws Exception
{
- testStressSameServer(Bridge.QOS_DUPLICATES_OK, false, 1);
+ testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, false, 1);
}
public void testStressSameServer_OnceAndOnlyOnce_NP_1() throws Exception
{
- testStressSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 1);
+ testStressSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 1);
}
public void testParams() throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
try
{
- int qosMode = Bridge.QOS_AT_MOST_ONCE;
+ QualityOfServiceMode qosMode = QualityOfServiceMode.AT_MOST_ONCE;
int batchSize = 10;
@@ -366,7 +368,7 @@
try
{
- bridge= new Bridge(null, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge= new BridgeImpl(null, cff1, sourceQueueFactory, targetQueueFactory,
sourceUsername, sourcePassword, destUsername, destPassword,
selector, failureRetryInterval, maxRetries, qosMode,
batchSize, maxBatchTime,
@@ -379,7 +381,7 @@
try
{
- bridge= new Bridge(cff0, null, sourceQueueFactory, targetQueueFactory,
+ bridge= new BridgeImpl(cff0, null, sourceQueueFactory, targetQueueFactory,
sourceUsername, sourcePassword, destUsername, destPassword,
selector, failureRetryInterval, maxRetries, qosMode,
batchSize, maxBatchTime,
@@ -392,7 +394,7 @@
try
{
- bridge= new Bridge(cff0, cff1, null, targetQueueFactory,
+ bridge= new BridgeImpl(cff0, cff1, null, targetQueueFactory,
sourceUsername, sourcePassword, destUsername, destPassword,
selector, failureRetryInterval, maxRetries, qosMode,
batchSize, maxBatchTime,
@@ -405,7 +407,7 @@
try
{
- bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
+ bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, null,
sourceUsername, sourcePassword, destUsername, destPassword,
selector, failureRetryInterval, maxRetries, qosMode,
batchSize, maxBatchTime,
@@ -418,7 +420,7 @@
try
{
- bridge= new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
sourceUsername, sourcePassword, destUsername, destPassword,
selector, -2, maxRetries, qosMode,
batchSize, maxBatchTime,
@@ -431,7 +433,7 @@
try
{
- bridge= new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
sourceUsername, sourcePassword, destUsername, destPassword,
selector, -1, 10, qosMode,
batchSize, maxBatchTime,
@@ -444,35 +446,9 @@
try
{
- bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
+ bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, null,
sourceUsername, sourcePassword, destUsername, destPassword,
- selector, failureRetryInterval, maxRetries, -2,
- batchSize, maxBatchTime,
- subName, clientID, false);
- }
- catch (IllegalArgumentException e)
- {
- //Ok
- }
-
- try
- {
- bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
- sourceUsername, sourcePassword, destUsername, destPassword,
- selector, failureRetryInterval, maxRetries, 3,
- batchSize, maxBatchTime,
- subName, clientID, false);
- }
- catch (IllegalArgumentException e)
- {
- //Ok
- }
-
- try
- {
- bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
- sourceUsername, sourcePassword, destUsername, destPassword,
- selector, failureRetryInterval, maxRetries, 3,
+ selector, failureRetryInterval, maxRetries, qosMode,
0, maxBatchTime,
subName, clientID, false);
}
@@ -483,9 +459,9 @@
try
{
- bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
+ bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, null,
sourceUsername, sourcePassword, destUsername, destPassword,
- selector, failureRetryInterval, maxRetries, 3,
+ selector, failureRetryInterval, maxRetries, qosMode,
batchSize, -2,
subName, clientID, false);
}
@@ -505,7 +481,7 @@
public void testSelector() throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
Connection connSource = null;
@@ -517,9 +493,9 @@
String selector = "vegetable='radish'";
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
- selector, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ selector, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
1, -1,
null, null, false);
@@ -592,7 +568,7 @@
public void testStartBridgeWithJTATransactionAlreadyRunning() throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
Transaction toResume = null;
@@ -611,9 +587,9 @@
final int NUM_MESSAGES = 10;
- bridge = new Bridge(cff0, cff1, sourceTopicFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory,
null, null, null, null,
- null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
1, -1,
null, null, false);
@@ -657,15 +633,15 @@
public void testNonDurableSubscriber() throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
try
{
final int NUM_MESSAGES = 10;
- bridge = new Bridge(cff0, cff1, sourceTopicFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory,
null, null, null, null,
- null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
1, -1,
null, null, false);
@@ -686,15 +662,15 @@
public void testDurableSubscriber() throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
try
{
final int NUM_MESSAGES = 10;
- bridge = new Bridge(cff0, cff1, sourceTopicFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory,
null, null, null, null,
- null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
1, -1,
"subTest", "clientid123", false);
@@ -732,7 +708,7 @@
private void messageIDInHeader(boolean on) throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
Connection connSource = null;
@@ -742,9 +718,9 @@
{
final int NUM_MESSAGES = 10;
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
- null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
1, -1,
null, null, on);
@@ -903,7 +879,7 @@
private void propertiesPreserved(boolean persistent, boolean messageIDInHeader) throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
Connection connSource = null;
@@ -913,9 +889,9 @@
{
final int NUM_MESSAGES = 10;
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
- null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
1, -1,
null, null, messageIDInHeader);
@@ -1064,7 +1040,7 @@
public void testNoMessageIDInHeader() throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
Connection connSource = null;
@@ -1074,9 +1050,9 @@
{
final int NUM_MESSAGES = 10;
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
- null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
1, -1,
null, null, false);
@@ -1151,17 +1127,17 @@
// Private -------------------------------------------------------------------------------
- private void testStress(int qosMode, boolean persistent, int batchSize) throws Exception
+ private void testStress(QualityOfServiceMode qosMode, boolean persistent, int batchSize) throws Exception
{
Connection connSource = null;
- Bridge bridge = null;
+ BridgeImpl bridge = null;
Thread t = null;
try
{
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
null, 5000, 10, qosMode,
batchSize, -1,
@@ -1224,17 +1200,17 @@
}
}
- private void testStressBatchTime(int qosMode, boolean persistent, int maxBatchTime) throws Exception
+ private void testStressBatchTime(QualityOfServiceMode qosMode, boolean persistent, int maxBatchTime) throws Exception
{
Connection connSource = null;
- Bridge bridge = null;
+ BridgeImpl bridge = null;
Thread t = null;
try
{
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
null, 5000, 10, qosMode,
2, maxBatchTime,
@@ -1298,17 +1274,17 @@
}
//Both source and destination on same rm
- private void testStressSameServer(int qosMode, boolean persistent, int batchSize) throws Exception
+ private void testStressSameServer(QualityOfServiceMode qosMode, boolean persistent, int batchSize) throws Exception
{
Connection connSource = null;
- Bridge bridge = null;
+ BridgeImpl bridge = null;
Thread t = null;
try
{
- bridge = new Bridge(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
null, null, null, null,
null, 5000, 10, qosMode,
batchSize, -1,
@@ -1372,15 +1348,15 @@
}
- private void testNoMaxBatchTime(int qosMode, boolean persistent) throws Exception
+ private void testNoMaxBatchTime(QualityOfServiceMode qosMode, boolean persistent) throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
try
{
final int NUM_MESSAGES = 10;
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
null, 5000, 10, qosMode,
NUM_MESSAGES, -1,
@@ -1430,15 +1406,15 @@
}
}
- private void testNoMaxBatchTimeSameServer(int qosMode, boolean persistent) throws Exception
+ private void testNoMaxBatchTimeSameServer(QualityOfServiceMode qosMode, boolean persistent) throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
try
{
final int NUM_MESSAGES = 10;
- bridge = new Bridge(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
null, null, null, null,
null, 5000, 10, qosMode,
NUM_MESSAGES, -1,
@@ -1488,9 +1464,9 @@
}
}
- private void testMaxBatchTime(int qosMode, boolean persistent) throws Exception
+ private void testMaxBatchTime(QualityOfServiceMode qosMode, boolean persistent) throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
try
{
@@ -1498,7 +1474,7 @@
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
null, 3000, 10, qosMode,
MAX_BATCH_SIZE, MAX_BATCH_TIME,
@@ -1529,9 +1505,9 @@
}
}
- private void testMaxBatchTimeSameServer(int qosMode, boolean persistent) throws Exception
+ private void testMaxBatchTimeSameServer(QualityOfServiceMode qosMode, boolean persistent) throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
try
{
@@ -1539,7 +1515,7 @@
final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
- bridge = new Bridge(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
null, null, null, null,
null, 3000, 10, qosMode,
MAX_BATCH_SIZE, MAX_BATCH_TIME,
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -38,11 +38,12 @@
import javax.naming.InitialContext;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.jms.bridge.Bridge;
import org.jboss.messaging.jms.bridge.ConnectionFactoryFactory;
import org.jboss.messaging.jms.bridge.DestinationFactory;
-import org.jboss.messaging.jms.bridge.JNDIConnectionFactoryFactory;
-import org.jboss.messaging.jms.bridge.JNDIDestinationFactory;
+import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
+import org.jboss.messaging.jms.bridge.impl.JNDIConnectionFactoryFactory;
+import org.jboss.messaging.jms.bridge.impl.JNDIDestinationFactory;
import org.jboss.test.messaging.JBMServerTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.container.ServiceContainer;
@@ -208,7 +209,7 @@
}
- protected void checkMessagesReceived(ConnectionFactory cf, Destination dest, int qosMode,
+ protected void checkMessagesReceived(ConnectionFactory cf, Destination dest, QualityOfServiceMode qosMode,
int numMessages, boolean longWaitForFirst) throws Exception
{
Connection conn = null;
@@ -248,7 +249,7 @@
}
- if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE || qosMode == Bridge.QOS_DUPLICATES_OK)
+ if (qosMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE || qosMode == QualityOfServiceMode.DUPLICATES_OK)
{
//All the messages should be received
@@ -258,12 +259,12 @@
}
//Should be no more
- if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE)
+ if (qosMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
{
assertEquals(numMessages, msgs.size());
}
}
- else if (qosMode == Bridge.QOS_AT_MOST_ONCE)
+ else if (qosMode == QualityOfServiceMode.AT_MOST_ONCE)
{
//No *guarantee* that any messages will be received
//but you still might get some depending on how/where the crash occurred
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java 2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java 2008-06-25 13:00:39 UTC (rev 4576)
@@ -23,6 +23,8 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
+import org.jboss.messaging.jms.bridge.impl.BridgeImpl;
import org.jboss.test.messaging.tools.ServerManagement;
/**
@@ -47,36 +49,36 @@
public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P() throws Exception
{
- testCrashAndReconnectDestBasic(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+ testCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
}
public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_NP() throws Exception
{
- testCrashAndReconnectDestBasic(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+ testCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
}
// dups ok
public void testCrashAndReconnectDestBasic_DuplicatesOk_P() throws Exception
{
- testCrashAndReconnectDestBasic(Bridge.QOS_DUPLICATES_OK, true);
+ testCrashAndReconnectDestBasic(QualityOfServiceMode.DUPLICATES_OK, true);
}
public void testCrashAndReconnectDestBasic_DuplicatesOk_NP() throws Exception
{
- testCrashAndReconnectDestBasic(Bridge.QOS_DUPLICATES_OK, false);
+ testCrashAndReconnectDestBasic(QualityOfServiceMode.DUPLICATES_OK, false);
}
// At most once
public void testCrashAndReconnectDestBasic_AtMostOnce_P() throws Exception
{
- testCrashAndReconnectDestBasic(Bridge.QOS_AT_MOST_ONCE, true);
+ testCrashAndReconnectDestBasic(QualityOfServiceMode.AT_MOST_ONCE, true);
}
public void testCrashAndReconnectDestBasic_AtMostOnce_NP() throws Exception
{
- testCrashAndReconnectDestBasic(Bridge.QOS_AT_MOST_ONCE, false);
+ testCrashAndReconnectDestBasic(QualityOfServiceMode.AT_MOST_ONCE, false);
}
// Crash tests specific to XA transactions
@@ -97,9 +99,9 @@
{
ServerManagement.kill(1);
- Bridge bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ BridgeImpl bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
- null, 1000, -1, Bridge.QOS_DUPLICATES_OK,
+ null, 1000, -1, QualityOfServiceMode.DUPLICATES_OK,
10, -1,
null, null, false);
@@ -138,13 +140,13 @@
* Send some more messages
* Verify all messages are received
*/
- private void testCrashAndReconnectDestBasic(int qosMode, boolean persistent) throws Exception
+ private void testCrashAndReconnectDestBasic(QualityOfServiceMode qosMode, boolean persistent) throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
try
{
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
null, 1000, -1, qosMode,
10, -1,
@@ -220,13 +222,13 @@
*/
private void testCrashAndReconnectDestCrashBeforePrepare(boolean persistent) throws Exception
{
- Bridge bridge = null;
+ BridgeImpl bridge = null;
try
{
- bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+ bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
null, null, null, null,
- null, 1000, -1, Bridge.QOS_ONCE_AND_ONLY_ONCE,
+ null, 1000, -1, QualityOfServiceMode.ONCE_AND_ONLY_ONCE,
10, 5000,
null, null, false);
@@ -262,7 +264,7 @@
sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent);
- checkMessagesReceived(cf1, targetQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES, false);
+ checkMessagesReceived(cf1, targetQueue, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, NUM_MESSAGES, false);
}
finally
{
More information about the jboss-cvs-commits
mailing list