[jboss-cvs] JBoss Messaging SVN: r1962 - in trunk: tests/src/org/jboss/test/messaging/jms/bridge and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 11 16:02:36 EST 2007


Author: timfox
Date: 2007-01-11 16:02:31 -0500 (Thu, 11 Jan 2007)
New Revision: 1962

Modified:
   trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
Log:
More bridge work


Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-01-11 18:13:23 UTC (rev 1961)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-01-11 21:02:31 UTC (rev 1962)
@@ -21,10 +21,8 @@
  */
 package org.jboss.jms.server.bridge;
 
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.List;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -61,14 +59,6 @@
    
    private static boolean trace;
    
-   //Transaction modes
-   
-//   public static final int TRANSACTIONS_NONE = 0;
-//   
-//   public static final int TRANSACTIONS_LOCAL = 1;
-//   
-//   public static final int TRANSACTIONS_XA = 2;
-   
    //Quality of service modes
    
    public static final int QOS_AT_MOST_ONCE = 0;
@@ -78,7 +68,12 @@
    public static final int QOS_ONCE_AND_ONLY_ONCE = 2;
    
    /*
+    * 
+    * Quality of service (QoS) levels
+    * ===============================
+    * 
     * QOS_AT_MOST_ONCE
+    * ----------------
     * 
     * With this QoS mode messages will reach the destination from the source at most once.
     * The messages are consumed from the source and acknowledged
@@ -88,6 +83,7 @@
     * This mode is avilable for both persistent and non persistent messages.
     * 
     * QOS_DUPLICATES_OK
+    * -----------------
     * 
     * With this QoS mode, the messages are consumed from the source and then acknowledged
     * after they have been successfully sent to the destination. Therefore there is a possibility that if
@@ -96,6 +92,7 @@
     * This mode is available for both persistent and non persistent messages.
     * 
     * QOS_ONCE_AND_ONLY_ONCE
+    * ----------------------
     * 
     * This QoS mode ensures messages will reach the destination from the source once and only once.
     * (Sometimes this mode is known as "exactly once").
@@ -118,7 +115,6 @@
     * may be a good choice depending on your specific application.
     * 
     * 
-    * 
     */
    
    static
@@ -142,6 +138,8 @@
    
    private long failureRetryInterval;
    
+   private int maxRetries;
+   
    private int qualityOfServiceMode;
    
    private int maxBatchSize;
@@ -158,9 +156,9 @@
    
    private Object lock;
    
-   private ConnectionFactory cfSource;
+   private ConnectionFactoryFactory sourceCfFactory;
    
-   private ConnectionFactory cfDest;
+   private ConnectionFactoryFactory destCfFactory;
    
    private Connection connSource; 
    
@@ -191,23 +189,27 @@
    private boolean manualAck;
    
    private boolean manualCommit;
