[jboss-cvs] JBoss Messaging SVN: r1957 - in trunk: tests/src/org/jboss/test/messaging/jms/bridge and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 11 07:55:53 EST 2007


Author: timfox
Date: 2007-01-11 07:55:47 -0500 (Thu, 11 Jan 2007)
New Revision: 1957

Modified:
   trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
Log:
More bridge work



Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-01-11 03:56:53 UTC (rev 1956)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-01-11 12:55:47 UTC (rev 1957)
@@ -22,8 +22,8 @@
 package org.jboss.jms.server.bridge;
 
 import java.util.ArrayList;
-import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 
 import javax.jms.Connection;
@@ -33,19 +33,17 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSession;
-import javax.naming.InitialContext;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.plugin.contract.MessagingComponent;
-import org.jboss.tm.TransactionManagerLocator;
 
 /**
  * 
@@ -63,25 +61,73 @@
    
    private static boolean trace;
    
+   //Transaction modes
+   
+//   public static final int TRANSACTIONS_NONE = 0;
+//   
+//   public static final int TRANSACTIONS_LOCAL = 1;
+//   
+//   public static final int TRANSACTIONS_XA = 2;
+   
+   //Quality of service modes
+   
+   public static final int QOS_AT_MOST_ONCE = 0;
+   
+   public static final int QOS_DUPLICATES_OK = 1;
+   
+   public static final int QOS_ONCE_AND_ONLY_ONCE = 2;
+   
+   /*
+    * QOS_AT_MOST_ONCE
+    * 
+    * With this QoS mode messages will reach the destination from the source at most once.
+    * The messages are consumed from the source and acknowledged
+    * before sending to the destination. Therefore there is a possibility that if failure occurs between
+    * removing them from the source and them arriving at the destination they could be lost. Hence delivery
+    * will occur at most once.
+    * This mode is avilable for both persistent and non persistent messages.
+    * 
+    * QOS_DUPLICATES_OK
+    * 
+    * With this QoS mode, the messages are consumed from the source and then acknowledged
+    * after they have been successfully sent to the destination. Therefore there is a possibility that if
+    * failure occurs after sending to the destination but before acknowledging them, they could be sent again
+    * when the system recovers. I.e. the destination might receive duplicates after a failure.
+    * This mode is available for both persistent and non persistent messages.
+    * 
+    * QOS_ONCE_AND_ONLY_ONCE
+    * 
+    * This QoS mode ensures messages will reach the destination from the source once and only once.
+    * (Sometimes this mode is known as "exactly once").
+    * If both the source and the destination are on the same JBoss Messaging server instance then this can 
+    * be achieved by sending and acknowledging the messages in the same local transaction.
+    * If the source and destination are on different servers this is achieved by enlisting the sending and consuming
+    * sessions in a JTA transaction. The JTA transaction is controlled by JBoss Transactions JTA implementation which
+    * is a fully recovering transaction manager, thus providing a very high degree of durability.
+    * If JTA is required then both supplied connection factories need to be XAConnectionFactory implementations.
+    * This mode is only available for persistent messages.
+    * This is likely to be the slowest mode since it requires extra persistence for the transaction logging.
+    * 
+    * Note:
+    * For a specific application it may possible to provide once and only once semantics without using the
+    * QOS_ONCE_AND_ONLY_ONCE QoS level. This can be done by using the QOS_DUPLICATES_OK mode and then checking for
+    * duplicates at the destination and discarding them. Some JMS servers provide automatic duplicate message detection
+    * functionality, or this may be possible to implement on the application level by maintaining a cache of received
+    * message ids on disk and comparing received messages to them. The cache would only be valid
+    * for a certain period of time so this approach is not as watertight as using QOS_ONCE_AND_ONLY_ONCE but
+    * may be a good choice depending on your specific application.
+    * 
+    * 
+    * 
+    */
+   
    static
    {
       log = Logger.getLogger(Bridge.class);
       
       trace = log.isTraceEnabled();
    }
