[jboss-cvs] JBoss Messaging SVN: r4576 - in trunk: src/main/org/jboss/messaging/jms/bridge/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 25 09:00:39 EDT 2008


Author: jmesnil
Date: 2008-06-25 09:00:39 -0400 (Wed, 25 Jun 2008)
New Revision: 4576

Added:
   trunk/src/main/org/jboss/messaging/jms/bridge/impl/
   trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIConnectionFactoryFactory.java
   trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIDestinationFactory.java
   trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIFactorySupport.java
Removed:
   trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java
   trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java
   trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java
Modified:
   trunk/src/main/org/jboss/messaging/jms/bridge/Bridge.java
   trunk/src/main/org/jboss/messaging/jms/bridge/BridgeService.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
Log:
renamed Bridge class to o.j.messaging.jms.bridge.impl.BridgeImpl & added Bridge interface
added unit tests + refactoring of BridgeImpl (WIP)

Modified: trunk/src/main/org/jboss/messaging/jms/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/Bridge.java	2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/Bridge.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -1,6 +1,6 @@
 /*
  * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
  * by the @authors tag. See the copyright.txt in the distribution for a
  * full listing of individual contributors.
  *
@@ -18,1633 +18,94 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.jms.bridge;
 
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.XAConnection;
-import javax.jms.XAConnectionFactory;
-import javax.jms.XASession;
-import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
 
-import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.jms.client.JBossMessage;
-import org.jboss.messaging.jms.client.JBossSession;
-import org.jboss.tm.TransactionManagerLocator;
 
 /**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
- * A Bridge
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
+ * 
  */