-         
-   public Bridge(ConnectionFactory cfSource, ConnectionFactory cfDest,
+      
+   /*
+    * This constructor is used when source and destination are on different servers
+    */
+   public Bridge(ConnectionFactoryFactory sourceCfFactory, ConnectionFactoryFactory destCfFactory,
                  Destination destSource, Destination destDest,         
                  String sourceUsername, String sourcePassword,
                  String destUsername, String destPassword,
-                 String selector, long failureRetryInterval, 
+                 String selector, long failureRetryInterval,
+                 int maxRetries,
                  int qosMode,
                  int maxBatchSize, long maxBatchTime,
                  String subName, String clientID)
    {
-      if (cfSource == null)
+      if (sourceCfFactory == null)
       {
-         throw new IllegalArgumentException("cfSource cannot be null");
+         throw new IllegalArgumentException("sourceCfFactory cannot be null");
       }
-      if (cfDest == null)
+      if (destCfFactory == null)
       {
-         throw new IllegalArgumentException("cfDest cannot be null");
+         throw new IllegalArgumentException("destCfFactory cannot be null");
       }
       if (destSource == null)
       {
@@ -221,26 +223,30 @@
       {
          throw new IllegalArgumentException("failureRetryInterval must be > 0 or -1 to represent no retry");
       }
-      if (maxBatchSize < 1 && maxBatchSize != -1)
+      if (maxRetries < 0)
       {
-         throw new IllegalArgumentException("maxBatchSize must be >= 1 or -1 to represent unlimited batch size");
+         throw new IllegalArgumentException("maxRetries must be >= 0");
       }
+      if (failureRetryInterval == -1 && maxRetries > 0)
+      {
+         throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be 0");
+      }
+      if (maxBatchSize < 1)
+      {
+         throw new IllegalArgumentException("maxBatchSize must be >= 1");
+      }
       if (maxBatchTime < 1 && maxBatchTime != -1)
       {
          throw new IllegalArgumentException("maxBatchTime must be >= 1 or -1 to represent unlimited batch time");
       }
-      if (maxBatchTime == -1 && maxBatchSize == -1)
-      {
-         throw new IllegalArgumentException("Cannot have unlimited batch size and unlimited batch time");
-      }
       if (qosMode != QOS_AT_MOST_ONCE && qosMode != QOS_DUPLICATES_OK && qosMode != QOS_ONCE_AND_ONLY_ONCE)
       {
          throw new IllegalArgumentException("Invalid QoS mode " + qosMode);
       }
       
-      this.cfSource = cfSource;
+      this.sourceCfFactory = sourceCfFactory;
       
-      this.cfDest = cfDest;
+      this.destCfFactory = destCfFactory;
       
       this.destSource = destSource;
       
@@ -258,6 +264,8 @@
       
       this.failureRetryInterval = failureRetryInterval;
       
+      this.maxRetries = maxRetries;
+      
       this.qualityOfServiceMode = qosMode;
       
       this.maxBatchSize = maxBatchSize;
@@ -278,9 +286,9 @@
       }
    }
    
-   
+      
    // MessagingComponent overrides --------------------------------------------------
-  
+        
    public synchronized void start() throws Exception
    {
       if (started)
@@ -291,139 +299,33 @@
       
       if (trace) { log.trace("Starting " + this); }         
       
-      connSource = createConnection(sourceUsername, sourcePassword, cfSource);
+      boolean ok = setupJMSObjectsWithRetry();
       
-      connDest = createConnection(destUsername, destPassword, cfDest);
-      
-      if (clientID != null)
-      {
-         connSource.setClientID(clientID);
-      }
-       
-      Session sess;
-      
-      if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
-      {
-         //Create an XASession for consuming from the source
-         if (trace) { log.trace("Creating XA source session"); }
-         sessSource = ((XAConnection)connSource).createXASession();
+      if (ok)
+      {         
+         started = true;
          
-         sess = ((XASession)sessSource).getSession();
-      }
-      else
-      {
-         if (trace) { log.trace("Creating non XA source session"); }
-         
-         //Create a standard session for consuming from the source
-         
-         //If the QoS is at_most_once, and max batch size is 1 then we use AUTO_ACKNOWLEDGE
-         //If the QoS is at_most_once, and max batch size > 1 or -1, then we use CLIENT_ACKNOWLEDGE
-         //We could use CLIENT_ACKNOWLEDGE for both the above but AUTO_ACKNOWLEGE may be slightly more
-         //performant in some implementations that manually acking every time but it really depends
-         //on the implementation.
-         //We could also use local transacted for both the above but don't for the same reasons.
-         
-         //If the QoS is duplicates_ok, we use CLIENT_ACKNOWLEDGE
-         //We could use local transacted, whether one is faster than the other probably depends on the
-         //messaging implementation but there's probably not much in it
-         
-         int ackMode;
-         if (qualityOfServiceMode == QOS_AT_MOST_ONCE && maxBatchSize == 1)
+         if (maxBatchTime != -1)
          {
-            ackMode = Session.AUTO_ACKNOWLEDGE;
-         }
-         else
-         {
-            ackMode = Session.CLIENT_ACKNOWLEDGE;
+            if (trace) { log.trace("Starting time checker thread"); }
+                     
+            timeChecker = new BatchTimeChecker();
             
-            manualAck = true;
-         }
+            checkerThread = new Thread(timeChecker);
+            
+            batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+            
+            checkerThread.start();
+            
+            if (trace) { log.trace("Started time checker thread"); }
+         }            
          
-         sessSource = connSource.createSession(false, ackMode);
-         
-         sess = sessSource;
+         if (trace) { log.trace("Started " + this); }
       }
-         
-      if (subName == null)
-      {
-         if (selector == null)
-         {
-            consumer = sess.createConsumer(destSource);
-         }
-         else
-         {
-            consumer = sess.createConsumer(destSource, selector, false);
-         }
-      }
       else
       {
-         //Durable subscription
-         if (selector == null)
-         {
-            consumer = sess.createDurableSubscriber((Topic)destSource, subName);
-         }
-         else
-         {
-            consumer = sess.createDurableSubscriber((Topic)destSource, subName, selector, false);
-         }
+         log.warn("Failed to start bridge");
       }
-      
-      if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
-      {
-         if (trace) { log.trace("Creating XA dest session"); }
-         
-         //Create an XA sesion for sending to the destination
-         
-         sessDest = ((XAConnection)connDest).createXASession();
-         
-         sess = ((XASession)sessDest).getSession();
-      }
-      else
-      {
-         if (trace) { log.trace("Creating non XA dest session"); }
-         
-         //Create a standard session for sending to the destination
-         
-         //If maxBatchSize == 1 we just create a non transacted session, otherwise we
-         //create a transacted session for the send, since sending the batch in a transaction
-         //is likely to be more efficient than sending messages individually
-         
-         manualCommit = maxBatchSize == 1;
-         
-         sessDest = connDest.createSession(manualCommit, manualCommit ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-         
-         sess = sessDest;
-      }
-      
-      if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
-      {
-         if (trace) { log.trace("Starting JTA transaction"); }
-         
-         tx = startTx();
-         
-         enlistResources(tx);                  
-      }
-      
-      producer = sess.createProducer(destDest);
-                       
-      consumer.setMessageListener(new SourceListener());
-      
-      connSource.start();
-      
-      started = true;
-
-      if (maxBatchTime != -1)
-      {
-         timeChecker = new BatchTimeChecker();
-         
-         checkerThread = new Thread(timeChecker);
-         
-         batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-         
-         checkerThread.start();
-      }            
-      
-      if (trace) { log.trace("Started " + this); }
    }
    
    public synchronized void stop() throws Exception
@@ -450,7 +352,11 @@
       //This must be outside sync block
       if (checkerThread != null)
       {  
+         if (trace) { log.trace("Waiting for checker thread to finish");}
+         
          checkerThread.join();
+         
+         if (trace) { log.trace("Checker thread has finished"); }
       }
       
       connSource.close();
@@ -460,7 +366,11 @@
       if (tx != null)
       {
          //Terminate any transaction
+         if (trace) { log.trace("Rolling back remaining tx"); }
+         
          tx.rollback();
+         
+         if (trace) { log.trace("Rolled back remaining tx"); }
       }
       
       if (trace) { log.trace("Stopped " + this); }
@@ -565,11 +475,19 @@
       return tm;
    }
    
-   private Connection createConnection(String username, String password, ConnectionFactory cf)
+   private Connection createConnection(String username, String password, ConnectionFactoryFactory cff)
       throws Exception
    {
       Connection conn;
       
+      ConnectionFactory cf = cff.createConnectionFactory();
+      
+      if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE &&
+          !(cf instanceof XAConnectionFactory))
+      {
+         throw new IllegalArgumentException("Connection factory must be XAConnectionFactory");
+      }
+      
       if (username == null)
       {
          if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
@@ -598,106 +516,296 @@
       }
       return conn;
    }