-         
-   private Hashtable sourceJNDIProperties;
-   
-   private Hashtable destJNDIProperties;
-   
-   private String sourceConnectionFactoryLookup;
-   
-   private String destConnectionFactoryLookup;
-   
-   private String sourceDestinationLookup;
-   
-   private String destDestinationLookup;
-   
+
    private String sourceUsername;
    
    private String sourcePassword;
@@ -96,10 +142,8 @@
    
    private long failureRetryInterval;
    
-   private boolean transactional;
+   private int qualityOfServiceMode;
    
-   private boolean XA;
-   
    private int maxBatchSize;
    
    private long maxBatchTime;
@@ -110,19 +154,22 @@
    
    private boolean started;
    
-   private List messages;
+   private LinkedList messages;
    
    private Object lock;
    
-   //Needed since we use the 1.0.2 JMS API so we can work with 1.0.2 providers
-   private boolean sourceIsTopic;
+   private ConnectionFactory cfSource;
    
-   private boolean destIsTopic;
+   private ConnectionFactory cfDest;
    
    private Connection connSource; 
    
    private Connection connDest;
    
+   private Destination destSource;
+   
+   private Destination destDest;
+   
    private Session sessSource;
    
    private Session sessDest;
@@ -139,45 +186,37 @@
    
    private boolean paused;         
    
-   private InitialContext icSource;
+   private Transaction tx;  
    
-   private InitialContext icDest;
+   private boolean manualAck;
    
+   private boolean manualCommit;
          
-   public Bridge(Hashtable sourceJNDIProperties, Hashtable destJNDIProperties,
-                 String sourceConnectionFactoryLookup, String destConnectionFactoryLookup,   
-                 String sourceDestinationLookup, String destDestinationLookup,
+   public Bridge(ConnectionFactory cfSource, ConnectionFactory cfDest,
+                 Destination destSource, Destination destDest,         
                  String sourceUsername, String sourcePassword,
                  String destUsername, String destPassword,
-                 String selector, long failureRetryInterval, boolean transactional,
-                 boolean XA, int maxBatchSize, long maxBatchTime,
-                 String subName, String clientID,
-                 boolean sourceIsTopic, boolean destIsTopic)
+                 String selector, long failureRetryInterval, 
+                 int qosMode,
+                 int maxBatchSize, long maxBatchTime,
+                 String subName, String clientID)
    {
-      if (sourceJNDIProperties == null)
+      if (cfSource == null)
       {
-         throw new IllegalArgumentException("sourceJNDIProperties cannot be null");
+         throw new IllegalArgumentException("cfSource cannot be null");
       }
-      if (destJNDIProperties == null)
+      if (cfDest == null)
       {
-         throw new IllegalArgumentException("destJNDIProperties cannot be null");
+         throw new IllegalArgumentException("cfDest cannot be null");
       }
-      if (sourceConnectionFactoryLookup == null)
+      if (destSource == null)
       {
-         throw new IllegalArgumentException("sourceConnectionFactoryLookup cannot be null");
+         throw new IllegalArgumentException("destSource cannot be null");
       }
-      if (destConnectionFactoryLookup == null)
+      if (destDest == null)
       {
-         throw new IllegalArgumentException("destConnectionFactoryLookup cannot be null");
+         throw new IllegalArgumentException("destDest cannot be null");
       }
-      if (sourceDestinationLookup == null)
-      {
-         throw new IllegalArgumentException("sourceDestinationLookup cannot be null");
-      }
-      if (destDestinationLookup == null)
-      {
-         throw new IllegalArgumentException("destDestinationLookup cannot be null");
-      }
       if (failureRetryInterval < 0 && failureRetryInterval != -1)
       {
          throw new IllegalArgumentException("failureRetryInterval must be > 0 or -1 to represent no retry");
@@ -194,19 +233,19 @@
       {
          throw new IllegalArgumentException("Cannot have unlimited batch size and unlimited batch time");
       }
+      if (qosMode != QOS_AT_MOST_ONCE && qosMode != QOS_DUPLICATES_OK && qosMode != QOS_ONCE_AND_ONLY_ONCE)
+      {
+         throw new IllegalArgumentException("Invalid QoS mode " + qosMode);
+      }
       
-      this.sourceJNDIProperties = sourceJNDIProperties;
+      this.cfSource = cfSource;
       
-      this.destJNDIProperties = destJNDIProperties; 
+      this.cfDest = cfDest;
       
-      this.sourceConnectionFactoryLookup = sourceConnectionFactoryLookup; 
+      this.destSource = destSource;
       
-      this.destConnectionFactoryLookup = destConnectionFactoryLookup;
+      this.destDest = destDest;
       
-      this.sourceDestinationLookup = sourceDestinationLookup;
-      
-      this.destDestinationLookup = destDestinationLookup;
-      
       this.sourceUsername = sourceUsername;
       
       this.sourcePassword = sourcePassword;
@@ -219,24 +258,18 @@
       
       this.failureRetryInterval = failureRetryInterval;
       
-      this.transactional = transactional;
+      this.qualityOfServiceMode = qosMode;
       
-      this.XA = XA;
-      
       this.maxBatchSize = maxBatchSize;
       
       this.maxBatchTime = maxBatchTime;
-         
+      
       this.subName = subName;
       
       this.clientID = clientID;
       
-      this.sourceIsTopic = sourceIsTopic;
+      this.messages = new LinkedList();      
       
-      this.destIsTopic = destIsTopic;
-      
-      this.messages = new ArrayList(maxBatchSize);
-      
       this.lock = new Object();
       
       if (trace)
@@ -245,23 +278,9 @@
       }
    }
    