-public class Bridge implements MessagingComponent
+public interface Bridge extends MessagingComponent
 {
-   private static final Logger log;
-   
-   private static boolean trace;
-   
-   //Quality of service modes
-   
-   public static final int QOS_AT_MOST_ONCE = 0;
-   
-   public static final int QOS_DUPLICATES_OK = 1;
-   
-   public static final int QOS_ONCE_AND_ONLY_ONCE = 2;
-   
-   /*
-    * 
-    * Quality of service (QoS) levels
-    * ===============================
-    * 
-    * QOS_AT_MOST_ONCE
-    * ----------------
-    * 
-    * With this QoS mode messages will reach the destination from the source at most once.
-    * The messages are consumed from the source and acknowledged
-    * before sending to the destination. Therefore there is a possibility that if failure occurs between
-    * removing them from the source and them arriving at the destination they could be lost. Hence delivery
-    * will occur at most once.
-    * This mode is available for both persistent and non persistent messages.
-    * 
-    * QOS_DUPLICATES_OK
-    * -----------------
-    * 
-    * With this QoS mode, the messages are consumed from the source and then acknowledged
-    * after they have been successfully sent to the destination. Therefore there is a possibility that if
-    * failure occurs after sending to the destination but before acknowledging them, they could be sent again
-    * when the system recovers. I.e. the destination might receive duplicates after a failure.
-    * This mode is available for both persistent and non persistent messages.
-    * 
-    * QOS_ONCE_AND_ONLY_ONCE
-    * ----------------------
-    * 
-    * This QoS mode ensures messages will reach the destination from the source once and only once.
-    * (Sometimes this mode is known as "exactly once").
-    * If both the source and the destination are on the same JBoss Messaging server instance then this can 
-    * be achieved by sending and acknowledging the messages in the same local transaction.
-    * If the source and destination are on different servers this is achieved by enlisting the sending and consuming
-    * sessions in a JTA transaction. The JTA transaction is controlled by JBoss Transactions JTA implementation which
-    * is a fully recovering transaction manager, thus providing a very high degree of durability.
-    * If JTA is required then both supplied connection factories need to be XAConnectionFactory implementations.
-    * This mode is only available for persistent messages.
-    * This is likely to be the slowest mode since it requires extra persistence for the transaction logging.
-    * 
-    * Note:
-    * For a specific application it may be possible to provide once and only once semantics without using the
-    * QOS_ONCE_AND_ONLY_ONCE QoS level. This can be done by using the QOS_DUPLICATES_OK mode and then checking for
-    * duplicates at the destination and discarding them. Some JMS servers provide automatic duplicate message detection
-    * functionality, or this may be possible to implement on the application level by maintaining a cache of received
-    * message ids on disk and comparing received messages to them. The cache would only be valid
-    * for a certain period of time so this approach is not as watertight as using QOS_ONCE_AND_ONLY_ONCE but
-    * may be a good choice depending on your specific application.
-    * 
-    * 
-    */
-   
-   static
-   {
-      log = Logger.getLogger(Bridge.class);
-      
-      trace = log.isTraceEnabled();
-   }
+   void pause() throws Exception;
 
-   private String sourceUsername;
-   
-   private String sourcePassword;
-   
-   private String targetUsername;
-   
-   private String targetPassword;
-   
-   private TransactionManager tm;
-   
-   private String selector;
-   
-   private long failureRetryInterval;
-   
-   private int maxRetries;
-   
-   private int qualityOfServiceMode;
-   
-   private int maxBatchSize;
-   
-   private long maxBatchTime;
-   
-   private String subName;
-   
-   private String clientID;
-   
-   private volatile boolean addMessageIDInHeader;
-   
-      
-   
-   private boolean started;
-   
-   private LinkedList<Message> messages;
-   
-   private Object lock;
-   
-   private ConnectionFactoryFactory sourceCff;
-   
-   private ConnectionFactoryFactory targetCff;
-   
-   private DestinationFactory sourceDestinationFactory;
-   
-   private DestinationFactory targetDestinationFactory;
-   
-   private Connection sourceConn; 
-   
-   private Connection targetConn;
-   
-   private Destination sourceDestination;
-   
-   private Destination targetDestination;
-   
-   private Session sourceSession;
-   
-   private Session targetSession;
-   
-   private MessageConsumer consumer;
-   
-   private MessageProducer producer;
-        
-   private BatchTimeChecker timeChecker;
-   
-   private Thread checkerThread;
-   
-   private long batchExpiryTime;
-   
-   private boolean paused;         
-   
-   private Transaction tx;  
-   
-   private boolean failed;
-   
-   private int forwardMode;
-   
-   private static final int FORWARD_MODE_XA = 0;
-   
-   private static final int FORWARD_MODE_LOCALTX = 1;
-   
-   private static final int FORWARD_MODE_NONTX = 2;
-   
-   /*
-    * Constructor for MBean
-    */
-   public Bridge()
-   {      
-      this.messages = new LinkedList<Message>();      
-      
-      this.lock = new Object();      
-   }
-   
-   public Bridge(ConnectionFactoryFactory sourceCff, ConnectionFactoryFactory destCff,
-                 DestinationFactory sourceDestinationFactory, DestinationFactory targetDestinationFactory,         
-                 String sourceUsername, String sourcePassword,
-                 String targetUsername, String targetPassword,
-                 String selector, long failureRetryInterval,
-                 int maxRetries,
-                 int qosMode,
-                 int maxBatchSize, long maxBatchTime,
-                 String subName, String clientID,
-                 boolean addMessageIDInHeader)
-   {            
-      this();
-      
-      this.sourceCff = sourceCff;
-      
-      this.targetCff = destCff;
-      
-      this.sourceDestinationFactory = sourceDestinationFactory;
-      
-      this.targetDestinationFactory = targetDestinationFactory;
-      
-      this.sourceUsername = sourceUsername;
-      
-      this.sourcePassword = sourcePassword;
-      
-      this.targetUsername = targetUsername;
-      
-      this.targetPassword = targetPassword;
-      
-      this.selector = selector;
-      
-      this.failureRetryInterval = failureRetryInterval;
-      
-      this.maxRetries = maxRetries;
-      
-      this.qualityOfServiceMode = qosMode;
-      
-      this.maxBatchSize = maxBatchSize;
-      
-      this.maxBatchTime = maxBatchTime;
-      
-      this.subName = subName;
-      
-      this.clientID = clientID;
-      
-      this.addMessageIDInHeader = addMessageIDInHeader;
-              
-      if (trace)
-      {
-         log.trace("Created " + this);
-      }
-   }
-   
-      
-   // MessagingComponent overrides --------------------------------------------------
-        
-   public synchronized void start() throws Exception
-   {
-      if (started)
-      {
-         log.warn("Attempt to start, but is already started");
-         return;
-      }
-      
-      if (trace) { log.trace("Starting " + this); }         
-      
-      checkParams();
-      
-      TransactionManager tm = getTm();
-      
-      //There may already be a JTA transaction associated to the thread
-      
-      boolean ok;
-      
-      Transaction toResume = null;
-      try
-      {
-         toResume = tm.suspend();
-         
-         ok = setupJMSObjects();
-      }
-      finally
-      {
-         if (toResume != null)
-         {
-            tm.resume(toResume);
-         }
-      }
-      
-      if (ok)
-      {         
-         //start the source connection
-         
-         sourceConn.start();
-         
-         started = true;
-         
-         if (maxBatchTime != -1)
-         {
-            if (trace) { log.trace("Starting time checker thread"); }
-                     
-            timeChecker = new BatchTimeChecker();
-            
-            checkerThread = new Thread(timeChecker);
-            
-            batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-            
-            checkerThread.start();
-            
-            if (trace) { log.trace("Started time checker thread"); }
-         }            
-         
-         if (trace) { log.trace("Started " + this); }
-      }
-      else
-      {
-         log.warn("Failed to start bridge");
-         handleFailureOnStartup();
-      }
-   }
-   
-   public synchronized void stop() throws Exception
-   {
-      if (!started)
-      {
-         log.warn("Attempt to stop, but is already stopped");
-         return;
-      }
-      
-      if (trace) { log.trace("Stopping " + this); }
-      
-      synchronized (lock)
-      {
-         started = false;          
-         
-         //This must be inside sync block
-         if (checkerThread != null)
-         {
-            checkerThread.interrupt();
-         }
-      }
-            
-      //This must be outside sync block
-      if (checkerThread != null)
-      {  
-         if (trace) { log.trace("Waiting for checker thread to finish");}
-         
-         checkerThread.join();
-         
-         if (trace) { log.trace("Checker thread has finished"); }
-      }
-      
-      if (tx != null)
-      {
-         //Terminate any transaction
-         if (trace) { log.trace("Rolling back remaining tx"); }
-         
-         try
-         {
-            tx.rollback();
-         }
-         catch (Exception ignore)
-         {
-            if (trace) { log.trace("Failed to rollback", ignore); }
-         }
-         
-         if (trace) { log.trace("Rolled back remaining tx"); }
-      }
-      
-      try
-      {
-         sourceConn.close();
-      }
-      catch (Exception ignore)
-      {
-         if (trace) { log.trace("Failed to close source conn", ignore); }
-      }
-      
-      if (targetConn != null)
-      {
-         try
-         {
-            targetConn.close();
-         }
-         catch (Exception ignore)
-         {
-            if (trace) { log.trace("Failed to close target conn", ignore); }
-         }
-      }
-            
-      if (trace) { log.trace("Stopped " + this); }
-   }
-   
-   // Public ---------------------------------------------------------------------------
-   
-   public synchronized void pause() throws Exception
-   {
-      if (trace) { log.trace("Pausing " + this); }
-      
-      synchronized (lock)
-      {
-         paused = true;
-         
-         sourceConn.stop();
-      }
-      
-      if (trace) { log.trace("Paused " + this); }
-   }
-   
-   public synchronized void resume() throws Exception
-   {
-      if (trace) { log.trace("Resuming " + this); }
-      
-      synchronized (lock)
-      {
-         paused = false;
-         
-         sourceConn.start();
-      }
-      
-      if (trace) { log.trace("Resumed " + this); }
-   }
-      
-   public DestinationFactory getSourceDestinationFactory()
-   {
-   	return sourceDestinationFactory;
-   }
+   void resume() throws Exception;
 
-   public void setSourceDestinationFactory(DestinationFactory dest)
-   {
-   	if (started)
-   	{
-   		log.warn("Cannot set SourceDestinationFactory while bridge is started");
-   		return;
-   	}
-   	sourceDestinationFactory = dest;
-   }
-   
-   public DestinationFactory getTargetDestinationFactory()
-   {
-   	return targetDestinationFactory;
-   }
+   DestinationFactory getSourceDestinationFactory();
 
-   public void setTargetDestinationFactory(DestinationFactory dest)
-   {
-   	if (started)
-   	{
-   		log.warn("Cannot set TargetDestinationFactory while bridge is started");
-   		return;
-   	}
-   	targetDestinationFactory = dest;
-   }
-   
-   public String getSourceUsername()
-   {
-      return sourceUsername;
-   }
-   
-   public synchronized void setSourceUsername(String name)
-   {
-      if (started)
-      {
-         log.warn("Cannot set SourceUsername while bridge is started");
-         return;
-      }
-      sourceUsername = name;
-   }
-   
-   public synchronized String getSourcePassword()
-   {
-      return sourcePassword;
-   }
-   
-   public synchronized void setSourcePassword(String pwd)
-   {
-      if (started)
-      {
-         log.warn("Cannot set SourcePassword while bridge is started");
-         return;
-      }
-      sourcePassword = pwd;
-   }
-      
-   public synchronized String getDestUsername()
-   {
-      return targetUsername;
-   }
-   
-   public synchronized void setDestUserName(String name)
-   {
-      if (started)
-      {
-         log.warn("Cannot set DestUserName while bridge is started");
-         return;
-      }
-      this.targetUsername = name;
-   }
-   
-   public synchronized String getDestPassword()
-   {
-      return this.targetPassword;
-   }
-   
-   public synchronized void setDestPassword(String pwd)
-   {
-      if (started)
-      {
-         log.warn("Cannot set DestPassword while bridge is started");
-         return;
-      }
-      this.targetPassword = pwd;
-   }
-      
-   public synchronized String getSelector()
-   {
-      return selector;
-   }
-   
-   public synchronized void setSelector(String selector)
-   {
-      if (started)
-      {
-         log.warn("Cannot set Selector while bridge is started");
-         return;
-      }
-      this.selector = selector;
-   }
-   
-   public synchronized long getFailureRetryInterval()
-   {
-      return failureRetryInterval;
-   }
-   
-   public synchronized void setFailureRetryInterval(long interval)
-   {
-      if (started)
-      {
-         log.warn("Cannot set FailureRetryInterval while bridge is started");
-         return;
-      }
-      
-      this.failureRetryInterval = interval;
-   }    
-   
-   public synchronized int getMaxRetries()
-   {
-      return maxRetries;
-   }
-   
-   public synchronized void setMaxRetries(int retries)
-   {
-      if (started)
-      {
-         log.warn("Cannot set MaxRetries while bridge is started");
-         return;
-      }
-      
-      this.maxRetries = retries;
-   }
-      
-   public synchronized int getQualityOfServiceMode()
-   {
-      return qualityOfServiceMode;
-   }
-   
-   public synchronized void setQualityOfServiceMode(int mode)
-   {
-      if (started)
-      {
-         log.warn("Cannot set QualityOfServiceMode while bridge is started");
-         return;
-      }
-      
-      qualityOfServiceMode = mode;
-   }   
-   
-   public synchronized int getMaxBatchSize()
-   {
-      return maxBatchSize;
-   }
-   
-   public synchronized void setMaxBatchSize(int size)
-   {
-      if (started)
-      {
-         log.warn("Cannot set MaxBatchSize while bridge is started");
-         return;
-      }
-      
-      maxBatchSize = size;
-   }
-   
-   public synchronized long getMaxBatchTime()
-   {
-      return maxBatchTime;
-   }
-   
-   public synchronized void setMaxBatchTime(long time)
-   {
-      if (started)
-      {
-         log.warn("Cannot set MaxBatchTime while bridge is started");
-         return;
-      }
-      
-      maxBatchTime = time;
-   }
-      
-   public synchronized String getSubName()
-   {
-      return this.subName;
-   }
-   
-   public synchronized void setSubName(String subname)
-   {
-      if (started)
-      {
-         log.warn("Cannot set SubName while bridge is started");
-         return;
-      }
-      
-      this.subName = subname; 
-   }
-      
-   public synchronized String getClientID()
-   {
-      return clientID;
-   }
-   
-   public synchronized void setClientID(String clientID)
-   {
-      if (started)
-      {
-         log.warn("Cannot set ClientID while bridge is started");
-         return;
-      }
-      
-      this.clientID = clientID; 
-   }
-   
-   public boolean isAddMessageIDInHeader()
-   {
-   	return this.addMessageIDInHeader;
-   }
-   
-   public void setAddMessageIDInHeader(boolean value)
-   {
-   	this.addMessageIDInHeader = value;
-   }
-      
-   public synchronized boolean isPaused()
-   {
-      return paused;
-   }
-   
-   public synchronized boolean isFailed()
-   {
-      return failed;
-   }
-   
-   public synchronized boolean isStarted()
-   {
-      return started;
-   }
-   
-   public synchronized void setSourceConnectionFactoryFactory(ConnectionFactoryFactory cff)
-   {
-      if (started)
-      {
-         log.warn("Cannot set SourceConnectionFactoryFactory while bridge is started");
-         return;
-      }
-      this.sourceCff = cff;
-   }
-   
-   public synchronized void setDestConnectionFactoryFactory(ConnectionFactoryFactory cff)
-   {
-      if (started)
-      {
-         log.warn("Cannot set DestConnectionFactoryFactory while bridge is started");
-         return;
-      }
-      this.targetCff = cff;
-   }
-   
-   // Private -------------------------------------------------------------------
-   
-   private void checkParams()
-   {
-      if (sourceCff == null)
-      {
-         throw new IllegalArgumentException("sourceCff cannot be null");
-      }
-      if (targetCff == null)
-      {
-         throw new IllegalArgumentException("targetCff cannot be null");
-      }
-      if (sourceDestinationFactory == null)
-      {
-         throw new IllegalArgumentException("sourceDestinationFactory cannot be null");
-      }
-      if (targetDestinationFactory == null)
-      {
-         throw new IllegalArgumentException("targetDestinationFactory cannot be null");
-      }
-      if (failureRetryInterval < 0 && failureRetryInterval != -1)
-      {
-         throw new IllegalArgumentException("failureRetryInterval must be > 0 or -1 to represent no retry");
-      }
-      if (maxRetries < 0 && maxRetries != -1)
-      {
-         throw new IllegalArgumentException("maxRetries must be >= 0 or -1 to represent infinite retries");
-      }
-      if (failureRetryInterval == -1 && maxRetries > 0)
-      {
-         throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be 0");
-      }
-      if (!(maxBatchSize >= 1))
-      {
-         throw new IllegalArgumentException("maxBatchSize must be >= 1");
-      }
-      if (!(maxBatchTime == -1 || maxBatchTime >=1))
-      {
-         throw new IllegalArgumentException("maxBatchTime must be >= 1 or -1 to represent unlimited batch time");
-      }
-      if (qualityOfServiceMode != QOS_AT_MOST_ONCE && qualityOfServiceMode != QOS_DUPLICATES_OK && qualityOfServiceMode != QOS_ONCE_AND_ONLY_ONCE)
-      {
-         throw new IllegalArgumentException("Invalid quality of service mode " + qualityOfServiceMode);
-      }
-   }
-   
-   private void enlistResources(Transaction tx) throws Exception
-   {
-      if (trace) { log.trace("Enlisting resources in tx"); }
-      
-      XAResource resSource = ((XASession)sourceSession).getXAResource();
-      
-      tx.enlistResource(resSource);
-      
-      XAResource resDest = ((XASession)targetSession).getXAResource();
-      
-      tx.enlistResource(resDest);
-      
-      if (trace) { log.trace("Enlisted resources in tx"); }
-   }
-   
-   private void delistResources(Transaction tx) throws Exception
-   {
-      if (trace) { log.trace("Delisting resources from tx"); }
-      
-      XAResource resSource = ((XASession)sourceSession).getXAResource();
-      
-      tx.delistResource(resSource, XAResource.TMSUCCESS);
-      
-      XAResource resDest = ((XASession)targetSession).getXAResource();
-      
-      tx.delistResource(resDest, XAResource.TMSUCCESS);
-      
-      if (trace) { log.trace("Delisted resources from tx"); }
-   }
-   
-   private Transaction startTx() throws Exception
-   {
-      if (trace) { log.trace("Starting JTA transaction"); }
-      
-      TransactionManager tm = getTm();
-      
-      //Set timeout to a large value since we do not want to time out while waiting for messages
-      //to arrive - 10 years should be enough
-      tm.setTransactionTimeout(60 * 60 * 24 * 365 * 10);
-      
-      tm.begin();
-         
-      Transaction tx = tm.getTransaction();
-      
-      //Remove the association between current thread - we don't want it
-      //we will be committing /rolling back directly on the transaction object
-      
-      tm.suspend();
-      
-      if (trace) { log.trace("Started JTA transaction"); }
-      
-      return tx;
-   }
-   
-   private TransactionManager getTm()
-   {
-      if (tm == null)
-      {
-         tm = TransactionManagerLocator.getInstance().locate();
-         
-         if (tm == null)
-         {
-            throw new IllegalStateException("Cannot locate a transaction manager");
-         }         
-      }
-      
-      return tm;
-   }
-         
-   private Connection createConnection(String username, String password, ConnectionFactoryFactory cff)
-      throws Exception
-   {
-      Connection conn;
-      
-      ConnectionFactory cf = cff.createConnectionFactory();
-      
-      if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE &&
-          !(cf instanceof XAConnectionFactory))
-      {
-         throw new IllegalArgumentException("Connection factory must be XAConnectionFactory");
-      }
-      
-      if (username == null)
-      {
-         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
-         {
-            if (trace) { log.trace("Creating an XA connection"); }
-            conn = ((XAConnectionFactory)cf).createXAConnection();
-         }
-         else
-         {
-            if (trace) { log.trace("Creating a non XA connection"); }
-            conn = cf.createConnection();            
-         }
-      }
-      else
-      {
-         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
-         {
-            if (trace) { log.trace("Creating an XA connection"); }
-            conn = ((XAConnectionFactory)cf).createXAConnection(username, password);
-         }
-         else
-         {
-            if (trace) { log.trace("Creating a non XA connection"); }
-            conn = cf.createConnection(username, password);            
-         }  
-      }
-      
-      return conn;
-   }
-    
-   /*
-    * Source and target on same server
-    * --------------------------------
-    * If the source and target destinations are on the same server (same resource manager) then,
-    * in order to get QOS_ONCE_AND_ONLY_ONCE, we simply need to consume and send in a single
-    * local JMS transaction.
-    * 
-    * We actually use a single local transacted session for the other QoS modes too since this
-    * is more performant than using DUPS_OK_ACKNOWLEDGE or AUTO_ACKNOWLEDGE session ack modes, so effectively
-    * the QoS is upgraded.
-    * 
-    * Source and target on different server
-    * -------------------------------------
-    * If the source and target destinations are on different servers (different resource managers) then:
-    * 
-    * If desired QoS is QOS_ONCE_AND_ONLY_ONCE, then we start a JTA transaction and enlist the consuming and sending
-    * XAResources in that.
-    * 
-    * If desired QoS is QOS_DUPLICATES_OK then, we use CLIENT_ACKNOWLEDGE for the consuming session and
-    * AUTO_ACKNOWLEDGE (this is ignored) for the sending session if the maxBatchSize == 1, otherwise we
-    * use a local transacted session for the sending session where maxBatchSize > 1, since this is more performant
-    * When bridging a batch, we make sure to manually acknowledge the consuming session, if it is CLIENT_ACKNOWLEDGE
-    * *after* the batch has been sent
-    * 
-    * If desired QoS is QOS_AT_MOST_ONCE then, if maxBatchSize == 1, we use AUTO_ACKNOWLEDGE for the consuming session,
-    * and AUTO_ACKNOWLEDGE for the sending session.
-    * If maxBatchSize > 1, we use CLIENT_ACKNOWLEDGE for the consuming session and a local transacted session for the
-    * sending session.
-    * 
-    * When bridging a batch, we make sure to manually acknowledge the consuming session, if it is CLIENT_ACKNOWLEDGE
-    * *before* the batch has been sent
-    * 
-    */
-   private boolean setupJMSObjects()
-   {
-      try
-      {  
-      	//Lookup the destinations
-      	sourceDestination = sourceDestinationFactory.createDestination();
-      	
-      	targetDestination = targetDestinationFactory.createDestination();
-      	      
-         if (sourceCff == targetCff)
-         {
-            //Source and target destinations are on the same server - we can get once and only once
-            //just using a local transacted session
-         	//everything becomes once and only once
-         	
-         	forwardMode = FORWARD_MODE_LOCALTX;
-         }
-         else
-         {
-         	//Different servers
-         	if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
-         	{
-         		//Use XA
-         		
-         		forwardMode = FORWARD_MODE_XA;
-         	}
-         	else
-         	{
-         		forwardMode = FORWARD_MODE_NONTX;
-         	}
-         }
-         
-      	//Lookup the destinations
-      	sourceDestination = sourceDestinationFactory.createDestination();
-      	
-      	targetDestination = targetDestinationFactory.createDestination();      	      
-         
-         sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff);
-         
-         if (forwardMode != FORWARD_MODE_LOCALTX)
-         {
-            targetConn = createConnection(targetUsername, targetPassword, targetCff);
-            
-            targetConn.setExceptionListener(new BridgeExceptionListener());            
-         }
-                  
-         if (clientID != null)
-         {
-            sourceConn.setClientID(clientID);
-         }
-         
-         sourceConn.setExceptionListener(new BridgeExceptionListener());         
-          
-         Session sess;
-         
-         if (forwardMode == FORWARD_MODE_LOCALTX)
-         {
-            //We simply use a single local transacted session for consuming and sending      
-            
-            sourceSession = sourceConn.createSession(true, Session.SESSION_TRANSACTED);
-            
-            sess = sourceSession;
-         }
-         else
-         {
-            if (forwardMode == FORWARD_MODE_XA)
-            {
-               //Create an XASession for consuming from the source
-               if (trace) { log.trace("Creating XA source session"); }
-               
-               sourceSession = ((XAConnection)sourceConn).createXASession();
-               
-               sess = ((XASession)sourceSession).getSession();
-            }
-            else
-            {
-               if (trace) { log.trace("Creating non XA source session"); }
-               
-               //Create a standard session for consuming from the source
-               
-               //We use ack mode client ack
-                              
-               sourceSession = sourceConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-               
-               sess = sourceSession;
-            }
-         }
-         
-         if (forwardMode == FORWARD_MODE_XA && sourceSession instanceof JBossSession)
-         {
-         	JBossSession jsession = (JBossSession)sourceSession;
+   void setSourceDestinationFactory(DestinationFactory dest);
 
-         	org.jboss.messaging.core.client.ClientSession session = jsession.getDelegate();
-            
-         	//session.setTreatAsNonTransactedWhenNotEnlisted(false);
-         }
-            
-         if (subName == null)
-         {
-            if (selector == null)
-            {
-               consumer = sess.createConsumer(sourceDestination);
-            }
-            else
-            {
-               consumer = sess.createConsumer(sourceDestination, selector, false);
-            }
-         }
-         else
-         {
-            //Durable subscription
-            if (selector == null)
-            {
-               consumer = sess.createDurableSubscriber((Topic)sourceDestination, subName);
-            }
-            else
-            {
-               consumer = sess.createDurableSubscriber((Topic)sourceDestination, subName, selector, false);
-            }
-         }
-         
-         //Now the sending session
-         
-         
-         if (forwardMode != FORWARD_MODE_LOCALTX)
-         {            
-            if (forwardMode == FORWARD_MODE_XA)
-            {
-               if (trace) { log.trace("Creating XA dest session"); }
-               
-               //Create an XA sesion for sending to the destination
-               
-               targetSession = ((XAConnection)targetConn).createXASession();
-               
-               sess = ((XASession)targetSession).getSession();
-            }
-            else
-            {
-               if (trace) { log.trace("Creating non XA dest session"); }
-               
-               //Create a standard session for sending to the target
-                                             
-               //If batch size > 1 we use a transacted session since is more efficient
-               
-               boolean transacted = maxBatchSize > 1;
-               
-               targetSession = targetConn.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-               
-               sess = targetSession;
-            }       
-         }
-         
-         if (forwardMode == FORWARD_MODE_XA)
-         {
-            if (trace) { log.trace("Starting JTA transaction"); }
-            
-            tx = startTx();
-            
-            enlistResources(tx);                  
-         }
-         
-         producer = sess.createProducer(null);
-                          
-         consumer.setMessageListener(new SourceListener());
-         
-         return true;
-      }
-      catch (Exception e)
-      {
-         log.warn("Failed to set up connections", e);
-         
-         //If this fails we should attempt to cleanup or we might end up in some weird state
-         
-         cleanup();
-         
-         return false;
-      }
-   }
-   
-   private void cleanup()
-   {
-   	
-      //Close the old objects
-      try
-      {
-         sourceConn.close();
-      }
-      catch (Throwable ignore)
-      {            
-      	if (trace) { log.trace("Failed to close source connection", ignore); }
-      }
-      try
-      {
-         if (targetConn != null)
-         {
-            targetConn.close();
-         }
-      }
-      catch (Throwable ignore)
-      {    
-      	if (trace) { log.trace("Failed to close target connection", ignore); }
-      }
+   DestinationFactory getTargetDestinationFactory();
 
-   	
-      if (tx != null)
-      {
-         try
-         {
-            delistResources(tx);
-         }
-         catch (Throwable ignore)
-         {
-         	if (trace) { log.trace("Failed to delist resources", ignore); }
-         } 
-         try
-         {
-            //Terminate the tx
-            tx.rollback();
-         }
-         catch (Throwable ignore)
-         {
-         	if (trace) { log.trace("Failed to rollback", ignore); }
-         } 
-      }      
-   }
-   
-   private void pause(long interval)
-   {
-      long start = System.currentTimeMillis();
-      while (System.currentTimeMillis() - start < failureRetryInterval)
-      {
-         try
-         {
-            Thread.sleep(failureRetryInterval);
-         }
-         catch (InterruptedException ex)
-         {                  
-         }
-      }
-   }
-   
-   private boolean setupJMSObjectsWithRetry()
-   {
-      if (trace) { log.trace("Setting up connections"); }
-      
-      int count = 0;
-      
-      while (true)
-      {
-         boolean ok = setupJMSObjects();
-         
-         if (ok)
-         {
-            return true;
-         }
-         
-         count++;
-         
-         if (maxRetries != -1 && count == maxRetries)
-         {
-            break;
-         }
-         
-         log.warn("Failed to set up connections, will retry after a pause of " + failureRetryInterval + " ms");
-         
-         pause(failureRetryInterval);
-      }
-      
-      //If we get here then we exceed maxRetries
-      return false;      
-   }
-      
-   private void sendBatchNonTransacted()
-   {
-   	try
-      {         
-   		if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
-   		{
-   			//We client ack before sending
-   			
-            if (trace) { log.trace("Client acking source session"); }
-               			
-            ((Message)messages.getLast()).acknowledge();
-            
-            if (trace) { log.trace("Client acked source session"); }            
-   		}
-   		   		
-         sendMessages();
-         
-         if (maxBatchSize > 1)
-         {
-         	//The sending session is transacted - we need to commit it
-         	
-            if (trace) { log.trace("Committing target session"); }
-                     	
-         	targetSession.commit();
-         	
-            if (trace) { log.trace("Committed source session"); }            
-         }
-         
-         if (qualityOfServiceMode == QOS_DUPLICATES_OK)
-   		{
-   			//We client ack after sending
-         	
-         	//Note we could actually use Session.DUPS_OK_ACKNOWLEDGE here
-         	//For a slightly less strong delivery guarantee
-   		
-            if (trace) { log.trace("Client acking source session"); }
-               			
-            ((Message)messages.getLast()).acknowledge();
-            
-            if (trace) { log.trace("Client acked source session"); }            
-   		}
-                                         
-         //Clear the messages
-         messages.clear();            
-      }
-      catch (Exception e)
-      {
-         log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
-      
-         handleFailureOnSend();                                                 
-      }	
-   }
-   
-   private void sendBatchXA()
-   {
-   	try
-      {         
-         sendMessages();
-         
-         //Commit the JTA transaction and start another
-                                 
-         delistResources(tx);
-            
-         if (trace) { log.trace("Committing JTA transaction"); }
-         
-         tx.commit();
+   void setTargetDestinationFactory(DestinationFactory dest);
 
-         if (trace) { log.trace("Committed JTA transaction"); }
-         
-         tx = startTx();  
-         
-         enlistResources(tx);
-                  
-         //Clear the messages
-         messages.clear();            
-      }
-      catch (Exception e)
-      {
-         log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
-      
-         handleFailureOnSend();                                                 
-      }
-   }
-   
-   private void sendBatchLocalTx()
-   {
-   	try
-      {         
-         sendMessages();
-                     
-         if (trace) { log.trace("Committing source session"); }
-         
-         sourceSession.commit();
-         
-         if (trace) { log.trace("Committed source session"); }     
-         
-         //Clear the messages
-         messages.clear();           
-      }
-      catch (Exception e)
-      {
-         log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
-      
-         handleFailureOnSend();                                                 
-      }
-   }
-   
-   private void sendBatch() 
-   {
-      if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
-        
-      if (paused)
-      {
-         //Don't send now
-         if (trace) { log.trace("Paused, so not sending now"); }
-         
-         return;            
-      }
-      
-      if (forwardMode == FORWARD_MODE_LOCALTX)
-      {
-      	sendBatchLocalTx();
-      }
-      else if (forwardMode == FORWARD_MODE_XA)      	
-      {
-      	sendBatchXA();
-      }
-      else
-      {
-      	sendBatchNonTransacted();
-      }
-   }
-   
-   private void sendMessages() throws Exception
-   {
-      Iterator iter = messages.iterator();
-      
-      Message msg = null;
-      
-      while (iter.hasNext())
-      {
-      	msg = (Message)iter.next();
-      	
-      	if (addMessageIDInHeader)
-         {
-         	addMessageIDInHeader(msg);            	
-         }
-         
-         if (trace) { log.trace("Sending message " + msg); }
-         
-         //Make sure the correct time to live gets propagated
-         
-         long timeToLive = msg.getJMSExpiration();
-         
-   		if (timeToLive != 0)
-   		{
-   			timeToLive -=  System.currentTimeMillis();
-   			
-   			if (timeToLive <= 0)
-   			{
-   				timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
-   			}
-   		}
-         
-   		producer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
-   		
-         if (trace) { log.trace("Sent message " + msg); }     
-      }
-   }
-   
-   private void handleFailureOnSend()
-   {
-      handleFailure(new FailureHandler());
-   }
-   
-   private void handleFailureOnStartup()
-   {
-      handleFailure(new StartupFailureHandler());
-   }
-   
-   private void handleFailure(Runnable failureHandler)
-   {
-      failed = true;
+   String getSourceUsername();
 
-      //Failure must be handled on a separate thread to the calling thread (either onMessage or start).
-      //In the case of onMessage we can't close the connection from inside the onMessage method
-      //since it will block waiting for onMessage to complete. In the case of start we want to return
-      //from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
-      Thread t = new Thread(failureHandler);
-      
-      t.start();         
-   }
-   
-   private void addMessageIDInHeader(Message msg) throws Exception
-   {
-   	//We concatenate the old message id as a header in the message
-   	//This allows the target to then use this as the JMSCorrelationID of any response message
-   	//thus enabling a distributed request-response pattern.
-   	//Each bridge (if there are more than one) in the chain can concatenate the message id
-   	//So in the case of multiple bridges having routed the message this can be used in a multi-hop
-   	//distributed request/response
-   	if (trace) { log.trace("Adding old message id in Message header"); }
-   	
-   	//Now JMS is really dumb and does not let you add a property on received message without first
-   	//calling clearProperties, so we need to save and re-add all the old properties so we
-   	//don't lose them!!
-   	
-   	Enumeration en = msg.getPropertyNames();
-   	
-   	Map<String, Object> oldProps = null;
-   	
-   	while (en.hasMoreElements())
-   	{
-   		String propName = (String)en.nextElement();
-   		
-   		if (oldProps == null)
-   		{
-   			oldProps = new HashMap<String, Object>();
-   		}
-   		
-   		oldProps.put(propName, msg.getObjectProperty(propName));
-   	}
-   	            	
-   	msg.clearProperties();
-   	
-   	if (oldProps != null)
-   	{
-   		Iterator iter2 = oldProps.entrySet().iterator();
-   		
-   		while (iter2.hasNext())
-   		{
-   			Map.Entry entry = (Map.Entry)iter2.next();
+   void setSourceUsername(String name);
 
-   			String propName = (String)entry.getKey();
+   String getSourcePassword();
 
-   			msg.setObjectProperty(propName, entry.getValue());   			
-   		}
-   	}
+   void setSourcePassword(String pwd);
 
-   	String val = null;
-   	
-   	val = msg.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
-   	
-   	if (val == null)
-   	{
-   		val = msg.getJMSMessageID();
-   	}
-   	else
-   	{
-   		StringBuffer sb = new StringBuffer(val);
-   		
-   		sb.append(",").append(msg.getJMSMessageID());
-   		
-   		val = sb.toString();
-   	}
-   	
-   	msg.setStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST, val);            	   
-   }
-   
-   // Inner classes ---------------------------------------------------------------
-   
-   private class FailureHandler implements Runnable
-   {
-      /**
-       * Start the source connection - note the source connection must not be started before
-       * otherwise messages will be received and ignored
-       */
-      protected void startSourceConnection()
-      {
-         try
-         {
-            sourceConn.start();
-         }
-         catch (JMSException e)
-         {
-            log.error("Failed to start source connection", e);
-         }
-      }
+   String getTargetUsername();
 
-      protected void succeeded()
-      {
-         log.info("Succeeded in reconnecting to servers");
-         
-         synchronized (lock)
-         {
-            failed = false;
+   void setTargetUsername(String name);
 
-            startSourceConnection();
-         }
-      }
-      
-      protected void failed()
-      {
-         //We haven't managed to recreate connections or maxRetries = 0
-         log.warn("Unable to set up connections, bridge will be stopped");
-         
-         try
-         {                  
-            stop();
-         }
-         catch (Exception ignore)
-         {                  
-         }
-      }
+   String getTargetPassword();
 
-      public void run()
-      {
-      	if (trace) { log.trace("Failure handler running"); }
-      	
-         // Clear the messages
-         messages.clear();
+   void setTargetPassword(String pwd);
 
-         cleanup();
-         
-         boolean ok = false;
-         
-         if (maxRetries > 0 || maxRetries == -1)
-         {
-            log.warn("Will retry after a pause of " + failureRetryInterval + " ms");
-            
-            pause(failureRetryInterval);
-            
-            //Now we try
-            ok = setupJMSObjectsWithRetry();
-         }
-         
-         if (!ok)
-         {
-            failed();
-         }
-         else
-         {
-            succeeded();
-         }    
-      }
-   }
-   
-   private class StartupFailureHandler extends FailureHandler
-   {
-      protected void failed()
-      {
-         // Don't call super
-         log.warn("Unable to set up connections, bridge will not be started");
-      }
-      
-      protected void succeeded()
-      {
-         // Don't call super - a bit ugly in this case but better than taking the lock twice.
-         log.info("Succeeded in connecting to servers");
-         
-         synchronized (lock)
-         {
-            failed = false;
-            started = true;
-            
-            //Start the source connection - note the source connection must not be started before
-            //otherwise messages will be received and ignored
-            
-            try
-            {
-               sourceConn.start();
-            }
-            catch (JMSException e)
-            {
-               log.error("Failed to start source connection", e);
-            }
-         }
-      }
-   }
-   
-   private class BatchTimeChecker implements Runnable
-   {
-      public void run()
-      {
-         if (trace) { log.trace(this + " running"); }
-         
-         synchronized (lock)
-         {
-            while (started)
-            {
-               long toWait = batchExpiryTime - System.currentTimeMillis();
-               
-               if (toWait <= 0)
-               {
-                  if (trace) { log.trace(this + " waited enough"); }
-                  
-                  synchronized (lock)
-                  {              
-                     if (!failed && !messages.isEmpty())
-                     {
-                        if (trace) { log.trace(this + " got some messages so sending batch"); }
-                        
-                        sendBatch();                     
-                        
-                        if (trace) { log.trace(this + " sent batch"); }
-                     }
-                  }
-                  
-                  batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-               }
-               else
-               {                    
-                  try
-                  {
-                     if (trace) { log.trace(this + " waiting for " + toWait); }
-                     
-                     lock.wait(toWait);
-                     
-                     if (trace) { log.trace(this + " woke up"); }
-                  }
-                  catch (InterruptedException e)
-                  {
-                     //Ignore
-                     if (trace) { log.trace(this + " thread was interrupted"); }
-                  }
-                  
-               }
-            }        
-         }
-      }      
-   }  
-   
-   private class SourceListener implements MessageListener
-   {
-      public void onMessage(Message msg)
-      {
-         synchronized (lock)
-         {
-            if (failed)
-            {
-               //Ignore the message
-               if (trace) { log.trace("Bridge has failed so ignoring message"); }
-                              
-               return;
-            }
-            
-            if (trace) { log.trace(this + " received message " + msg); }
-            
-            messages.add(msg);
-            
-            batchExpiryTime = System.currentTimeMillis() + maxBatchTime;            
-            
-            if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
-            
-            if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
-            {
-               if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
-               
-               sendBatch();
-               
-               if (trace) { log.trace(this + " sent batch"); }
-            }                        
-         }
-      }      
-   }   
-   
-   private class BridgeExceptionListener implements ExceptionListener
-   {
-		public void onException(JMSException e)
-		{
-			log.warn("Detected failure on connection", e);
-			
-			synchronized (lock)
-			{
-				if (failed)
-				{
-					//The failure has already been detected and is being handled
-					if (trace) { log.trace("Failure recovery already in progress"); }
-				}
-				else
-			   {				
-					handleFailure(new FailureHandler());
-			   }
-			}
-		}   	
-   }   
-}
+   String getSelector();
+
+   void setSelector(String selector);
+
+   long getFailureRetryInterval();
+
+   void setFailureRetryInterval(long interval);
+
+   int getMaxRetries();
+
+   void setMaxRetries(int retries);
+
+   QualityOfServiceMode getQualityOfServiceMode();
+
+   void setQualityOfServiceMode(QualityOfServiceMode mode);
+
+   int getMaxBatchSize();
+
+   void setMaxBatchSize(int size);
+
+   long getMaxBatchTime();
+
+   void setMaxBatchTime(long time);
+
+   String getSubscriptionName();
+
+   void setSubscriptionName(String subname);
+
+   String getClientID();
+
+   void setClientID(String clientID);
+
+   boolean isAddMessageIDInHeader();
+
+   void setAddMessageIDInHeader(boolean value);
+
+   boolean isPaused();
+
+   boolean isFailed();
+
+   void setSourceConnectionFactoryFactory(ConnectionFactoryFactory cff);
+
+   void setTargetConnectionFactoryFactory(ConnectionFactoryFactory cff);
+
+   void setTransactionManager(TransactionManager tm);
+
+}
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/jms/bridge/BridgeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/BridgeService.java	2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/BridgeService.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -26,6 +26,7 @@
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.jms.bridge.impl.BridgeImpl;
 
 /**
  * A BridgeService
@@ -52,7 +53,7 @@
       
    public BridgeService()
    {
-      bridge = new Bridge();
+      bridge = new BridgeImpl();
    }
    
    // JMX attributes ----------------------------------------------------------------
@@ -139,32 +140,32 @@
 
    public String getTargetUsername()
    {
-      return bridge.getDestUsername();
+      return bridge.getTargetUsername();
    }
 
    public String getTargetPassword()
    {
-      return bridge.getDestPassword();
+      return bridge.getTargetPassword();
    }
    
    public void setTargetUsername(String name)
    {
-      bridge.setDestUserName(name);
+      bridge.setTargetUsername(name);
    }
    
    public void setTargetPassword(String pwd)
    {
-      bridge.setDestPassword(pwd);
+      bridge.setTargetPassword(pwd);
    }
    
    public int getQualityOfServiceMode()
    {
-      return bridge.getQualityOfServiceMode();
+      return bridge.getQualityOfServiceMode().intValue();
    }
    
    public void setQualityOfServiceMode(int mode)
    {
-      bridge.setQualityOfServiceMode(mode);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.valueOf(mode));
    }
    
    public String getSelector()
@@ -199,12 +200,12 @@
 
    public String getSubName()
    {
-      return bridge.getSubName();
+      return bridge.getSubscriptionName();
    }
    
    public void setSubName(String subname)
    {
-      bridge.setSubName(subname);
+      bridge.setSubscriptionName(subname);
    }
 
    public String getClientID()

Deleted: trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java	2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -1,50 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.jms.bridge;
-
-import java.util.Hashtable;
-
-import javax.jms.ConnectionFactory;
-
-/**
- * A JNDIConnectionFactoryFactory
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class JNDIConnectionFactoryFactory extends JNDIFactorySupport implements ConnectionFactoryFactory
-{
-	public JNDIConnectionFactoryFactory(Hashtable jndiProperties, String lookup)
-   {
-      super(jndiProperties, lookup);      
-   }
-
-   public ConnectionFactory createConnectionFactory() throws Exception
-   {
-   	return (ConnectionFactory)createObject();   	
-   }
-
-}

Deleted: trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java	2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -1,48 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.jms.bridge;
-
-import java.util.Hashtable;
-
-import javax.jms.Destination;
-
-/**
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>10 Oct 2007
- *
- * $Id: $
- *
- */
-public class JNDIDestinationFactory extends JNDIFactorySupport implements DestinationFactory
-{
-   public JNDIDestinationFactory(Hashtable jndiProperties, String lookup)
-   {
-      super(jndiProperties, lookup);      
-   }
-
-   public Destination createDestination() throws Exception
-   {
-   	return (Destination)createObject();   	
-   }
-}