-                
-   private void sendBatch() 
+   
+   private boolean setupJMSObjects()
    {
-      if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
-        
-      synchronized (lock)
+      try
       {
-         if (paused)
+         connSource = createConnection(sourceUsername, sourcePassword, sourceCfFactory);
+         
+         connDest = createConnection(destUsername, destPassword, destCfFactory);
+         
+         if (clientID != null)
          {
-            //Don't send now
-            if (trace) { log.trace("Paused, so not sending now"); }
-            
-            return;            
+            connSource.setClientID(clientID);
          }
+          
+         Session sess;
          
-         if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
+         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
          {
-            //We ack before we send
-            if (manualAck)
+            //Create an XASession for consuming from the source
+            if (trace) { log.trace("Creating XA source session"); }
+            sessSource = ((XAConnection)connSource).createXASession();
+            
+            sess = ((XASession)sessSource).getSession();
+         }
+         else
+         {
+            if (trace) { log.trace("Creating non XA source session"); }
+            
+            //Create a standard session for consuming from the source
+            
+            //If the QoS is at_most_once, and max batch size is 1 then we use AUTO_ACKNOWLEDGE
+            //If the QoS is at_most_once, and max batch size > 1, then we use CLIENT_ACKNOWLEDGE
+            //We could use CLIENT_ACKNOWLEDGE for both the above but AUTO_ACKNOWLEDGE may be slightly more
+            //performant in some implementations than manually acking every time but it really depends
+            //on the implementation.
+            //We could also use local transacted for both the above but don't for the same reasons.
+            
+            //If the QoS is duplicates_ok, we use CLIENT_ACKNOWLEDGE
+            //We could use local transacted, whether one is faster than the other probably depends on the
+            //messaging implementation but there's probably not much in it
+            
+            int ackMode;
+            if (qualityOfServiceMode == QOS_AT_MOST_ONCE && maxBatchSize == 1)
             {
-               //Ack on the last message
-               try
-               {
-                  ((Message)messages.getLast()).acknowledge();
-               }
-               catch (Throwable t)
-               {
-                  //Deal with this
-                  t.printStackTrace();
-               }      
+               ackMode = Session.AUTO_ACKNOWLEDGE;
             }
+            else
+            {
+               ackMode = Session.CLIENT_ACKNOWLEDGE;
+               
+               manualAck = true;
+            }
+            
+            sessSource = connSource.createSession(false, ackMode);
+            
+            sess = sessSource;
          }
+            
+         if (subName == null)
+         {
+            if (selector == null)
+            {
+               consumer = sess.createConsumer(destSource);
+            }
+            else
+            {
+               consumer = sess.createConsumer(destSource, selector, false);
+            }
+         }
+         else
+         {
+            //Durable subscription
+            if (selector == null)
+            {
+               consumer = sess.createDurableSubscriber((Topic)destSource, subName);
+            }
+            else
+            {
+               consumer = sess.createDurableSubscriber((Topic)destSource, subName, selector, false);
+            }
+         }
          
-         //Now send the message(s)   
+         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+         {
+            if (trace) { log.trace("Creating XA dest session"); }
             
-         Iterator iter = messages.iterator();
+            //Create an XA session for sending to the destination
+            
+            sessDest = ((XAConnection)connDest).createXASession();
+            
+            sess = ((XASession)sessDest).getSession();
+         }
+         else
+         {
+            if (trace) { log.trace("Creating non XA dest session"); }
+            
+            //Create a standard session for sending to the destination
+            
+            //If maxBatchSize == 1 we just create a non transacted session, otherwise we
+            //create a transacted session for the send, since sending the batch in a transaction
+            //is likely to be more efficient than sending messages individually
+            
+            manualCommit = maxBatchSize == 1;
+            
+            sessDest = connDest.createSession(manualCommit, manualCommit ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+            
+            sess = sessDest;
+         }
          
-         Message msg = null;
+         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
+         {
+            if (trace) { log.trace("Starting JTA transaction"); }
+            
+            tx = startTx();
+            
+            enlistResources(tx);                  
+         }
          
-         while (iter.hasNext())
+         producer = sess.createProducer(destDest);
+                          
+         consumer.setMessageListener(new SourceListener());
+         
+         connSource.start();
+         
+         return true;
+      }
+      catch (Exception e)
+      {
+         log.warn("Failed to set up connections", e);
+         
+         //If this fails we should attempt to cleanup or we might end up in some weird state
+         
+         cleanup();
+         
+         return false;
+      }
+   }
+   
+   private void cleanup()
+   {
+      if (tx != null)
+      {
+         try
          {
-            msg = (Message)iter.next();
+            delistResources(tx);
+         }
+         catch (Throwable ignore)
+         {
+         } 
+         try
+         {
+            //Terminate the tx
+            tx.rollback();
+         }
+         catch (Throwable ignore)
+         {
+         } 
+      }
+      
+      //Close the old objects
+      try
+      {
+         connSource.close();
+      }
+      catch (Throwable ignore)
+      {            
+      }
+      try
+      {
+         connDest.close();
+      }
+      catch (Throwable ignore)
+      {            
+      }
+   }
+   
+   private void pause(long interval)
+   {
+      //Blocks the caller until at least interval ms have passed since this call began
+      long end = System.currentTimeMillis() + interval;
+      for (long left = interval; left > 0; left = end - System.currentTimeMillis())
+      {
+         try
+         {
+            Thread.sleep(left);
+         }
+         catch (InterruptedException ex)
+         { /* woken early: loop round and sleep only for the time still left */ }
+      }
+   }
+   
+   private boolean setupJMSObjectsWithRetry()
+   {
+      //Calls setupJMSObjects() repeatedly, pausing for failureRetryInterval between failed
+      //attempts. Returns true as soon as one attempt succeeds, false once attempts run out.
+      if (trace) { log.trace("Setting up connections"); }
+      
+      int count = 0;
+      while (true)
+      {
+         if (setupJMSObjects())
+         {
+            return true;
+         }
+         
+         count++;
+         //Compare with >= rather than ==: count is already 1 after the first failure, so with
+         //maxRetries == 0 an equality test would never match and this loop would never end
+         if (maxRetries != -1 && count >= maxRetries)
+         {
+            break;
+         }
+         
+         log.warn("Failed to set up connections, will retry after a pause of " + failureRetryInterval);
+         
+         pause(failureRetryInterval);
+      }
+      
+      //If we get here then we exceed maxRetries
+      return false;      
+   }
+    
+   /*
+    * If one of the JMS operations fail, then we try and lookup the connection factories, create
+    * the connections and retry, up to a certain number of times
+    */
+   private void sendBatch() 
+   {
+      if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
+        
+      synchronized (lock)
+      {
+         try
+         {
+            if (paused)
+            {
+               //Don't send now
+               if (trace) { log.trace("Paused, so not sending now"); }
+               
+               return;            
+            }
             
-            try
+            if (qualityOfServiceMode == QOS_AT_MOST_ONCE)
             {
+               //We ack before we send
+               if (manualAck)
+               {
+                  //Ack on the last message
+                  ((Message)messages.getLast()).acknowledge();       
+               }
+            }
+            
+            //Now send the message(s)   
+               
+            Iterator iter = messages.iterator();
+            
+            Message msg = null;
+            
+            while (iter.hasNext())
+            {
+               msg = (Message)iter.next();
+               
                if (trace) { log.trace("Sending message " + msg); }
                
                producer.send(msg);
                
-               if (trace) { log.trace("Sent message " + msg); }
+               if (trace) { log.trace("Sent message " + msg); }                    
             }
-            catch (Throwable t)
+            
+            if (qualityOfServiceMode == QOS_DUPLICATES_OK)
             {
-               //Failed to send - deal with retries
-               t.printStackTrace();
-            }                        
-         }
-         
-         if (qualityOfServiceMode == QOS_DUPLICATES_OK)
-         {
-            //We ack the source message(s) after sending
-            
-            if (manualAck)
-            {               
-               try
-               {
+               //We ack the source message(s) after sending
+               
+               if (manualAck)
+               {               
                   //Ack on the last message
                   ((Message)messages.getLast()).acknowledge();
-               }
-               catch (Throwable t)
-               {
-                  //Deal with this
-                  t.printStackTrace();
-               } 
-            }                  
-         }
-         
-         //Now we commit the sending session if necessary
-         if (manualCommit)
-         {
-            try
+               }                  
+            }
+            
+            //Now we commit the sending session if necessary
+            if (manualCommit)
             {
                sessDest.commit();            
             }
-            catch (Throwable t)
+            
+            if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
             {
-               //Deal with this
-               t.printStackTrace();
-            } 
-         }
-                           
-         messages.clear();
-                   
-         if (qualityOfServiceMode == QOS_ONCE_AND_ONLY_ONCE)
-         {
-            //Commit the JTA transaction and start another
-                        
-            //XA
-            try
-            {
+               //Commit the JTA transaction and start another
+                                       
                delistResources(tx);
                   
                if (trace) { log.trace("Committing JTA transaction"); }
@@ -710,13 +818,61 @@
                
                enlistResources(tx);
             }
-            catch (Throwable t)
+            
+            //Clear the messages
+            messages.clear();            
+         }
+         catch (Exception e)
+         {
+            log.warn("Failed to send + acknowledge batch, closing JMS objects", e);
+            
+            /*
+             * If an Exception occurs in attempting to send / ack the batch, this might be due
+             * to a network problem on either the source or destination connection.
+             * If it was on the source connection then the server has probably NACKed the unacked
+             * messages back to the destination anyway.
+             * If the failure occurred during 2PC commit protocol then the participants may or may not
+             * have reached the prepared state, if they do then the tx will commit at some time during
+             * recovery.
+             * 
+             * So we can safely close the dead connections, without fear of stepping outside our
+             * QoS guarantee.
+             * 
+             * 
+             */
+            
+            //Clear the messages
+            messages.clear();
+                        
+            cleanup();
+            
+            boolean ok = false;
+            
+            if (maxRetries > 0 || maxRetries == -1)
             {
-               //Deal with this
-               t.printStackTrace();
+               log.warn("Will try and re-set up connections after a pause of " + failureRetryInterval);
+               
+               pause(this.failureRetryInterval);
+               
+               //Now we try
+               ok = setupJMSObjectsWithRetry();
             }
-         }
-         
+            
+            if (!ok)
+            {
+               //We haven't managed to recreate connections or maxRetries = 0
+               log.warn("Unable to set up connections, bridge will be stopped");
+               
+               try
+               {                  
+                  stop();
+               }
+               catch (Exception ignore)
+               {                  
+               }
+            }
+            
+         }                                                      
       }
    }
    