-   private TransactionManager getTm()
-   {
-      if (tm == null)
-      {
-         tm = TransactionManagerLocator.getInstance().locate();
-         
-         if (tm == null)
-         {
-            throw new IllegalStateException("Cannot locate a transaction manager");
-         }
-      }
-      
-      return tm;
-   }
    
    // MessagingComponent overrides --------------------------------------------------
-   
+  
    public synchronized void start() throws Exception
    {
       if (started)
@@ -270,117 +289,128 @@
          return;
       }
       
-      if (trace) { log.trace("Starting " + this); }
+      if (trace) { log.trace("Starting " + this); }         
       
-      icSource = new InitialContext(sourceJNDIProperties);
+      connSource = createConnection(sourceUsername, sourcePassword, cfSource);
       
-      icDest = new InitialContext(destJNDIProperties);
+      connDest = createConnection(destUsername, destPassword, cfDest);
       
-      ConnectionFactory cfSource = (ConnectionFactory)icSource.lookup(sourceConnectionFactoryLookup);
-      
-      ConnectionFactory cfDest = (ConnectionFactory)icDest.lookup(destConnectionFactoryLookup);
-      
-      Destination sourceDest = (Destination)icSource.lookup(sourceDestinationLookup);
-      
-      Destination destDest = (Destination)icDest.lookup(destDestinationLookup);
-      
-      if (sourceUsername == null)
+      if (clientID != null)
       {
-         connSource = cfSource.createConnection();         
+         connSource.setClientID(clientID);
       }
-      else
-      {
-         connSource = cfSource.createConnection(sourceUsername, sourcePassword);     
-      }
+       
+      Session sess;
       
-      if (destUsername == null)
+      if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
       {
-         connDest = cfDest.createConnection();         
+         //Create an XASession for consuming from the source
+         if (trace) { log.trace("Creating XA source session"); }
+         sessSource = ((XAConnection)connSource).createXASession();
+         
+         sess = ((XASession)sessSource).getSession();
       }
       else
       {
-         connDest = cfDest.createConnection(destUsername, destPassword);     
+         if (trace) { log.trace("Creating non XA source session"); }
+         
+         //Create a standard session for consuming from the source
+         
+         //If the QoS is at_most_once, and max batch size is 1 then we use AUTO_ACKNOWLEDGE
+         //If the QoS is at_most_once, and max batch size > 1 or -1, then we use CLIENT_ACKNOWLEDGE
+         //We could use CLIENT_ACKNOWLEDGE for both the above but AUTO_ACKNOWLEGE may be slightly more
+         //performant in some implementations that manually acking every time but it really depends
+         //on the implementation.
+         //We could also use local transacted for both the above but don't for the same reasons.
+         
+         //If the QoS is duplicates_ok, we use CLIENT_ACKNOWLEDGE
+         //We could use local transacted, whether one is faster than the other probably depends on the
+         //messaging implementation but there's probably not much in it
+         
+         int ackMode;
+         if (qualityOfServiceMode == QOS_AT_MOST_ONCE && maxBatchSize == 1)
+         {
+            ackMode = Session.AUTO_ACKNOWLEDGE;
+         }
+         else
+         {
+            ackMode = Session.CLIENT_ACKNOWLEDGE;
+            
+            manualAck = true;
+         }
+         
+         sessSource = connSource.createSession(false, ackMode);
+         
+         sess = sessSource;
       }