Deleted: trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java	2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -1,78 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
-
-package org.jboss.messaging.jms.bridge;
-
-import java.util.Hashtable;
-
-import javax.naming.InitialContext;
-
-/**
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>10 Oct 2007
- *
- * $Id: $
- *
- */
-public abstract class JNDIFactorySupport
-{
-	protected Hashtable jndiProperties;
-   
-   protected String lookup;
-   
-   protected JNDIFactorySupport(Hashtable jndiProperties, String lookup)
-   {
-      this.jndiProperties = jndiProperties;
-      
-      this.lookup = lookup;       
-   }
-
-   protected Object createObject() throws Exception
-   {
-      InitialContext ic = null;
-      
-      Object obj = null;
-      
-      try
-      {
-         if (jndiProperties == null)
-         {
-            ic = new InitialContext();
-         }
-         else
-         {
-            ic = new InitialContext(jndiProperties);
-         }
-         
-         obj = ic.lookup(lookup);         
-      }
-      finally
-      {
-         if (ic != null)
-         {
-            ic.close();
-         }
-      }
-      return obj;      
-   }
-}

Added: trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -0,0 +1,1579 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.jms.bridge.impl;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.ConnectionFactoryFactory;
+import org.jboss.messaging.jms.bridge.DestinationFactory;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.tm.TransactionManagerLocator;
+
+/**
+ * 
+ * A Bridge
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision:4566 $</tt>
+ *
+ * $Id:Bridge.java 4566 2008-06-24 08:01:35Z jmesnil $
+ *
+ */
+public class BridgeImpl implements MessagingComponent, Bridge
+{
+   private static final Logger log;
+   
+   private static boolean trace;
+   
+   static
+   {
+      log = Logger.getLogger(BridgeImpl.class);
+      
+      trace = log.isTraceEnabled();
+   }
+
+   private static final int TEN_YEARS = 60 * 60 * 24 * 365 * 10; // in seconds (the unit taken by TransactionManager.setTransactionTimeout)
+
+   private final Object lock = new Object();
+   
+   private String sourceUsername;
+   
+   private String sourcePassword;
+   
+   private String targetUsername;
+   
+   private String targetPassword;
+   
+   private TransactionManager tm;
+   
+   private String selector;
+   
+   private long failureRetryInterval;
+   
+   private int maxRetries;
+   
+   private QualityOfServiceMode qualityOfServiceMode;
+   
+   private int maxBatchSize;
+   
+   private long maxBatchTime;
+   
+   private String subName;
+   
+   private String clientID;
+   
+   private volatile boolean addMessageIDInHeader;
+   
+   private boolean started;
+   
+   private LinkedList<Message> messages;
+   
+   private ConnectionFactoryFactory sourceCff;
+   
+   private ConnectionFactoryFactory targetCff;
+   
+   private DestinationFactory sourceDestinationFactory;
+   
+   private DestinationFactory targetDestinationFactory;
+   
+   private Connection sourceConn; 
+   
+   private Connection targetConn;
+   
+   private Destination sourceDestination;
+   
+   private Destination targetDestination;
+   
+   private Session sourceSession;
+   
+   private Session targetSession;
+   
+   private MessageConsumer sourceConsumer;
+   
+   private MessageProducer targetProducer;
+        
+   private BatchTimeChecker timeChecker;
+   
+   private Thread checkerThread;
+   
+   private long batchExpiryTime;
+   
+   private boolean paused;         
+   
+   private Transaction tx;  
+   
+   private boolean failed;
+   
+   private int forwardMode;
+   
+   private static final int FORWARD_MODE_XA = 0;
+   
+   private static final int FORWARD_MODE_LOCALTX = 1;
+   
+   private static final int FORWARD_MODE_NONTX = 2;
+   
+   /*
+    * Constructor for MBean
+    */
+   public BridgeImpl()
+   {      
+      this.messages = new LinkedList<Message>();      
+   }
+   
+   public BridgeImpl(ConnectionFactoryFactory sourceCff, ConnectionFactoryFactory targetCff,
+                 DestinationFactory sourceDestinationFactory, DestinationFactory targetDestinationFactory,         
+                 String sourceUsername, String sourcePassword,
+                 String targetUsername, String targetPassword,
+                 String selector, long failureRetryInterval,
+                 int maxRetries,
+                 QualityOfServiceMode qosMode,
+                 int maxBatchSize, long maxBatchTime,
+                 String subName, String clientID,
+                 boolean addMessageIDInHeader)
+   {            
+      this();
+      
+      this.sourceCff = sourceCff;
+      
+      this.targetCff = targetCff;
+      
+      this.sourceDestinationFactory = sourceDestinationFactory;
+      
+      this.targetDestinationFactory = targetDestinationFactory;
+      
+      this.sourceUsername = sourceUsername;
+      
+      this.sourcePassword = sourcePassword;
+      
+      this.targetUsername = targetUsername;
+      
+      this.targetPassword = targetPassword;
+      
+      this.selector = selector;
+      
+      this.failureRetryInterval = failureRetryInterval;
+      
+      this.maxRetries = maxRetries;
+      
+      this.qualityOfServiceMode = qosMode;
+      
+      this.maxBatchSize = maxBatchSize;
+      
+      this.maxBatchTime = maxBatchTime;
+      
+      this.subName = subName;
+      
+      this.clientID = clientID;
+      
+      this.addMessageIDInHeader = addMessageIDInHeader;
+              
+      checkParams();
+      
+      if (trace)
+      {
+         log.trace("Created " + this);
+      }
+   }
+         
+   // MessagingComponent overrides --------------------------------------------------
+        
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         log.warn("Attempt to start, but is already started");
+         return;
+      }
+      
+      if (trace) { log.trace("Starting " + this); }         
+      
+      checkParams();
+      
+      TransactionManager tm = getTm();
+      
+      //There may already be a JTA transaction associated to the thread
+      
+      boolean ok;
+      
+      Transaction toResume = null;
+      try
+      {
+         toResume = tm.suspend();
+         
+         ok = setupJMSObjects();
+      }
+      finally
+      {
+         if (toResume != null)
+         {
+            tm.resume(toResume);
+         }
+      }
+      
+      if (ok)
+      {         
+         //start the source connection
+         
+         sourceConn.start();
+         
+         started = true;
+         
+         if (maxBatchTime != -1)
+         {
+            if (trace) { log.trace("Starting time checker thread"); }
+                     
+            timeChecker = new BatchTimeChecker();
+            
+            checkerThread = new Thread(timeChecker);
+            
+            batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+            
+            checkerThread.start();
+            
+            if (trace) { log.trace("Started time checker thread"); }
+         }            
+         
+         if (trace) { log.trace("Started " + this); }
+      }
+      else
+      {
+         log.warn("Failed to start bridge");
+         handleFailureOnStartup();
+      }
+   }
+   
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         log.warn("Attempt to stop, but is already stopped");
+         return;
+      }
+      
+      if (trace) { log.trace("Stopping " + this); }
+      
+      synchronized (lock)
+      {
+         started = false;          
+         
+         //This must be inside sync block
+         if (checkerThread != null)
+         {
+            checkerThread.interrupt();
+         }
+      }
+            
+      //This must be outside sync block
+      if (checkerThread != null)
+      {  
+         if (trace) { log.trace("Waiting for checker thread to finish");}
+         
+         checkerThread.join();
+         
+         if (trace) { log.trace("Checker thread has finished"); }
+      }
+      
+      if (tx != null)
+      {
+         //Terminate any transaction
+         if (trace) { log.trace("Rolling back remaining tx"); }
+         
+         try
+         {
+            tx.rollback();
+         }
+         catch (Exception ignore)
+         {
+            if (trace) { log.trace("Failed to rollback", ignore); }
+         }
+         
+         if (trace) { log.trace("Rolled back remaining tx"); }
+      }
+      
+      try
+      {
+         sourceConn.close();
+      }
+      catch (Exception ignore)
+      {
+         if (trace) { log.trace("Failed to close source conn", ignore); }
+      }
+      
+      if (targetConn != null)
+      {
+         try
+         {
+            targetConn.close();
+         }
+         catch (Exception ignore)
+         {
+            if (trace) { log.trace("Failed to close target conn", ignore); }
+         }
+      }
+            
+      if (trace) { log.trace("Stopped " + this); }
+   }
+   
+   public synchronized boolean isStarted()
+   {
+      return started;
+   }
+
+   // Bridge implementation ------------------------------------------------------------
+
+   public synchronized void pause() throws Exception
+   {
+      if (trace) { log.trace("Pausing " + this); }
+      
+      synchronized (lock)
+      {
+         paused = true;
+         
+         sourceConn.stop();
+      }
+      
+      if (trace) { log.trace("Paused " + this); }
+   }
+   
+   public synchronized void resume() throws Exception
+   {
+      if (trace) { log.trace("Resuming " + this); }
+      
+      synchronized (lock)
+      {
+         paused = false;
+         
+         sourceConn.start();
+      }
+      
+      if (trace) { log.trace("Resumed " + this); }
+   }
+      
+   public DestinationFactory getSourceDestinationFactory()
+   {
+   	return sourceDestinationFactory;
+   }
+
+   public void setSourceDestinationFactory(DestinationFactory dest)
+   {
+      checkBridgeNotStarted();
+      checkNotNull(dest, "SourceDestinationFactory");
+      //Only reached when the bridge is not started and dest is non-null
+      sourceDestinationFactory = dest;
+   }
+   
+   public DestinationFactory getTargetDestinationFactory()
+   {
+   	return targetDestinationFactory;
+   }
+
+   public void setTargetDestinationFactory(DestinationFactory dest)
+   {
+      checkBridgeNotStarted();
+      checkNotNull(dest, "TargetDestinationFactory");
+      
+   	targetDestinationFactory = dest;
+   }
+   
+   public String getSourceUsername()
+   {
+      return sourceUsername;
+   }
+   
+   public synchronized void setSourceUsername(String name)
+   {
+      checkBridgeNotStarted();
+      
+      sourceUsername = name;
+   }
+   
+   public synchronized String getSourcePassword()
+   {
+      return sourcePassword;
+   }
+   
+   public synchronized void setSourcePassword(String pwd)
+   {
+      checkBridgeNotStarted();
+      
+      sourcePassword = pwd;
+   }
+      
+   public synchronized String getTargetUsername()
+   {
+      return targetUsername;
+   }
+   
+   public synchronized void setTargetUsername(String name)
+   {
+      checkBridgeNotStarted();
+      
+      this.targetUsername = name;
+   }
+   
+   public synchronized String getTargetPassword()
+   {
+      return this.targetPassword;
+   }
+   
+   public synchronized void setTargetPassword(String pwd)
+   {
+      checkBridgeNotStarted();
+      
+      this.targetPassword = pwd;
+   }
+
+   public synchronized String getSelector()
+   {
+      return selector;
+   }
+   
+   public synchronized void setSelector(String selector)
+   {
+      checkBridgeNotStarted();
+      
+      this.selector = selector;
+   }
+   
+   public synchronized long getFailureRetryInterval()
+   {
+      return failureRetryInterval;
+   }
+   
+   public synchronized void setFailureRetryInterval(long interval)
+   {
+      checkBridgeNotStarted();
+      checkValidValue(interval, "FailureRetryInterval");
+      
+      this.failureRetryInterval = interval;
+   }  
+   
+   public synchronized int getMaxRetries()
+   {
+      return maxRetries;
+   }
+   
+   public synchronized void setMaxRetries(int retries)
+   {
+      checkBridgeNotStarted();
+      checkValidValue(retries, "MaxRetries");
+      
+      this.maxRetries = retries;
+   }
+      
+   public synchronized QualityOfServiceMode getQualityOfServiceMode()
+   {
+      return qualityOfServiceMode;
+   }
+   
+   public synchronized void setQualityOfServiceMode(QualityOfServiceMode mode)
+   {
+      checkBridgeNotStarted();
+      checkNotNull(mode, "QualityOfServiceMode");
+      
+      qualityOfServiceMode = mode;
+   }   
+   
+   public synchronized int getMaxBatchSize()
+   {
+      return maxBatchSize;
+   }
+   
+   public synchronized void setMaxBatchSize(int size)
+   {
+      checkBridgeNotStarted();
+      checkMaxBatchSize(size);
+      
+      maxBatchSize = size;
+   }
+   
+   public synchronized long getMaxBatchTime()
+   {
+      return maxBatchTime;
+   }
+   
+   public synchronized void setMaxBatchTime(long time)
+   {
+      checkBridgeNotStarted();
+      checkValidValue(time, "MaxBatchTime");
+      
+      maxBatchTime = time;
+   }
+      
+   public synchronized String getSubscriptionName()
+   {
+      return this.subName;
+   }
+   
+   public synchronized void setSubscriptionName(String subname)
+   {
+      checkBridgeNotStarted();
+      this.subName = subname; 
+   }
+      
+   public synchronized String getClientID()
+   {
+      return clientID;
+   }
+   
+   public synchronized void setClientID(String clientID)
+   {
+      checkBridgeNotStarted();
+      
+      this.clientID = clientID; 
+   }
+   
+   public boolean isAddMessageIDInHeader()
+   {
+   	return this.addMessageIDInHeader;
+   }
+   
+   public void setAddMessageIDInHeader(boolean value)
+   {
+   	this.addMessageIDInHeader = value;
+   }
+      
+   public synchronized boolean isPaused()
+   {
+      return paused;
+   }
+   
+   public synchronized boolean isFailed()
+   {
+      return failed;
+   }
+
+   public synchronized void setSourceConnectionFactoryFactory(ConnectionFactoryFactory cff)
+   {
+      checkBridgeNotStarted();
+      checkNotNull(cff, "SourceConnectionFactoryFactory");
+      
+      this.sourceCff = cff;
+   }
+   
+   public synchronized void setTargetConnectionFactoryFactory(ConnectionFactoryFactory cff)
+   {
+      checkBridgeNotStarted();
+      checkNotNull(cff, "TargetConnectionFactoryFactory");
+
+      this.targetCff = cff;
+   }
+   
+   public void setTransactionManager(TransactionManager tm)
+   {
+      this.tm = tm;
+   }
+
+   // Public ---------------------------------------------------------------------------
+   
+   // Private -------------------------------------------------------------------
+
+   private void checkParams()
+   {
+      checkNotNull(sourceCff, "sourceCff");
+      checkNotNull(targetCff, "targetCff");
+      checkNotNull(sourceDestinationFactory, "sourceDestinationFactory");
+      checkNotNull(targetDestinationFactory, "targetDestinationFactory");
+      checkValidValue(failureRetryInterval, "failureRetryInterval");
+      checkValidValue(maxRetries, "maxRetries");
+      if (failureRetryInterval == -1 && maxRetries > 0)
+      {
+         throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be 0");
+      }
+      checkMaxBatchSize(maxBatchSize);
+      checkValidValue(maxBatchTime, "maxBatchTime");
+      checkNotNull(qualityOfServiceMode, "qualityOfServiceMode");
+   }
+
+   /**
+    * Check the object is not null
+    * 
+    * @throws IllegalArgumentException if the object is null
+    */
+   private static void checkNotNull(Object obj, String name)
+   {
+      if (obj == null)
+      {
+         throw new IllegalArgumentException(name + " cannot be null");
+      }
+   }
+   
+   /**
+    * Check the bridge is not started
+    * 
+    * @throws IllegalStateException if the bridge is started
+    */
+   private void checkBridgeNotStarted()
+   {
+      if (started)
+      {
+         throw new IllegalStateException("Cannot set bridge attributes while it is started");
+      }
+   }
+   
+   /**
+    * Check that value is either equal to -1 or greater than 0
+    * 
+    * @throws IllegalArgumentException if the value is not valid
+    */
+   private static void checkValidValue(long value, String name)
+   {
+      if (!(value == -1 || value > 0))
+      {
+         throw new IllegalArgumentException(name + " must be > 0 or -1");
+      }
+   }  
+   
+   private static void checkMaxBatchSize(int size)
+   {
+      if (!(size >= 1))
+      {
+         throw new IllegalArgumentException("maxBatchSize must be >= 1");
+      }
+   }
+
+   private void enlistResources(Transaction tx) throws Exception
+   {
+      if (trace) { log.trace("Enlisting resources in tx"); }
+      
+      XAResource resSource = ((XASession)sourceSession).getXAResource();
+      
+      tx.enlistResource(resSource);
+      
+      XAResource resDest = ((XASession)targetSession).getXAResource();
+      
+      tx.enlistResource(resDest);
+      
+      if (trace) { log.trace("Enlisted resources in tx"); }
+   }
+   
+   private void delistResources(Transaction tx) throws Exception
+   {
+      if (trace) { log.trace("Delisting resources from tx"); }
+      
+      XAResource resSource = ((XASession)sourceSession).getXAResource();
+      
+      tx.delistResource(resSource, XAResource.TMSUCCESS);
+      
+      XAResource resDest = ((XASession)targetSession).getXAResource();
+      
+      tx.delistResource(resDest, XAResource.TMSUCCESS);
+      
+      if (trace) { log.trace("Delisted resources from tx"); }
+   }
+   
+   private Transaction startTx() throws Exception
+   {
+      if (trace) { log.trace("Starting JTA transaction"); }
+      
+      TransactionManager tm = getTm();
+      
+      //Set timeout to a large value since we do not want to time out while waiting for messages
+      //to arrive - 10 years should be enough
+      tm.setTransactionTimeout(TEN_YEARS);
+      
+      tm.begin();
+         
+      Transaction tx = tm.getTransaction();
+      
+      //Remove the association between current thread - we don't want it
+      //we will be committing /rolling back directly on the transaction object
+      
+      tm.suspend();
+      
+      if (trace) { log.trace("Started JTA transaction"); }
+      
+      return tx;
+   }
+   
+   private TransactionManager getTm()
+   {
+      if (tm == null)
+      {
+         tm = TransactionManagerLocator.getInstance().locate();
+         
+         if (tm == null)
+         {
+            throw new IllegalStateException("Cannot locate a transaction manager");
+         }         
+      }
+      
+      return tm;
+   }
+         
+   private Connection createConnection(String username, String password, ConnectionFactoryFactory cff)
+      throws Exception
+   {
+      Connection conn;
+      
+      ConnectionFactory cf = cff.createConnectionFactory();
+      
+      if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE &&
+          !(cf instanceof XAConnectionFactory))
+      {
+         throw new IllegalArgumentException("Connection factory must be XAConnectionFactory");
+      }
+      
+      if (username == null)
+      {
+         if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
+         {
+            if (trace) { log.trace("Creating an XA connection"); }
+            conn = ((XAConnectionFactory)cf).createXAConnection();
+         }
+         else
+         {
+            if (trace) { log.trace("Creating a non XA connection"); }
+            conn = cf.createConnection();            
+         }
+      }
+      else
+      {
+         if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
+         {
+            if (trace) { log.trace("Creating an XA connection"); }
+            conn = ((XAConnectionFactory)cf).createXAConnection(username, password);
+         }
+         else
+         {
+            if (trace) { log.trace("Creating a non XA connection"); }
+            conn = cf.createConnection(username, password);            
+         }  
+      }
+      
+      return conn;
+   }
+    
+   /*
+    * Source and target on same server
+    * --------------------------------
+    * If the source and target destinations are on the same server (same resource manager) then,
+    * in order to get ONCE_AND_ONLY_ONCE, we simply need to consume and send in a single
+    * local JMS transaction.
+    * 
+    * We actually use a single local transacted session for the other QoS modes too since this
+    * is more performant than using DUPS_OK_ACKNOWLEDGE or AUTO_ACKNOWLEDGE session ack modes, so effectively
+    * the QoS is upgraded.
+    * 
+    * Source and target on different server
+    * -------------------------------------
+    * If the source and target destinations are on different servers (different resource managers) then:
+    * 
+    * If desired QoS is ONCE_AND_ONLY_ONCE, then we start a JTA transaction and enlist the consuming and sending
+    * XAResources in that.
+    * 
+    * If desired QoS is DUPLICATES_OK then, we use CLIENT_ACKNOWLEDGE for the consuming session and
+    * AUTO_ACKNOWLEDGE (this is ignored) for the sending session if the maxBatchSize == 1, otherwise we
+    * use a local transacted session for the sending session where maxBatchSize > 1, since this is more performant
+    * When bridging a batch, we make sure to manually acknowledge the consuming session, if it is CLIENT_ACKNOWLEDGE
+    * *after* the batch has been sent
+    * 
+    * If desired QoS is AT_MOST_ONCE then, if maxBatchSize == 1, we use AUTO_ACKNOWLEDGE for the consuming session,
+    * and AUTO_ACKNOWLEDGE for the sending session.
+    * If maxBatchSize > 1, we use CLIENT_ACKNOWLEDGE for the consuming session and a local transacted session for the
+    * sending session.
+    * 
+    * When bridging a batch, we make sure to manually acknowledge the consuming session, if it is CLIENT_ACKNOWLEDGE
+    * *before* the batch has been sent
+    * 
+    */
+   private boolean setupJMSObjects()
+   {
+      try
+      {  
+         if (sourceCff == targetCff)
+         {
+            //Source and target destinations are on the same server - we can get once and only once
+            //just using a local transacted session
+         	//everything becomes once and only once
+         	
+         	forwardMode = FORWARD_MODE_LOCALTX;
+         }
+         else
+         {
+         	//Different servers
+         	if (qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
+         	{
+         		//Use XA
+         		
+         		forwardMode = FORWARD_MODE_XA;
+         	}
+         	else
+         	{
+         		forwardMode = FORWARD_MODE_NONTX;
+         	}
+         }
+         
+      	//Lookup the destinations
+      	sourceDestination = sourceDestinationFactory.createDestination();
+      	
+      	targetDestination = targetDestinationFactory.createDestination();      	      
+         
+         sourceConn = createConnection(sourceUsername, sourcePassword, sourceCff);
+         
+         if (forwardMode != FORWARD_MODE_LOCALTX)
+         {
+            targetConn = createConnection(targetUsername, targetPassword, targetCff);
+            
+            targetConn.setExceptionListener(new BridgeExceptionListener());            
+         }
+                  
+         if (clientID != null)
+         {
+            sourceConn.setClientID(clientID);
+         }
+         
+         sourceConn.setExceptionListener(new BridgeExceptionListener());         
+          
+         Session sess;
+         
+         if (forwardMode == FORWARD_MODE_LOCALTX)
+         {
+            //We simply use a single local transacted session for consuming and sending      
+            
+            sourceSession = sourceConn.createSession(true, Session.SESSION_TRANSACTED);
+            
+            sess = sourceSession;
+         }
+         else
+         {
+            if (forwardMode == FORWARD_MODE_XA)
+            {
+               //Create an XASession for consuming from the source
+               if (trace) { log.trace("Creating XA source session"); }
+               
+               sourceSession = ((XAConnection)sourceConn).createXASession();
+               
+               sess = ((XASession)sourceSession).getSession();
+            }
+            else
+            {
+               if (trace) { log.trace("Creating non XA source session"); }
+               
+               //Create a standard session for consuming from the source
+               
+               //We use ack mode client ack
+                              
+               sourceSession = sourceConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+               
+               sess = sourceSession;
+            }
+         }
+         
+         if (forwardMode == FORWARD_MODE_XA && sourceSession instanceof JBossSession)
+         {
+         	JBossSession jsession = (JBossSession)sourceSession;
+
+         	ClientSession clientSession = jsession.getDelegate();
+            
+         	//clientSession.setTreatAsNonTransactedWhenNotEnlisted(false);
+         }
+            
+         if (subName == null)
+         {
+            if (selector == null)
+            {
+               sourceConsumer = sess.createConsumer(sourceDestination);
+            }
+            else
+            {
+               sourceConsumer = sess.createConsumer(sourceDestination, selector, false);
+            }
+         }
+         else
+         {
+            //Durable subscription
+            if (selector == null)
+            {
+               sourceConsumer = sess.createDurableSubscriber((Topic)sourceDestination, subName);
+            }
+            else
+            {
+               sourceConsumer = sess.createDurableSubscriber((Topic)sourceDestination, subName, selector, false);
+            }
+         }
+         
+         //Now the sending session
+         
+         
+         if (forwardMode != FORWARD_MODE_LOCALTX)
+         {            
+            if (forwardMode == FORWARD_MODE_XA)
+            {
+               if (trace) { log.trace("Creating XA dest session"); }
+               
+               //Create an XA session for sending to the destination
+               
+               targetSession = ((XAConnection)targetConn).createXASession();
+               
+               sess = ((XASession)targetSession).getSession();
+            }
+            else
+            {
+               if (trace) { log.trace("Creating non XA dest session"); }
+               
+               //Create a standard session for sending to the target
+                                             
+               //If batch size > 1 we use a transacted session since is more efficient
+               
+               boolean transacted = maxBatchSize > 1;
+               
+               targetSession = targetConn.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+               
+               sess = targetSession;
+            }       
+         }
+         
+         if (forwardMode == FORWARD_MODE_XA)
+         {
+            if (trace) { log.trace("Starting JTA transaction"); }
+            
+            tx = startTx();
+            
+            enlistResources(tx);                  
+         }
+         
+         targetProducer = sess.createProducer(null);
+                          
+         sourceConsumer.setMessageListener(new SourceListener());
+         
+         return true;
+      }
+      catch (Exception e)
+      {
+         log.warn("Failed to set up connections", e);
+         
+         //If this fails we should attempt to cleanup or we might end up in some weird state
+         
+         cleanup();
+         
+         return false;
+      }
+   }
+   
+   private void cleanup()
+   {
+   	
+      //Close the old objects
+      try
+      {
+         sourceConn.close();
+      }
+      catch (Throwable ignore)
+      {            
+      	if (trace) { log.trace("Failed to close source connection", ignore); }
+      }
+      try
+      {
+         if (targetConn != null)
+         {
+            targetConn.close();
+         }
+      }
+      catch (Throwable ignore)
+      {    
+      	if (trace) { log.trace("Failed to close target connection", ignore); }
+      }
+
+   	
+      if (tx != null)
+      {
+         try
+         {
+            delistResources(tx);
+         }
+         catch (Throwable ignore)
+         {
+         	if (trace) { log.trace("Failed to delist resources", ignore); }
+         } 
+         try
+         {
+            //Terminate the tx
+            tx.rollback();
+         }
+         catch (Throwable ignore)
+         {
+         	if (trace) { log.trace("Failed to rollback", ignore); }
+         } 
+      }      
+   }
+   
+   private void pause(long interval)
+   {
+      long start = System.currentTimeMillis(); //the wait is measured from here and lasts 'interval' ms
+      for (long left = interval; left > 0; left = interval - (System.currentTimeMillis() - start))
+      {
+         try
+         {
+            Thread.sleep(left); //only the time still outstanding, so an interrupt cannot lengthen the pause
+         }
+         catch (InterruptedException ignored) //deliberately swallowed: the loop resumes the wait
+         {
+         }
+      }
+   }
+   
+   private boolean setupJMSObjectsWithRetry()
+   {
+      if (trace) { log.trace("Setting up connections"); }
+      
+      int count = 0;
+      
+      while (true)
+      {
+         boolean ok = setupJMSObjects();
+         
+         if (ok)
+         {
+            return true;
+         }
+         
+         count++;
+         
+         if (maxRetries != -1 && count == maxRetries)
+         {
+            break;
+         }
+         
+         log.warn("Failed to set up connections, will retry after a pause of " + failureRetryInterval + " ms");
+         
+         pause(failureRetryInterval);
+      }
+      
+      //If we get here then we exceed maxRetries
+      return false;      
+   }
+      
+   
+   private void sendBatch() 
+   {
+      if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
+        
+      if (paused)
+      {
+         //Don't send now
+         if (trace) { log.trace("Paused, so not sending now"); }
+         
+         return;            
+      }
+      
+      if (forwardMode == FORWARD_MODE_LOCALTX)
+      {
+         sendBatchLocalTx();
+      }
+      else if (forwardMode == FORWARD_MODE_XA)        
+      {
+         sendBatchXA();
+      }
+      else
+      {
+         sendBatchNonTransacted();
+      }
+   }
+   
+   private void sendBatchNonTransacted()
+   {
+      try
+      {
+         if (qualityOfServiceMode == QualityOfServiceMode.AT_MOST_ONCE)
+         {
+            //We client ack before sending, so a failed send cannot lead to the batch being delivered twice.
+            //ONCE_AND_ONLY_ONCE never reaches this method - it is forwarded with XA or a local tx
+            if (trace) { log.trace("Client acking source session"); }
+
+            messages.getLast().acknowledge();
+
+            if (trace) { log.trace("Client acked source session"); }
+         }
+
+         sendMessages();
+
+         if (maxBatchSize > 1)
+         {
+            //The sending session is transacted - we need to commit it
+
+            if (trace) { log.trace("Committing target session"); }
+
+            targetSession.commit();
+
+            if (trace) { log.trace("Committed target session"); }
+         }
+
+         if (qualityOfServiceMode == QualityOfServiceMode.DUPLICATES_OK)
+         {
+            //We client ack after sending
+
+            //Note we could actually use Session.DUPS_OK_ACKNOWLEDGE here
+            //For a slightly less strong delivery guarantee
+
+            if (trace) { log.trace("Client acking source session"); }
+
+            messages.getLast().acknowledge();
+
+            if (trace) { log.trace("Client acked source session"); }
+         }
+
+         //Clear the messages
+         messages.clear();
+      }
+      catch (Exception e)
+      {
+         log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
+
+         handleFailureOnSend();
+      }
+   }
+   
+   /**
+    * Sends the current batch, then delists the resources and commits the JTA transaction
+    * held in tx. A new transaction is started and enlisted right away for the next batch.
+    * Any exception is logged and handed to handleFailureOnSend().
+    */
+   private void sendBatchXA()
+   {
+      try
+      {
+         sendMessages();
+
+         //Finish the current JTA transaction...
+         delistResources(tx);
+
+         if (trace) { log.trace("Committing JTA transaction"); }
+         tx.commit();
+         if (trace) { log.trace("Committed JTA transaction"); }
+
+         //...and open the one that the next batch will use
+         tx = startTx();
+         enlistResources(tx);
+
+         messages.clear();
+      }
+      catch (Exception failure)
+      {
+         log.warn("Failed to send + acknowledge batch, closing JMS objects", failure);
+         handleFailureOnSend();
+      }
+   }
+   
+   /**
+    * Sends the current batch and then commits sourceSession; failures go to handleFailureOnSend().
+    */
+   private void sendBatchLocalTx()
+   {
+      try
+      {
+         sendMessages();
+
+         if (trace) { log.trace("Committing source session"); }
+         sourceSession.commit();
+         if (trace) { log.trace("Committed source session"); }
+
+         //The batch is done, forget its messages
+         messages.clear();
+      }
+      catch (Exception failure)
+      {
+         log.warn("Failed to send + acknowledge batch, closing JMS objects", failure);
+         handleFailureOnSend();
+      }
+   }
+   
+   /**
+    * Sends every buffered message to targetDestination through targetProducer, keeping the
+    * delivery mode and priority reported by the message itself. When addMessageIDInHeader
+    * is set, addMessageIDInHeader(msg) is called on each message before it is sent.
+    *
+    * @throws Exception if a message could not be read, updated or sent
+    */
+   private void sendMessages() throws Exception
+   {
+      for (Iterator iter = messages.iterator(); iter.hasNext();)
+      {
+         Message msg = (Message)iter.next();
+
+         if (addMessageIDInHeader)
+         {
+            addMessageIDInHeader(msg);
+         }
+
+         if (trace) { log.trace("Sending message " + msg); }
+
+         //Make sure the correct time to live gets propagated: the expiration is compared with
+         //the current time to get the remaining duration, and an expiration of 0 stays 0
+         long expiration = msg.getJMSExpiration();
+         long timeToLive = 0;
+
+         if (expiration != 0)
+         {
+            //Should have already expired? Then use 1 so it expires when it is consumed or delivered
+            timeToLive = Math.max(1, expiration - System.currentTimeMillis());
+         }
+
+         targetProducer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
+
+         if (trace) { log.trace("Sent message " + msg); }
+      }
+   }
+   
+   private void handleFailureOnSend()
+   {
+      handleFailure(new FailureHandler()); // FailureHandler retries the set up and stops the bridge if that fails
+   }
+   
+   private void handleFailureOnStartup()
+   {
+      handleFailure(new StartupFailureHandler()); // on failure this handler only logs that the bridge will not be started
+   }
+   
+   /**
+    * Marks the bridge as failed and runs failureHandler on a new thread. The calling thread
+    * (onMessage or start) cannot do the work itself: closing the connection from inside
+    * onMessage would block waiting for onMessage to complete, and start should return
+    * before the connections are reestablished so that its caller is not blocked.
+    * @param failureHandler the recovery work to run
+    */
+   private void handleFailure(Runnable failureHandler)
+   {
+      failed = true;
+      new Thread(failureHandler).start();
+   }
+   
+   /**
+    * Appends the JMS message id of msg to the comma separated list kept in the
+    * JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST property of msg.
+    * <p>
+    * The target can use the recorded id as the JMSCorrelationID of a response message, which
+    * enables a distributed request/response pattern. Every bridge in a chain can add its id,
+    * so the pattern also works when the message was routed over multiple hops.
+    *
+    * @param msg the message whose id list is extended
+    * @throws Exception if the message properties cannot be read or written
+    */
+   private void addMessageIDInHeader(Message msg) throws Exception
+   {
+      if (trace) { log.trace("Adding old message id in Message header"); }
+
+      //Clears and re-adds the existing properties so that a property can be set afterwards
+      copyProperties(msg);
+
+      String idList = msg.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+
+      if (idList == null)
+      {
+         idList = msg.getJMSMessageID();
+      }
+      else
+      {
+         idList = idList + "," + msg.getJMSMessageID();
+      }
+
+      msg.setStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST, idList);
+   }
+
+   
+   /**
+    * Clears the properties of msg and sets all of them again with their previous values.
+    * <p>
+    * JMS does not let you add a property on a received message without first calling
+    * clearProperties, so every property is saved, the properties are cleared and the saved
+    * values are then set again so that we don't lose them.
+    *
+    * @param msg the message whose properties are cleared and restored
+    * @throws JMSException if a property cannot be read, cleared or written
+    */
+   private static void copyProperties(Message msg) throws JMSException
+   {
+      //Remember every property together with its current value
+      Map<String, Object> saved = new HashMap<String, Object>();
+
+      Enumeration names = msg.getPropertyNames();
+
+      while (names.hasMoreElements())
+      {
+         String name = (String)names.nextElement();
+         Object value = msg.getObjectProperty(name);
+
+         saved.put(name, value);
+      }
+
+      //Required by JMS before a property may be set on a received message
+      msg.clearProperties();
+
+      //Put the remembered values back; nothing happens here when the message
+      //had no properties at all
+      for (Map.Entry<String, Object> property : saved.entrySet())
+      {
+         String name = property.getKey();
+         Object value = property.getValue();
+
+         msg.setObjectProperty(name, value);
+      }
+   }
+   
+   
+   // Inner classes ---------------------------------------------------------------
+   
+   /**
+    * Recovery task that handleFailure() runs on its own thread: it drops the buffered
+    * messages, calls cleanup() and then tries to set up the JMS objects again.
+    */
+   private class FailureHandler implements Runnable
+   {
+      /**
+       * Start the source connection - note the source connection must not be started before
+       * otherwise messages will be received and ignored
+       */
+      protected void startSourceConnection()
+      {
+         try
+         {
+            sourceConn.start();
+         }
+         catch (JMSException e)
+         {
+            log.error("Failed to start source connection", e);
+         }
+      }
+
+      /** Called after a successful set up: clears failed and starts the source connection. */
+      protected void succeeded()
+      {
+         log.info("Succeeded in reconnecting to servers");
+
+         synchronized (lock)
+         {
+            failed = false;
+            startSourceConnection();
+         }
+      }
+
+      /** Called when the JMS objects could not be set up again: stops the bridge. */
+      protected void failed()
+      {
+         //We haven't managed to recreate connections or maxRetries = 0
+         log.warn("Unable to set up connections, bridge will be stopped");
+
+         try
+         {
+            stop();
+         }
+         catch (Exception e)
+         {
+            //Nothing more can be done from this thread, but do not hide the problem
+            log.warn("Failed to stop the bridge", e);
+         }
+      }
+
+      /** Drops the current batch, cleans up and, when maxRetries allows it, tries to reconnect. */
+      public void run()
+      {
+         if (trace) { log.trace("Failure handler running"); }
+
+         // Clear the messages
+         messages.clear();
+         cleanup();
+
+         if (maxRetries > 0 || maxRetries == -1)
+         {
+            log.warn("Will retry after a pause of " + failureRetryInterval + " ms");
+            pause(failureRetryInterval);
+            //Now we try
+            if (setupJMSObjectsWithRetry())
+            {
+               succeeded();
+               return;
+            }
+         }
+
+         failed();
+      }
+   }
+   
+   /**
+    * FailureHandler variant used by handleFailureOnStartup(). The inherited run() is
+    * reused; only the two outcome callbacks differ: a failure just logs that the bridge
+    * will not be started, and success also sets the started flag.
+    */
+   private class StartupFailureHandler extends FailureHandler
+   {
+      /** Logs the failure; stop() is not called because the bridge was never started. */
+      protected void failed()
+      {
+         // Don't call super
+         log.warn("Unable to set up connections, bridge will not be started");
+      }
+
+      /** Marks the bridge as started and not failed, then starts the source connection. */
+      protected void succeeded()
+      {
+         // Don't call super - a bit ugly in this case but better than taking the lock twice.
+         log.info("Succeeded in connecting to servers");
+
+         synchronized (lock)
+         {
+            failed = false;
+            started = true;
+
+            //The inherited helper starts the source connection and logs a JMSException.
+            //Note the source connection must not be started before otherwise messages
+            //will be received and ignored
+            startSourceConnection();
+         }
+      }
+   }
+   
+   /**
+    * Background task that sends the pending batch once batchExpiryTime has passed.
+    * It holds lock while running and waits on it until the batch is due; the loop ends
+    * when started becomes false.
+    */
+   private class BatchTimeChecker implements Runnable
+   {
+      public void run()
+      {
+         if (trace) { log.trace(this + " running"); }
+
+         synchronized (lock)
+         {
+            while (started)
+            {
+               long toWait = batchExpiryTime - System.currentTimeMillis();
+
+               if (toWait > 0)
+               {
+                  //Not due yet: wait() gives up the lock until notified or timed out
+                  try
+                  {
+                     if (trace) { log.trace(this + " waiting for " + toWait); }
+                     lock.wait(toWait);
+                     if (trace) { log.trace(this + " woke up"); }
+                  }
+                  catch (InterruptedException e)
+                  {
+                     //Ignore
+                     if (trace) { log.trace(this + " thread was interrupted"); }
+                  }
+                  continue;
+               }
+
+               if (trace) { log.trace(this + " waited enough"); }
+
+               //Still inside synchronized (lock), so the batch can be sent directly
+               if (!failed && !messages.isEmpty())
+               {
+                  if (trace) { log.trace(this + " got some messages so sending batch"); }
+
+                  sendBatch();
+
+                  if (trace) { log.trace(this + " sent batch"); }
+               }
+
+               batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+            }
+         }
+      }
+   }
+   
+   /**
+    * Message listener that buffers each received message, moves batchExpiryTime to
+    * maxBatchTime from now and sends the batch as soon as maxBatchSize is reached
+    * (a maxBatchSize of -1 never triggers). Messages arriving while failed is set are ignored.
+    */
+   private class SourceListener implements MessageListener
+   {
+      public void onMessage(Message msg)
+      {
+         synchronized (lock)
+         {
+            if (!failed)
+            {
+               if (trace) { log.trace(this + " received message " + msg); }
+               messages.add(msg);
+               batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+               if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
+
+               if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
+               {
+                  if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
+                  sendBatch();
+                  if (trace) { log.trace(this + " sent batch"); }
+               }
+            }
+            else
+            {
+               //Ignore the message
+               if (trace) { log.trace("Bridge has failed so ignoring message"); }
+            }
+         }
+      }
+   }
+   
+   /** Starts a FailureHandler on a reported JMSException unless failed is already set. */
+   private class BridgeExceptionListener implements ExceptionListener
+   {
+      public void onException(JMSException e)
+      {
+         log.warn("Detected failure on connection", e);
+
+         synchronized (lock)
+         {
+            if (!failed)
+            {
+               handleFailure(new FailureHandler());
+            }
+            else if (trace)
+            {
+               log.trace("Failure recovery already in progress");
+            }
+         }
+      }
+   }
+}