@@ -781,10 +937,12 @@
             
             messages.add(msg);
             
-            batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+            batchExpiryTime = System.currentTimeMillis() + maxBatchTime;            
             
             if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
             
+            if (trace) { log.trace("max Batch Size is " + maxBatchSize); }
+            
             if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
             {
                if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-01-11 18:13:23 UTC (rev 1961)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-01-11 21:02:31 UTC (rev 1962)
@@ -25,6 +25,7 @@
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -34,6 +35,9 @@
 import javax.naming.InitialContext;
 
 import org.jboss.jms.server.bridge.Bridge;
+import org.jboss.jms.server.bridge.ConnectionFactoryFactory;
+import org.jboss.jms.server.bridge.JNDIConnectionFactoryFactory;
+import org.jboss.logging.Logger;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
 
@@ -48,6 +52,8 @@
  */
 public class BridgeTest extends MessagingTestCase
 {
+   private static final Logger log = Logger.getLogger(BridgeTest.class);
+   
    public BridgeTest(String name)
    {
       super(name);
@@ -63,68 +69,219 @@
       super.tearDown();
    }
    
-   public void testMaxBatchSizeNoMaxBatchTime_AtMostOnce() throws Exception
+   // MaxBatchSize but no MaxBatchTime
+   
+   public void testNoMaxBatchTime_AtMostOnce_P() throws Exception
    {
       if (!ServerManagement.isRemote())
       {
          return;
       }
-      testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE);
+      testNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, true);
    }
    