-
-      if (clientID != null)
+         
+      if (subName == null)
       {
-         connDest.setClientID(clientID);
-      }
-       
-      //Note we use the JMS 1.0.2 API so we can interoperate with JMS providers that don't support
-      //JMS 1.1
-      if (sourceIsTopic)
-      {         
-         sessSource =
-            ((TopicConnection)connSource).createTopicSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
-         
-         if (subName == null)
+         if (selector == null)
          {
-            //Non durable
-            if (selector == null)
-            {
-               consumer = ((TopicSession)sessSource).createSubscriber((Topic)sourceDest);
-            }
-            else
-            {
-               consumer = ((TopicSession)sessSource).createSubscriber((Topic)sourceDest, selector, false);
-            }
+            consumer = sess.createConsumer(destSource);
          }
          else
          {
-            //Durable
-            if (selector == null)
-            {
-               consumer = ((TopicSession)sessSource).createDurableSubscriber((Topic)sourceDest, subName);
-            }
-            else
-            {
-               consumer = ((TopicSession)sessSource).createDurableSubscriber((Topic)sourceDest, subName, selector, false);
-            }
+            consumer = sess.createConsumer(destSource, selector, false);
          }
       }
       else
       {
-         sessSource =
-            ((QueueConnection)connSource).createQueueSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
-
-         if (subName == null)
+         //Durable subscription
+         if (selector == null)
          {
-            if (selector == null)
-            {
-               consumer = ((QueueSession)sessSource).createReceiver((Queue)sourceDest);
-            }
-            else
-            {
-               consumer = ((QueueSession)sessSource).createReceiver((Queue)sourceDest, selector);
-            }
+            consumer = sess.createDurableSubscriber((Topic)destSource, subName);
          }
          else
          {
-            //Shouldn't specify sub name for source quuee
-            throw new IllegalArgumentException("subName should not be specified if the source destination is a queue");
+            consumer = sess.createDurableSubscriber((Topic)destSource, subName, selector, false);
          }
       }
       
-      if (destIsTopic)
+      if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
       {
-         sessDest =
-            ((TopicConnection)connDest).createTopicSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+         if (trace) { log.trace("Creating XA dest session"); }
          
-         producer = ((TopicSession)sessDest).createProducer((Topic)destDest);
+         //Create an XA sesion for sending to the destination
+         
+         sessDest = ((XAConnection)connDest).createXASession();
+         
+         sess = ((XASession)sessDest).getSession();
       }
       else
       {
-         sessDest =
-            ((QueueConnection)connDest).createQueueSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-
-         producer = ((QueueSession)sessDest).createSender((Queue)destDest);
+         if (trace) { log.trace("Creating non XA dest session"); }
          
+         //Create a standard session for sending to the destination
+         
+         //If maxBatchSize == 1 we just create a non transacted session, otherwise we
+         //create a transacted session for the send, since sending the batch in a transaction
+         //is likely to be more efficient than sending messages individually
+         
+         manualCommit = maxBatchSize == 1;
+         
+         sessDest = connDest.createSession(manualCommit, manualCommit ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+         
+         sess = sessDest;
       }
       
+      if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+      {
+         if (trace) { log.trace("Starting JTA transaction"); }
+         
+         tx = startTx();
+         
+         enlistResources(tx);                  
+      }
+      
+      producer = sess.createProducer(destDest);
+                       
       consumer.setMessageListener(new SourceListener());
       
       connSource.start();
+      
+      started = true;
 
       if (maxBatchTime != -1)
       {
@@ -391,11 +421,11 @@
          batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
          
          checkerThread.start();
-      }
+      }            
       
       if (trace) { log.trace("Started " + this); }
    }