Copied: trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIConnectionFactoryFactory.java (from rev 4566, trunk/src/main/org/jboss/messaging/jms/bridge/JNDIConnectionFactoryFactory.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIConnectionFactoryFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIConnectionFactoryFactory.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -0,0 +1,52 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.jms.bridge.impl;
+
+import java.util.Hashtable;
+
+import javax.jms.ConnectionFactory;
+
+import org.jboss.messaging.jms.bridge.ConnectionFactoryFactory;
+
+/**
+ * A JNDIConnectionFactoryFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision:4566 $</tt>
+ *
+ * $Id:JNDIConnectionFactoryFactory.java 4566 2008-06-24 08:01:35Z jmesnil $
+ *
+ */
+public class JNDIConnectionFactoryFactory extends JNDIFactorySupport implements ConnectionFactoryFactory
+{
+	public JNDIConnectionFactoryFactory(Hashtable jndiProperties, String lookup)
+   {
+      super(jndiProperties, lookup);      
+   }
+   // Returns the result of the inherited createObject(), cast to ConnectionFactory
+   public ConnectionFactory createConnectionFactory() throws Exception
+   {
+   	return (ConnectionFactory)createObject();   	
+   }
+
+}

Copied: trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIDestinationFactory.java (from rev 4566, trunk/src/main/org/jboss/messaging/jms/bridge/JNDIDestinationFactory.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIDestinationFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIDestinationFactory.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.jms.bridge.impl;
+
+import java.util.Hashtable;
+
+import javax.jms.Destination;
+
+import org.jboss.messaging.jms.bridge.DestinationFactory;
+
+/**
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>10 Oct 2007
+ *
+ * $Id: $
+ *
+ */
+public class JNDIDestinationFactory extends JNDIFactorySupport implements DestinationFactory
+{
+   public JNDIDestinationFactory(Hashtable jndiProperties, String lookup)
+   {
+      super(jndiProperties, lookup);      
+   }
+   // Returns the result of the inherited createObject(), cast to Destination
+   public Destination createDestination() throws Exception
+   {
+   	return (Destination)createObject();   	
+   }
+}

Copied: trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIFactorySupport.java (from rev 4566, trunk/src/main/org/jboss/messaging/jms/bridge/JNDIFactorySupport.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIFactorySupport.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/JNDIFactorySupport.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */ 
+
+package org.jboss.messaging.jms.bridge.impl;
+
+import java.util.Hashtable;
+
+import javax.naming.InitialContext;
+
+/**
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: $</tt>10 Oct 2007
+ *
+ * $Id: $
+ *
+ */
+public abstract class JNDIFactorySupport
+{
+   /** Environment handed to InitialContext; null selects the no-argument constructor. */
+   protected Hashtable jndiProperties;
+
+   /** Name that is passed to InitialContext.lookup(). */
+   protected String lookup;
+
+   protected JNDIFactorySupport(Hashtable jndiProperties, String lookup)
+   {
+      this.jndiProperties = jndiProperties;
+
+      this.lookup = lookup;
+   }
+
+   /**
+    * Looks up the object bound under lookup in a new InitialContext. The context is
+    * created from jndiProperties, or with the no-argument constructor when jndiProperties
+    * is null, and it is closed again before this method returns or throws.
+    *
+    * @return the object that the lookup returned
+    * @throws Exception if the context cannot be created or closed, or if the lookup fails
+    */
+   protected Object createObject() throws Exception
+   {
+      InitialContext context = null;
+
+      try
+      {
+         context = jndiProperties == null ? new InitialContext() : new InitialContext(jndiProperties);
+
+         return context.lookup(lookup);
+      }
+      finally
+      {
+         if (context != null)
+         {
+            context.close();
+         }
+      }
+   }
+}

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java	2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanTest.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -37,6 +37,7 @@
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
 import org.jboss.test.messaging.tools.ServerManagement;
 
 /**
@@ -103,7 +104,7 @@
 	      on = deployBridge(0, "Bridge1", sourceProviderLoader, targetProviderLoader,
 	                                   "/queue/sourceQueue", "/queue/targetQueue",
 	                                   null, null, null, null,
-	                                   Bridge.QOS_AT_MOST_ONCE, null, 1,
+	                                   QualityOfServiceMode.AT_MOST_ONCE, null, 1,
 	                                   -1, null, null, 5000, -1, false);
 	      log.info("Deployed bridge");
 	      
@@ -267,7 +268,7 @@
          on = deployBridge(0, "Bridge2", sourceProviderLoader, targetProviderLoader,
                            "/queue/sourceQueue", "/queue/targetQueue",
                            null, null, null, null,
-                           Bridge.QOS_ONCE_AND_ONLY_ONCE, null, 1,
+                           QualityOfServiceMode.ONCE_AND_ONLY_ONCE, null, 1,
                            -1, null, null, 5000, -1, false);
          
          log.trace("Constructed bridge");
@@ -351,11 +352,11 @@
          
          {
             Integer qos = (Integer)ServerManagement.getAttribute(on, "QualityOfServiceMode");
-            assertEquals(Bridge.QOS_ONCE_AND_ONLY_ONCE, qos.intValue());
-            ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(Bridge.QOS_AT_MOST_ONCE));
+            assertEquals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE.intValue(), qos.intValue());
+            ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(QualityOfServiceMode.AT_MOST_ONCE.intValue()));
             qos = (Integer)ServerManagement.getAttribute(on, "QualityOfServiceMode");
-            assertEquals(new Integer(Bridge.QOS_AT_MOST_ONCE), qos);
-            ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(Bridge.QOS_ONCE_AND_ONLY_ONCE));
+            assertEquals(new Integer(QualityOfServiceMode.AT_MOST_ONCE.intValue()), qos);
+            ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(QualityOfServiceMode.ONCE_AND_ONLY_ONCE.intValue()));
          }
          
          {
@@ -492,10 +493,10 @@
          
          {
             Integer qos = (Integer)ServerManagement.getAttribute(on, "QualityOfServiceMode");
-            assertEquals(Bridge.QOS_ONCE_AND_ONLY_ONCE, qos.intValue());
-            ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(Bridge.QOS_AT_MOST_ONCE));
+            assertEquals(QualityOfServiceMode.ONCE_AND_ONLY_ONCE.intValue(), qos.intValue());
+            ServerManagement.setAttribute(on, "QualityOfServiceMode", String.valueOf(QualityOfServiceMode.AT_MOST_ONCE.intValue()));
             qos = (Integer)ServerManagement.getAttribute(on, "QualityOfServiceMode");
-            assertEquals(new Integer(Bridge.QOS_ONCE_AND_ONLY_ONCE), qos);
+            assertEquals(new Integer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE.intValue()), qos);
          }
          
          {
@@ -593,7 +594,7 @@
             String sourceDestLookup, String targetDestLookup,
             String sourceUsername, String sourcePassword,
             String targetUsername, String targetPassword,
-            int qos, String selector, int maxBatchSize,
+            QualityOfServiceMode qos, String selector, int maxBatchSize,
             long maxBatchTime, String subName, String clientID,
             long failureRetryInterval, int maxRetries, boolean addMessageIDInHeader) throws Exception
    {
@@ -621,7 +622,7 @@
       {
          config += "<attribute name=\"TargetPassword\">" + targetPassword + "</attribute>";
       }
-      config += "<attribute name=\"QualityOfServiceMode\">" + qos +"</attribute>";
+      config += "<attribute name=\"QualityOfServiceMode\">" + qos.intValue() +"</attribute>";
       if (selector != null)
       {
          config += "<attribute name=\"Selector\">" + selector + "</attribute>";

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -37,6 +37,8 @@
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
+import org.jboss.messaging.jms.bridge.impl.BridgeImpl;
 import org.jboss.messaging.jms.client.JBossMessage;
 import org.jboss.tm.TransactionManagerLocator;
 
@@ -62,32 +64,32 @@
    
    public void testNoMaxBatchTime_AtMostOnce_P() throws Exception
    {
-      testNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, true);
+      testNoMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, true);
    }
    
    public void testNoMaxBatchTime_DuplicatesOk_P() throws Exception
    {
-      testNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK, true);
+      testNoMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, true);
    }
    
    public void testNoMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
    {
-      testNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+      testNoMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
    }
    
    public void testNoMaxBatchTime_AtMostOnce_NP() throws Exception
    {
-      testNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, false);
+      testNoMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, false);
    }
    
    public void testNoMaxBatchTime_DuplicatesOk_NP() throws Exception
    {
-      testNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK, false);
+      testNoMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, false);
    }
    
    public void testNoMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
    {
-      testNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+      testNoMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
    }
    
    //Same server
@@ -96,32 +98,32 @@
    
    public void testNoMaxBatchTimeSameServer_AtMostOnce_P() throws Exception
    {
-      testNoMaxBatchTimeSameServer(Bridge.QOS_AT_MOST_ONCE, true);
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, true);
    }
    
    public void testNoMaxBatchTimeSameServer_DuplicatesOk_P() throws Exception
    {
-      testNoMaxBatchTimeSameServer(Bridge.QOS_DUPLICATES_OK, true);
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, true);
    }
    
    public void testNoMaxBatchTimeSameServer_OnceAndOnlyOnce_P() throws Exception
    {
-      testNoMaxBatchTimeSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
    }
    
    public void testNoMaxBatchTimeSameServer_AtMostOnce_NP() throws Exception
    {
-      testNoMaxBatchTimeSameServer(Bridge.QOS_AT_MOST_ONCE, false);
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, false);
    }
    
    public void testNoMaxBatchTimeSameServer_DuplicatesOk_NP() throws Exception
    {
-      testNoMaxBatchTimeSameServer(Bridge.QOS_DUPLICATES_OK, false);
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, false);
    }
    
    public void testNoMaxBatchTimeSameServer_OnceAndOnlyOnce_NP() throws Exception
    {
-      testNoMaxBatchTimeSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+      testNoMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
    }
    
    
@@ -129,32 +131,32 @@
    
    public void testMaxBatchTime_AtMostOnce_P() throws Exception
    {
-      this.testMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, true);
+      this.testMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, true);
    }
    
    public void testMaxBatchTime_DuplicatesOk_P() throws Exception
    {
-      this.testMaxBatchTime(Bridge.QOS_DUPLICATES_OK, true);
+      this.testMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, true);
    }
    
    public void testMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
    {
-      testMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+      testMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
    }
    
    public void testMaxBatchTime_AtMostOnce_NP() throws Exception
    {
-      this.testMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, false);
+      this.testMaxBatchTime(QualityOfServiceMode.AT_MOST_ONCE, false);
    }
    
    public void testMaxBatchTime_DuplicatesOk_NP() throws Exception
    {
-      this.testMaxBatchTime(Bridge.QOS_DUPLICATES_OK, false);
+      this.testMaxBatchTime(QualityOfServiceMode.DUPLICATES_OK, false);
    }
    
    public void testMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
    {
-      testMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+      testMaxBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
    }
     
    // Same server
@@ -163,108 +165,108 @@
    
    public void testMaxBatchTimeSameServer_AtMostOnce_P() throws Exception
    {
-      this.testMaxBatchTimeSameServer(Bridge.QOS_AT_MOST_ONCE, true);
+      this.testMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, true);
    }
    
    public void testMaxBatchTimeSameServer_DuplicatesOk_P() throws Exception
    {
-      this.testMaxBatchTimeSameServer(Bridge.QOS_DUPLICATES_OK, true);
+      this.testMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, true);
    }
    
    public void testMaxBatchTimeSameServer_OnceAndOnlyOnce_P() throws Exception
    {
-      testMaxBatchTimeSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+      testMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
    }
    
    public void testMaxBatchTimeSameServer_AtMostOnce_NP() throws Exception
    {
-      this.testMaxBatchTimeSameServer(Bridge.QOS_AT_MOST_ONCE, false);
+      this.testMaxBatchTimeSameServer(QualityOfServiceMode.AT_MOST_ONCE, false);
    }
    
    public void testMaxBatchTimeSameServer_DuplicatesOk_NP() throws Exception
    {
-      this.testMaxBatchTimeSameServer(Bridge.QOS_DUPLICATES_OK, false);
+      this.testMaxBatchTimeSameServer(QualityOfServiceMode.DUPLICATES_OK, false);
    }
    
    public void testMaxBatchTimeSameServer_OnceAndOnlyOnce_NP() throws Exception
    {
-      testMaxBatchTimeSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+      testMaxBatchTimeSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
    }
    
    // Stress with batch size of 50
    
    public void testStress_AtMostOnce_P_50() throws Exception
    {
-      testStress(Bridge.QOS_AT_MOST_ONCE, true, 50);
+      testStress(QualityOfServiceMode.AT_MOST_ONCE, true, 50);
    }
    
    public void testStress_DuplicatesOk_P_50() throws Exception
    {
-      testStress(Bridge.QOS_DUPLICATES_OK, true, 50);
+      testStress(QualityOfServiceMode.DUPLICATES_OK, true, 50);
    }
    
    public void testStress_OnceAndOnlyOnce_P_50() throws Exception
    {
-      testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 50);
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 50);
    }
    
    public void testStress_AtMostOnce_NP_50() throws Exception
    {
-      testStress(Bridge.QOS_AT_MOST_ONCE, false, 50);
+      testStress(QualityOfServiceMode.AT_MOST_ONCE, false, 50);
    }
    
    public void testStress_DuplicatesOk_NP_50() throws Exception
    {
-      testStress(Bridge.QOS_DUPLICATES_OK, false, 50);
+      testStress(QualityOfServiceMode.DUPLICATES_OK, false, 50);
    }
    
    public void testStress_OnceAndOnlyOnce_NP_50() throws Exception
    {
-      testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 50);
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 50);
    }
    
    // Stress with batch size of 1
    
    public void testStress_AtMostOnce_P_1() throws Exception
    {
-      testStress(Bridge.QOS_AT_MOST_ONCE, true, 1);
+      testStress(QualityOfServiceMode.AT_MOST_ONCE, true, 1);
    }
    
    public void testStress_DuplicatesOk_P_1() throws Exception
    {
-      testStress(Bridge.QOS_DUPLICATES_OK, true, 1);
+      testStress(QualityOfServiceMode.DUPLICATES_OK, true, 1);
    }
    
    public void testStress_OnceAndOnlyOnce_P_1() throws Exception
    {
-      testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 1);
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 1);
    }
    
    public void testStress_AtMostOnce_NP_1() throws Exception
    {
-      testStress(Bridge.QOS_AT_MOST_ONCE, false, 1);
+      testStress(QualityOfServiceMode.AT_MOST_ONCE, false, 1);
    }
    
    public void testStress_DuplicatesOk_NP_1() throws Exception
    {
-      testStress(Bridge.QOS_DUPLICATES_OK, false, 1);
+      testStress(QualityOfServiceMode.DUPLICATES_OK, false, 1);
    }
    
    public void testStress_OnceAndOnlyOnce_NP_1() throws Exception
    {
-      testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 1);
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 1);
    }
    
    // Max batch time
    
    public void testStressMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
    {
-   	this.testStressBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 200);
+   	this.testStressBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 200);
    }
    
    public void testStressMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
    {
-   	this.testStressBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 200);
+   	this.testStressBatchTime(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 200);
    }
    
    
@@ -274,73 +276,73 @@
    
    public void testStressSameServer_AtMostOnce_P_50() throws Exception
    {
-      testStressSameServer(Bridge.QOS_AT_MOST_ONCE, true, 50);
+      testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, true, 50);
    }
    
    public void testStressSameServer_DuplicatesOk_P_50() throws Exception
    {
-      testStressSameServer(Bridge.QOS_DUPLICATES_OK, true, 50);
+      testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, true, 50);
    }
    
    public void testStressSameServer_OnceAndOnlyOnce_P_50() throws Exception
    {
-      testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 50);
+      testStress(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 50);
    }
    
    public void testStressSameServer_AtMostOnce_NP_50() throws Exception
    {
-      testStressSameServer(Bridge.QOS_AT_MOST_ONCE, false, 50);
+      testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, false, 50);
    }
    
    public void testStressSameServer_DuplicatesOk_NP_50() throws Exception
    {
-      testStressSameServer(Bridge.QOS_DUPLICATES_OK, false, 50);
+      testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, false, 50);
    }
    
    public void testStressSameServer_OnceAndOnlyOnce_NP_50() throws Exception
    {
-      testStressSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 50);
+      testStressSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 50);
    }
    
    // Stress with batch size of 1
    
    public void testStressSameServer_AtMostOnce_P_1() throws Exception
    {
-      testStressSameServer(Bridge.QOS_AT_MOST_ONCE, true, 1);
+      testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, true, 1);
    }
    
    public void testStressSameServer_DuplicatesOk_P_1() throws Exception
    {
-      testStressSameServer(Bridge.QOS_DUPLICATES_OK, true, 1);
+      testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, true, 1);
    }
    
    public void testStressSameServer_OnceAndOnlyOnce_P_1() throws Exception
    {
-      testStressSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, true, 1);
+      testStressSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, 1);
    }
    
    public void testStressSameServer_AtMostOnce_NP_1() throws Exception
    {
-      testStressSameServer(Bridge.QOS_AT_MOST_ONCE, false, 1);
+      testStressSameServer(QualityOfServiceMode.AT_MOST_ONCE, false, 1);
    }
    
    public void testStressSameServer_DuplicatesOk_NP_1() throws Exception
    {
-      testStressSameServer(Bridge.QOS_DUPLICATES_OK, false, 1);
+      testStressSameServer(QualityOfServiceMode.DUPLICATES_OK, false, 1);
    }
    
    public void testStressSameServer_OnceAndOnlyOnce_NP_1() throws Exception
    {
-      testStressSameServer(Bridge.QOS_ONCE_AND_ONLY_ONCE, false, 1);
+      testStressSameServer(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false, 1);
    }
    
    public void testParams() throws Exception
    {
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
       
       try
       {               
-         int qosMode = Bridge.QOS_AT_MOST_ONCE;
+         QualityOfServiceMode qosMode = QualityOfServiceMode.AT_MOST_ONCE;
          
          int batchSize = 10;
          
@@ -366,7 +368,7 @@
          
          try
          {
-            bridge= new Bridge(null, cff1, sourceQueueFactory, targetQueueFactory,
+            bridge= new BridgeImpl(null, cff1, sourceQueueFactory, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -379,7 +381,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, null, sourceQueueFactory, targetQueueFactory,
+            bridge= new BridgeImpl(cff0, null, sourceQueueFactory, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -392,7 +394,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, null, targetQueueFactory,
+            bridge= new BridgeImpl(cff0, cff1, null, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -405,7 +407,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
+            bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, null,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, failureRetryInterval, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -418,7 +420,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+            bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, -2, maxRetries, qosMode,
                                batchSize, maxBatchTime,
@@ -431,7 +433,7 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+            bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                                sourceUsername, sourcePassword, destUsername, destPassword,
                                selector, -1, 10, qosMode,
                                batchSize, maxBatchTime,
@@ -444,35 +446,9 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
+            bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, null,
                                sourceUsername, sourcePassword, destUsername, destPassword,
-                               selector, failureRetryInterval, maxRetries, -2,
-                               batchSize, maxBatchTime,
-                               subName, clientID, false);
-         }
-         catch (IllegalArgumentException e)
-         {
-            //Ok
-         }
-         
-         try
-         {
-            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
-                               sourceUsername, sourcePassword, destUsername, destPassword,
-                               selector, failureRetryInterval, maxRetries, 3,
-                               batchSize, maxBatchTime,
-                               subName, clientID, false);
-         }
-         catch (IllegalArgumentException e)
-         {
-            //Ok
-         }
-         
-         try
-         {
-            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
-                               sourceUsername, sourcePassword, destUsername, destPassword,
-                               selector, failureRetryInterval, maxRetries, 3,
+                               selector, failureRetryInterval, maxRetries, qosMode,
                                0, maxBatchTime,
                                subName, clientID, false);
          }
@@ -483,9 +459,9 @@
          
          try
          {
-            bridge= new Bridge(cff0, cff1, sourceQueueFactory, null,
+            bridge= new BridgeImpl(cff0, cff1, sourceQueueFactory, null,
                                sourceUsername, sourcePassword, destUsername, destPassword,
-                               selector, failureRetryInterval, maxRetries, 3,
+                               selector, failureRetryInterval, maxRetries, qosMode,
                                batchSize, -2,
                                subName, clientID, false);
          }
@@ -505,7 +481,7 @@
    
    public void testSelector() throws Exception
    {      
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
       
       Connection connSource = null;
       
@@ -517,9 +493,9 @@
          
          String selector = "vegetable='radish'";
          
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
-                  selector, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+                  selector, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
                   1, -1,
                   null, null, false);
          
@@ -592,7 +568,7 @@
    
    public void testStartBridgeWithJTATransactionAlreadyRunning() throws Exception
    {  
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
       
       Transaction toResume = null;
       
@@ -611,9 +587,9 @@
            
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceTopicFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory,
                   null, null, null, null,
-                  null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+                  null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
                   1, -1,
                   null, null, false);
          
@@ -657,15 +633,15 @@
    
    public void testNonDurableSubscriber() throws Exception
    { 
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
             
       try
       {   
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceTopicFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory,
                   null, null, null, null,
-                  null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+                  null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
                   1, -1,
                   null, null, false);
          
@@ -686,15 +662,15 @@
    
    public void testDurableSubscriber() throws Exception
    {
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
             
       try
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceTopicFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory,
                   null, null, null, null,
-                  null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+                  null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
                   1, -1,
                   "subTest", "clientid123", false);
          
@@ -732,7 +708,7 @@
    
    private void messageIDInHeader(boolean on) throws Exception
    { 
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
       
       Connection connSource = null;
       
@@ -742,9 +718,9 @@
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
-                  null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+                  null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
                   1, -1,
                   null, null, on);
          
@@ -903,7 +879,7 @@
    
    private void propertiesPreserved(boolean persistent, boolean messageIDInHeader) throws Exception
    { 
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
       
       Connection connSource = null;
       
@@ -913,9 +889,9 @@
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
-                  null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+                  null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
                   1, -1,
                   null, null, messageIDInHeader);
          
@@ -1064,7 +1040,7 @@
    
    public void testNoMessageIDInHeader() throws Exception
    { 
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
       
       Connection connSource = null;
       
@@ -1074,9 +1050,9 @@
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
-                  null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+                  null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE,
                   1, -1,
                   null, null, false);
          
@@ -1151,17 +1127,17 @@
    
    // Private -------------------------------------------------------------------------------
              
-   private void testStress(int qosMode, boolean persistent, int batchSize) throws Exception
+   private void testStress(QualityOfServiceMode qosMode, boolean persistent, int batchSize) throws Exception
    {
       Connection connSource = null;
       
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
       
       Thread t = null;
             
       try
       {      
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   batchSize, -1,
@@ -1224,17 +1200,17 @@
       }      
    }
    
-   private void testStressBatchTime(int qosMode, boolean persistent, int maxBatchTime) throws Exception
+   private void testStressBatchTime(QualityOfServiceMode qosMode, boolean persistent, int maxBatchTime) throws Exception
    {
       Connection connSource = null;
       
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
       
       Thread t = null;
             
       try
       {      
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   2, maxBatchTime,
@@ -1298,17 +1274,17 @@
    }
    
    //Both source and destination on same rm
-   private void testStressSameServer(int qosMode, boolean persistent, int batchSize) throws Exception
+   private void testStressSameServer(QualityOfServiceMode qosMode, boolean persistent, int batchSize) throws Exception
    {
       Connection connSource = null;
       
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
       
       Thread t = null;
             
       try
       {  
-         bridge = new Bridge(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   batchSize, -1,
@@ -1372,15 +1348,15 @@
    }
    
       
-   private void testNoMaxBatchTime(int qosMode, boolean persistent) throws Exception
+   private void testNoMaxBatchTime(QualityOfServiceMode qosMode, boolean persistent) throws Exception
    {
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
             
       try
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   NUM_MESSAGES, -1,
@@ -1430,15 +1406,15 @@
       }                  
    }
    
-   private void testNoMaxBatchTimeSameServer(int qosMode, boolean persistent) throws Exception
+   private void testNoMaxBatchTimeSameServer(QualityOfServiceMode qosMode, boolean persistent) throws Exception
    {
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
             
       try
       {
          final int NUM_MESSAGES = 10;
          
-         bridge = new Bridge(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
                   null, null, null, null,
                   null, 5000, 10, qosMode,
                   NUM_MESSAGES, -1,
@@ -1488,9 +1464,9 @@
       }                  
    }
    
-   private void testMaxBatchTime(int qosMode, boolean persistent) throws Exception
+   private void testMaxBatchTime(QualityOfServiceMode qosMode, boolean persistent) throws Exception
    {
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
             
       try
       {
@@ -1498,7 +1474,7 @@
          
          final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
          
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 3000, 10, qosMode,
                   MAX_BATCH_SIZE, MAX_BATCH_TIME,
@@ -1529,9 +1505,9 @@
       }                  
    }
    
-   private void testMaxBatchTimeSameServer(int qosMode, boolean persistent) throws Exception
+   private void testMaxBatchTimeSameServer(QualityOfServiceMode qosMode, boolean persistent) throws Exception
    {
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
             
       try
       {
@@ -1539,7 +1515,7 @@
          
          final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
          
-         bridge = new Bridge(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff0, sourceQueueFactory, localTargetQueueFactory,
                   null, null, null, null,
                   null, 3000, 10, qosMode,
                   MAX_BATCH_SIZE, MAX_BATCH_TIME,

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java	2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/BridgeTestBase.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -38,11 +38,12 @@
 import javax.naming.InitialContext;
 
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.jms.bridge.Bridge;
 import org.jboss.messaging.jms.bridge.ConnectionFactoryFactory;
 import org.jboss.messaging.jms.bridge.DestinationFactory;
-import org.jboss.messaging.jms.bridge.JNDIConnectionFactoryFactory;
-import org.jboss.messaging.jms.bridge.JNDIDestinationFactory;
+import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
+import org.jboss.messaging.jms.bridge.impl.JNDIConnectionFactoryFactory;
+import org.jboss.messaging.jms.bridge.impl.JNDIDestinationFactory;
 import org.jboss.test.messaging.JBMServerTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.container.ServiceContainer;
@@ -208,7 +209,7 @@
    }
    
 
-   protected void checkMessagesReceived(ConnectionFactory cf, Destination dest, int qosMode,
+   protected void checkMessagesReceived(ConnectionFactory cf, Destination dest, QualityOfServiceMode qosMode,
    		                               int numMessages, boolean longWaitForFirst) throws Exception
    {
       Connection conn = null;
@@ -248,7 +249,7 @@
             
          }
          
-         if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE || qosMode == Bridge.QOS_DUPLICATES_OK)
+         if (qosMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE || qosMode == QualityOfServiceMode.DUPLICATES_OK)
          {            
             //All the messages should be received
             
@@ -258,12 +259,12 @@
             }
             
             //Should be no more
-            if (qosMode == Bridge.QOS_ONCE_AND_ONLY_ONCE)
+            if (qosMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE)
             {
                assertEquals(numMessages, msgs.size());
             }         
          }
-         else if (qosMode == Bridge.QOS_AT_MOST_ONCE)
+         else if (qosMode == QualityOfServiceMode.AT_MOST_ONCE)
          {
             //No *guarantee* that any messages will be received
             //but you still might get some depending on how/where the crash occurred                 

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java	2008-06-25 12:28:42 UTC (rev 4575)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/bridge/ReconnectTest.java	2008-06-25 13:00:39 UTC (rev 4576)
@@ -23,6 +23,8 @@
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.jms.bridge.Bridge;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
+import org.jboss.messaging.jms.bridge.impl.BridgeImpl;
 import org.jboss.test.messaging.tools.ServerManagement;
 
 /**
@@ -47,36 +49,36 @@
    
    public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P() throws Exception
    {
-      testCrashAndReconnectDestBasic(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+      testCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true);
    }
    
    public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_NP() throws Exception
    {
-      testCrashAndReconnectDestBasic(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+      testCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, false);
    }
 
    // dups ok
 
    public void testCrashAndReconnectDestBasic_DuplicatesOk_P() throws Exception
    {
-      testCrashAndReconnectDestBasic(Bridge.QOS_DUPLICATES_OK, true);
+      testCrashAndReconnectDestBasic(QualityOfServiceMode.DUPLICATES_OK, true);
    }
 
    public void testCrashAndReconnectDestBasic_DuplicatesOk_NP() throws Exception
    {
-      testCrashAndReconnectDestBasic(Bridge.QOS_DUPLICATES_OK, false);
+      testCrashAndReconnectDestBasic(QualityOfServiceMode.DUPLICATES_OK, false);
    }
 
    // At most once
 
    public void testCrashAndReconnectDestBasic_AtMostOnce_P() throws Exception
    {
-      testCrashAndReconnectDestBasic(Bridge.QOS_AT_MOST_ONCE, true);
+      testCrashAndReconnectDestBasic(QualityOfServiceMode.AT_MOST_ONCE, true);
    }
 
    public void testCrashAndReconnectDestBasic_AtMostOnce_NP() throws Exception
    {
-      testCrashAndReconnectDestBasic(Bridge.QOS_AT_MOST_ONCE, false);
+      testCrashAndReconnectDestBasic(QualityOfServiceMode.AT_MOST_ONCE, false);
    }
 
    // Crash tests specific to XA transactions
@@ -97,9 +99,9 @@
    {
       ServerManagement.kill(1);
 
-      Bridge bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+      BridgeImpl bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
             null, null, null, null,
-            null, 1000, -1, Bridge.QOS_DUPLICATES_OK,
+            null, 1000, -1, QualityOfServiceMode.DUPLICATES_OK,
             10, -1,
             null, null, false);
       
@@ -138,13 +140,13 @@
     * Send some more messages
     * Verify all messages are received
     */
-   private void testCrashAndReconnectDestBasic(int qosMode, boolean persistent) throws Exception
+   private void testCrashAndReconnectDestBasic(QualityOfServiceMode qosMode, boolean persistent) throws Exception
    {
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
          
       try
       {   
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
                   null, 1000, -1, qosMode,
                   10, -1,
@@ -220,13 +222,13 @@
     */
    private void testCrashAndReconnectDestCrashBeforePrepare(boolean persistent) throws Exception
    {   
-      Bridge bridge = null;
+      BridgeImpl bridge = null;
             
       try
       {
-         bridge = new Bridge(cff0, cff1, sourceQueueFactory, targetQueueFactory,
+         bridge = new BridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory,
                   null, null, null, null,
-                  null, 1000, -1, Bridge.QOS_ONCE_AND_ONLY_ONCE,
+                  null, 1000, -1, QualityOfServiceMode.ONCE_AND_ONLY_ONCE,
                   10, 5000,
                   null, null, false);
          
@@ -262,7 +264,7 @@
          
          sendMessages(cf0, sourceQueue, NUM_MESSAGES / 2, NUM_MESSAGES / 2, persistent);
                            
-         checkMessagesReceived(cf1, targetQueue, Bridge.QOS_ONCE_AND_ONLY_ONCE, NUM_MESSAGES, false);         
+         checkMessagesReceived(cf1, targetQueue, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, NUM_MESSAGES, false);         
       }
       finally
       {      




More information about the jboss-cvs-commits mailing list