-   public void testMaxBatchSizeNoMaxBatchTime_DuplicatesOk() throws Exception
+   public void testNoMaxBatchTime_DuplicatesOk_P() throws Exception
    {
       if (!ServerManagement.isRemote())
       {
          return;
       }
-      testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK);
+      testNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK, true);
    }
    
-   public void testMaxBatchSizeNoMaxBatchTime_OnceAndOnlyOnce() throws Exception
+   public void testNoMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
    {
       if (!ServerManagement.isRemote())
       {
          return;
       }
-      testMaxBatchSizeNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE);
+      testNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
    }
    
+   public void testNoMaxBatchTime_AtMostOnce_NP() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testNoMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, false);
+   }
    
-   public void testMaxBatchTimeNoMaxBatchSize_AtMostOnce() throws Exception
+   public void testNoMaxBatchTime_DuplicatesOk_NP() throws Exception
    {
       if (!ServerManagement.isRemote())
       {
          return;
       }
-      this.testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_AT_MOST_ONCE);
+      testNoMaxBatchTime(Bridge.QOS_DUPLICATES_OK, false);
    }
    
-   public void testMaxBatchTimeNoMaxBatchSize_DuplicatesOk() throws Exception
+   public void testNoMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
    {
       if (!ServerManagement.isRemote())
       {
          return;
       }
-      this.testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_DUPLICATES_OK);
+      testNoMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
    }
    