-
+   
    public synchronized void stop() throws Exception
    {
       if (!started)
@@ -427,10 +457,12 @@
       
       connDest.close();
 
-      icSource.close();
+      if (tx != null)
+      {
+         //Terminate any transaction
+         tx.rollback();
+      }
       
-      icDest.close();
-      
       if (trace) { log.trace("Stopped " + this); }
    }
    
@@ -465,11 +497,112 @@
    }
    
    // Private -------------------------------------------------------------------
-          
-   private void sendBatch()
+   
+   private void enlistResources(Transaction tx) throws Exception
    {
+      if (trace) { log.trace("Enlisting resources in tx"); }
+      
+      XAResource resSource = ((XASession)sessSource).getXAResource();
+      
+      tx.enlistResource(resSource);
+      
+      XAResource resDest = ((XASession)sessDest).getXAResource();
+      
+      tx.enlistResource(resDest);
+      
+      if (trace) { log.trace("Enlisted resources in tx"); }
+   }
+   
+   private void delistResources(Transaction tx) throws Exception
+   {
+      if (trace) { log.trace("Delisting resources from tx"); }
+      
+      XAResource resSource = ((XASession)sessSource).getXAResource();
+      
+      tx.delistResource(resSource, XAResource.TMSUCCESS);
+      
+      XAResource resDest = ((XASession)sessDest).getXAResource();
+      
+      tx.delistResource(resDest, XAResource.TMSUCCESS);
+      
+      if (trace) { log.trace("Delisted resources from tx"); }
+   }
+   
+   private Transaction startTx() throws Exception
+   {
+      if (trace) { log.trace("Starting JTA transaction"); }
+      
+      TransactionManager tm = getTm();
+      
+      tm.begin();
+      
+      Transaction tx = tm.getTransaction();
+      
+      //Remove the association between current thread - we don't want it
+      //we will be committing /rolling back directly on the transaction object
+      
+      tm.suspend();
+      
+      if (trace) { log.trace("Started JTA transaction"); }
+      
+      return tx;
+   }
+   
+   private TransactionManager getTm()
+   {
+      if (tm == null)
+      {
+//         tm = TransactionManagerLocator.getInstance().locate();
+//         
+//         if (tm == null)
+//         {
+//            throw new IllegalStateException("Cannot locate a transaction manager");
+//         }
+         
+         tm = com.arjuna.ats.jta.TransactionManager.transactionManager();
+      }
+      
+      return tm;
+   }
+   
+   private Connection createConnection(String username, String password, ConnectionFactory cf)
+      throws Exception
+   {
+      Connection conn;
+      
+      if (username == null)
+      {
+         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+         {
+            if (trace) { log.trace("Creating an XA connection"); }
+            conn = ((XAConnectionFactory)cf).createXAConnection();
+         }
+         else
+         {
+            if (trace) { log.trace("Creating a non XA connection"); }
+            conn = cf.createConnection();            
+         }
+      }
+      else
+      {
+         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+         {
+            if (trace) { log.trace("Creating an XA connection"); }
+            conn = ((XAConnectionFactory)cf).createXAConnection(username, password);
+         }
+         else
+         {
+            if (trace) { log.trace("Creating a non XA connection"); }
+            conn = cf.createConnection(username, password);            
+         }  
+      }
+      return conn;
+   }
+                
+   private void sendBatch() 
+   {
       if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
-      
+        
       synchronized (lock)
       {
          if (paused)
@@ -480,6 +613,26 @@
             return;            
          }
          
+         if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
+         {
+            //We ack before we send
+            if (manualAck)
+            {
+               //Ack on the last message
+               try
+               {
+                  ((Message)messages.getLast()).acknowledge();
+               }
+               catch (Throwable t)
+               {
+                  //Deal with this
+                  t.printStackTrace();
+               }      
+            }
+         }
+         
+         //Now send the message(s)   
+            
          Iterator iter = messages.iterator();
          
          Message msg = null;
@@ -503,58 +656,67 @@
             }                        
          }
          
-         if (transactional)
+         if (qualityOfServiceMode == QOS_DUPLICATES_OK)
          {
-            try
-            {
-               if (trace) { log.trace("Committing local sending tx"); }
-               
-               sessDest.commit();
-               
-               if (trace) { log.trace("Committed local sending tx"); }
-            }
-            catch (Throwable t)
-            {
-               //Deal with this
-               t.printStackTrace();
-            }
+            //We ack the source message(s) after sending
+            
+            if (manualAck)
+            {               
+               try
+               {
+                  //Ack on the last message
+                  ((Message)messages.getLast()).acknowledge();
+               }
+               catch (Throwable t)
+               {
+                  //Deal with this
+                  t.printStackTrace();
+               } 
+            }                  
          }
          
-         messages.clear();
-         
-         if (transactional)
+         //Now we commit the sending session if necessary
+         if (manualCommit)
          {
             try
             {
-               if (trace) { log.trace("Committing local consuming tx"); }
-               
-               sessSource.commit();
-               
-               if (trace) { log.trace("Committed local consuming tx"); }
+               sessDest.commit();            
             }
             catch (Throwable t)
             {
                //Deal with this
                t.printStackTrace();
-            }
+            } 
          }
-         else
+                           
+         messages.clear();
+                   
+         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
          {
+            //Commit the JTA transaction and start another
+                        
+            //XA
             try
             {
-               if (trace) { log.trace("Acknowledging session"); }
+               delistResources(tx);
+                  
+               if (trace) { log.trace("Committing JTA transaction"); }
                
-               msg.acknowledge();
+               tx.commit();
                
-               if (trace) { log.trace("Acknowledged session"); }
+               if (trace) { log.trace("Committed JTA transaction"); }
+               
+               tx = startTx();  
+               
+               enlistResources(tx);
             }
             catch (Throwable t)
             {
                //Deal with this
                t.printStackTrace();
             }
+         }
          
-         }
       }
    }
    
@@ -623,7 +785,7 @@
             
             if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
             
-            if (messages.size() >= maxBatchSize)
+            if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
             {
                if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
                

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-01-11 03:56:53 UTC (rev 1956)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-01-11 12:55:47 UTC (rev 1957)
@@ -48,7 +48,6 @@
  */
 public class BridgeTest extends MessagingTestCase
 {
-
    public BridgeTest(String name)
    {
       super(name);
@@ -64,26 +63,63 @@
       super.tearDown();
    }
    
-   public void testMaxBatchSizeNoMaxBatchTimeTransacted() throws Exception
+   public void testMaxBatchSizeNoMaxBatchTime_AtMostOnce() throws Exception
    {
       if (!ServerManagement.isRemote())
       {
          return;
       }
-      testMaxBatchSizeNoMaxBatchTime(true);
+      testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE);
    }
    
-   public void testMaxBatchSizeNoMaxBatchTimeNonTransacted() throws Exception
+   public void testMaxBatchSizeNoMaxBatchTime_DuplicatesOk() throws Exception
    {
       if (!ServerManagement.isRemote())
       {
          return;
       }
-      testMaxBatchSizeNoMaxBatchTime(false);
+      testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK);
    }
    
-   private void testMaxBatchSizeNoMaxBatchTime(boolean transacted) throws Exception
+   public void testMaxBatchSizeNoMaxBatchTime_OnceAndOnlyOnce() throws Exception
    {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE);