-   public void testMaxBatchTimeNoMaxBatchSize_OnceAndOnlyOnce() throws Exception
+   
+   // MaxBatchTime but no MaxBatchSize
+   
+   public void testMaxBatchTime_AtMostOnce_P() throws Exception
    {
       if (!ServerManagement.isRemote())
       {
          return;
       }
-      testMaxBatchTimeNoMaxBatchSize(Bridge.QOS_ONCE_AND_ONLY_ONCE);
+      this.testMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, true);
    }
+   
+   public void testMaxBatchTime_DuplicatesOk_P() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      this.testMaxBatchTime(Bridge.QOS_DUPLICATES_OK, true);
+   }
+   
+   public void testMaxBatchTime_OnceAndOnlyOnce_P() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+   }
+   
+   public void testMaxBatchTime_AtMostOnce_NP() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      this.testMaxBatchTime(Bridge.QOS_AT_MOST_ONCE, false);
+   }
+   
+   public void testMaxBatchTime_DuplicatesOk_NP() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      this.testMaxBatchTime(Bridge.QOS_DUPLICATES_OK, false);
+   }
+   
+   public void testMaxBatchTime_OnceAndOnlyOnce_NP() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testMaxBatchTime(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+   }
+    
+   
+   // Stress 
+   
+   public void testStress_AtMostOnce_P() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testStress(Bridge.QOS_AT_MOST_ONCE, true);
+   }
+   
+   public void testStress_DuplicatesOk_P() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testStress(Bridge.QOS_DUPLICATES_OK, true);
+   }
+   
+   public void testStress_OnceAndOnlyOnce_P() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, true);
+   }
+   
+   public void testStress_AtMostOnce_NP() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testStress(Bridge.QOS_AT_MOST_ONCE, false);
+   }
+   
+   public void testStress_DuplicatesOk_NP() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testStress(Bridge.QOS_DUPLICATES_OK, false);
+   }
+   
+   public void testStress_OnceAndOnlyOnce_NP() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testStress(Bridge.QOS_ONCE_AND_ONLY_ONCE, false);
+   }
+   
+   
+   
+   private static class Sender implements Runnable
+   {
+      int numMessages;
       
-   private void testMaxBatchSizeNoMaxBatchTime(int qosMode) throws Exception
+      Session sess;
+      
+      MessageProducer prod;
+      
+      Exception ex;
+      
+      public void run()
+      {
+         try
+         {
+            for (int i = 0; i < numMessages; i++)
+            {
+               TextMessage tm = sess.createTextMessage("message" + i);
+                                            
+               prod.send(tm);
+               
+               log.trace("Sent message " + i);
+            }
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to send", e);
+            ex = e;
+         }         
+      }
+      
+   }   
+   
+   private void testStress(int qosMode, boolean persistent) throws Exception
    {
       Connection connSource = null;
       
       Connection connDest = null;
       
       Bridge bridge = null;
+      
+      Thread t = null;
             
       try
       {
@@ -139,7 +296,11 @@
          Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
          
          Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-               
+         
+         ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
+         
+         ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
+                      
          InitialContext ic0 = new InitialContext(props0);
          
          InitialContext ic1 = new InitialContext(props1);
@@ -152,12 +313,169 @@
          
          Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
          
+         final int BATCH_SIZE = 50;
+         
+         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+                  null, null, null, null,
+                  null, 5000, 10, qosMode,
+                  BATCH_SIZE, -1,
+                  null, null);
+         
+         bridge.start();
+            
+         connSource = cf0.createConnection();
+         
+         connDest = cf1.createConnection();
+         
+         Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+         
+         final int NUM_MESSAGES = 2000;
+         
+         Sender sender = new Sender();
+         sender.sess = sessSend;
+         sender.prod = prod;
+         sender.numMessages = NUM_MESSAGES;
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+                          
+         Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons = sessRec.createConsumer(destQueue);
+         
+         connDest.start();
+         
+         t = new Thread(sender);
+         
+         t.start();
+                 
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(5000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         Message m = cons.receive(1000);
+         
+         assertNull(m);
+         
+         t.join();
+         
+         if (sender.ex != null)
+         {
+            //An error occurred during the send
+            throw sender.ex;
+         }
+           
+      }
+      finally
+      {    
+         if (t != null)
+         {
+            t.join(10000);
+         }
+         
+         if (connSource != null)
+         {
+            try
+            {
+               connSource.close();
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to close connection", e);
+            }
+         }
+         
+         if (connDest != null)
+         {
+            try
+            {
+               connDest.close();
+            }
+            catch (Exception e)
+            {
+              log.error("Failed to close connection", e);
+            }
+         }
+         
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+         
+         try
+         {
+            ServerManagement.undeployQueue("sourceQueue", 0);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to undeploy", e);
+         }
+         
+         try
+         {
+            ServerManagement.undeployQueue("destQueue", 1);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to undeploy", e);
+         }
+         
+         ServerManagement.stop(0);
+         
+         ServerManagement.stop(1);
+      }      
+   }
+   
+      
+   private void testNoMaxBatchTime(int qosMode, boolean persistent) throws Exception
+   {
+      Connection connSource = null;
+      
+      Connection connDest = null;
+      
+      Bridge bridge = null;
+            
+      try
+      {
+         ServerManagement.start(0, "all", null, true);
+         
+         ServerManagement.start(1, "all", null, false);
+         
+         ServerManagement.deployQueue("sourceQueue", 0);
+         
+         ServerManagement.deployQueue("destQueue", 1);
+         
+         Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
+         
+         Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+         
+         ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
+         
+         ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
+                      
+         InitialContext ic0 = new InitialContext(props0);
+         
+         InitialContext ic1 = new InitialContext(props1);
+         
+         ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+         
+         ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+         
+         Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
+         
+         Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+         
          final int BATCH_SIZE = 10;
          
-         bridge = new Bridge(cf0, cf1, sourceQueue, destQueue,
+         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
                   null, null, null, null,
-                  null, 0, qosMode,
-                  10, -1,
+                  null, 5000, 10, qosMode,
+                  BATCH_SIZE, -1,
                   null, null);
          
          bridge.start();
@@ -170,6 +488,8 @@
          
          MessageProducer prod = sessSend.createProducer(sourceQueue);
          
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);        
+         
          //Send half the messges
 
          for (int i = 0; i < BATCH_SIZE / 2; i++)
@@ -276,12 +596,7 @@
          
          m = cons2.receive(1000);
          
-         assertNull(m);
-         
-         connSource.close();
-         
-         connDest.close();
-                  
+         assertNull(m);          
       }
       finally
       {      
@@ -338,7 +653,7 @@
       }                  
    }
    