+   }
+   
+   
+   public void testMaxBatchTimeNoMaxBatchSize_AtMostOnce() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      this.testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_AT_MOST_ONCE);
+   }
+   
+   public void testMaxBatchTimeNoMaxBatchSize_DuplicatesOk() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      this.testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_DUPLICATES_OK);
+   }
+   
+   public void testMaxBatchTimeNoMaxBatchSize_OnceAndOnlyOnce() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_ONCE_AND_ONLY_ONCE);
+   }
+      
+   private void testMaxBatchSizeNoMaxBatchTime(int qosMode) throws Exception
+   {
       Connection connSource = null;
       
       Connection connDest = null;
@@ -118,12 +154,11 @@
          
          final int BATCH_SIZE = 10;
          
-         bridge = new Bridge(props0, props1, "/ConnectionFactory", "/ConnectionFactory",
-                  "/queue/sourceQueue", "/queue/destQueue", null, null, null, null,
-                  null, 0, false,
-                  false, 10, -1,
-                  null, null,
-                  false, false);
+         bridge = new Bridge(cf0, cf1, sourceQueue, destQueue,
+                  null, null, null, null,
+                  null, 0, qosMode,
+                  10, -1,
+                  null, null);
          
          bridge.start();
             
@@ -176,6 +211,10 @@
             assertEquals("message" + i, tm.getText());
          }
          
+         m = cons.receive(1000);
+         
+         assertNull(m);
+         
          //Send another batch with one more than batch size
          
          for (int i = 0; i < BATCH_SIZE + 1; i++)
@@ -224,7 +263,11 @@
             assertEquals("message" + i, tm.getText());
          }
          
+         m = cons.receive(1000);
          
+         assertNull(m);
+         
+         
          //Make sure no messages are left in the source dest
          
          MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
@@ -294,4 +337,168 @@
          ServerManagement.stop(1);
       }                  
    }
+   
+   private void testMaxBatchTimeNoMaxBatchSize(int qosMode) throws Exception
+   {
+      Connection connSource = null;
+      
+      Connection connDest = null;
+      
+      Bridge bridge = null;
+            
+      try
+      {
+         ServerManagement.start(0, "all", null, true);
+         
+         ServerManagement.start(1, "all", null, false);
+         
+         ServerManagement.deployQueue("sourceQueue", 0);
+         
+         ServerManagement.deployQueue("destQueue", 1);
+         
+         Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
+         
+         Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+               
+         InitialContext ic0 = new InitialContext(props0);
+         
+         InitialContext ic1 = new InitialContext(props1);
+         
+         ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+         
+         ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+         
+         Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
+         
+         Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+         
+         final long MAX_BATCH_TIME = 3000;
+         
+         bridge = new Bridge(cf0, cf1, sourceQueue, destQueue,
+                  null, null, null, null,
+                  null, 0, qosMode,
+                  -1, MAX_BATCH_TIME,
+                  null, null);
+         
+         bridge.start();
+            
+         connSource = cf0.createConnection();
+         
+         connDest = cf1.createConnection();
+         
+         Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+         
+         final int NUM_MESSAGES = 10;
+         
+         //Send some message
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         
+         Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons = sessRec.createConsumer(destQueue);
+         
+         connDest.start();
+         
+         //Verify none are received
+         
+         Message m = cons.receive(2000);
+         
+         assertNull(m);
+         
+         //Wait a bit longer
+         
+         Thread.sleep(1500);
+         
+         //Messages should now be receivable
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         m = cons.receive(1000);
+         
+         assertNull(m);
+         
+         //Make sure no messages are left in the source dest
+         
+         MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
+         
+         connSource.start();
+         
+         m = cons2.receive(1000);
+         
+         assertNull(m);
+         
+         connSource.close();
+         
+         connDest.close();
+                  
+      }
+      finally
+      {      
+         if (connSource != null)
+         {
+            try
+            {
+               connSource.close();
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to close connection", e);
+            }
+         }
+         
+         if (connDest != null)
+         {
+            try
+            {
+               connDest.close();
+            }
+            catch (Exception e)
+            {
+              log.error("Failed to close connection", e);
+            }
+         }
+         
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+         
+         try
+         {
+            ServerManagement.undeployQueue("sourceQueue", 0);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to undeploy", e);
+         }
+         
+         try
+         {
+            ServerManagement.undeployQueue("destQueue", 1);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to undeploy", e);
+         }
+         
+         ServerManagement.stop(0);
+         
+         ServerManagement.stop(1);
+      }                  
+   }
 }




More information about the jboss-cvs-commits mailing list