-   private void testMaxBatchTimeNoMaxBatchSize(int qosMode) throws Exception
+   private void testMaxBatchTime(int qosMode, boolean persistent) throws Exception
    {
       Connection connSource = null;
       
@@ -359,6 +674,10 @@
          Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
          
          Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+         
+         ConnectionFactoryFactory cff0 = new JNDIConnectionFactoryFactory(props0, "/ConnectionFactory");
+         
+         ConnectionFactoryFactory cff1 = new JNDIConnectionFactoryFactory(props1, "/ConnectionFactory");
                
          InitialContext ic0 = new InitialContext(props0);
          
@@ -374,10 +693,12 @@
          
          final long MAX_BATCH_TIME = 3000;
          
-         bridge = new Bridge(cf0, cf1, sourceQueue, destQueue,
+         final int MAX_BATCH_SIZE = 100000; // something big so it won't reach it
+         
+         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
                   null, null, null, null,
-                  null, 0, qosMode,
-                  -1, MAX_BATCH_TIME,
+                  null, 5000, 10, qosMode,
+                  MAX_BATCH_SIZE, MAX_BATCH_TIME,
                   null, null);
          
          bridge.start();
@@ -390,6 +711,8 @@
          
          MessageProducer prod = sessSend.createProducer(sourceQueue);
          
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);                          
+         
          final int NUM_MESSAGES = 10;
          
          //Send some message
@@ -442,10 +765,6 @@
          
          assertNull(m);
          
-         connSource.close();
-         
-         connDest.close();
-                  
       }
       finally
       {      




More information about the jboss-cvs-commits mailing list