[jboss-cvs] JBoss Messaging SVN: r1952 - in trunk: tests/src/org/jboss/test/messaging/jms/bridge and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 10 15:51:06 EST 2007


Author: timfox
Date: 2007-01-10 15:51:02 -0500 (Wed, 10 Jan 2007)
New Revision: 1952

Modified:
   trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
Log:
More bridge work



Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-01-10 20:03:51 UTC (rev 1951)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-01-10 20:51:02 UTC (rev 1952)
@@ -625,7 +625,7 @@
             
             if (messages.size() >= maxBatchSize)
             {
-               if (trace) { log.trace(this + " maxBatchSizew has been reached so sending batch"); }
+               if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
                
                sendBatch();
                

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-01-10 20:03:51 UTC (rev 1951)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-01-10 20:51:02 UTC (rev 1952)
@@ -64,95 +64,234 @@
       super.tearDown();
    }
    
-   public void testBridge() throws Exception
+   public void testMaxBatchSizeNoMaxBatchTimeTransacted() throws Exception
    {
-      ServerManagement.start(0, "all", null, true);
-      
-      ServerManagement.start(1, "all", null, false);
-      
-      Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
-      
-      Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
-      
-      ServerManagement.deployQueue("sourceQueue", 0);
-      
-      ServerManagement.deployQueue("destQueue", 1);
-      
-      InitialContext ic0 = new InitialContext(props0);
-      
-      InitialContext ic1 = new InitialContext(props1);
-      
-      ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
-      
-      ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
-      
-      Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
-      
-      Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
-      
-      
-      Bridge bridge = new Bridge(props0, props1, "/ConnectionFactory", "/ConnectionFactory",
-               "/queue/sourceQueue", "/queue/destQueue", null, null, null, null,
-               null, 0, false,
-               false, 10, -1,
-               null, null,
-               false, false);
-      
-      bridge.start();
-      
-
-      Connection connSource = cf0.createConnection();
-      
-      Connection connDest = cf1.createConnection();
-      
-      Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      
-      MessageProducer prod = sessSend.createProducer(sourceQueue);
-      
-      final int NUM_MESSAGES = 10;
-      
-      for (int i = 0; i < NUM_MESSAGES; i++)
+      if (!ServerManagement.isRemote())
       {
-         TextMessage tm = sessSend.createTextMessage("message" + i);
-         
-         prod.send(tm);
+         return;
       }
+      testMaxBatchSizeNoMaxBatchTime(true);
+   }
+   
+   public void testMaxBatchSizeNoMaxBatchTimeNonTransacted() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         return;
+      }
+      testMaxBatchSizeNoMaxBatchTime(false);
+   }
+   
+   private void testMaxBatchSizeNoMaxBatchTime(boolean transacted) throws Exception
+   {
+      Connection connSource = null;
       
-      Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Connection connDest = null;
       
-      MessageConsumer cons = sessRec.createConsumer(destQueue);
-      
-      connDest.start();
-      
-      for (int i = 0; i < NUM_MESSAGES; i++)
+      Bridge bridge = null;
+            
+      try
       {
+         ServerManagement.start(0, "all", null, true);
+         
+         ServerManagement.start(1, "all", null, false);
+         
+         ServerManagement.deployQueue("sourceQueue", 0);
+         
+         ServerManagement.deployQueue("destQueue", 1);
+         
+         Hashtable props0 = ServerManagement.getJNDIEnvironment(0);
+         
+         Hashtable props1 = ServerManagement.getJNDIEnvironment(1);
+               
+         InitialContext ic0 = new InitialContext(props0);
+         
+         InitialContext ic1 = new InitialContext(props1);
+         
+         ConnectionFactory cf0 = (ConnectionFactory)ic0.lookup("/ConnectionFactory");
+         
+         ConnectionFactory cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
+         
+         Queue sourceQueue = (Queue)ic0.lookup("/queue/sourceQueue");
+         
+         Queue destQueue = (Queue)ic1.lookup("/queue/destQueue");
+         
+         final int BATCH_SIZE = 10;
+         
+         bridge = new Bridge(props0, props1, "/ConnectionFactory", "/ConnectionFactory",
+                  "/queue/sourceQueue", "/queue/destQueue", null, null, null, null,
+                  null, 0, false,
+                  false, 10, -1,
+                  null, null,
+                  false, false);
+         
+         bridge.start();
+            
+         connSource = cf0.createConnection();
+         
+         connDest = cf1.createConnection();
+         
+         Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+         
+         //Send half the messages
+
+         for (int i = 0; i < BATCH_SIZE / 2; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         
+         Session sessRec = connDest.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons = sessRec.createConsumer(destQueue);
+         
+         connDest.start();
+         
+         //Verify none are received
+         
+         Message m = cons.receive(2000);
+         
+         assertNull(m);
+         
+         //Send the other half
+         
+         for (int i = BATCH_SIZE / 2; i < BATCH_SIZE; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         
+         //This should now be receivable
+         
+         for (int i = 0; i < BATCH_SIZE; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         //Send another batch with one more than batch size
+         
+         for (int i = 0; i < BATCH_SIZE + 1; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         
+         //Make sure only batch size are received
+         
+         for (int i = 0; i < BATCH_SIZE; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         m = cons.receive(2000);
+         
+         assertNull(m);
+         
+         //Final batch
+         
+         for (int i = 0; i < BATCH_SIZE - 1; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+         
          TextMessage tm = (TextMessage)cons.receive(1000);
          
          assertNotNull(tm);
          
-         assertEquals("message" + i, tm.getText());
+         assertEquals("message" + BATCH_SIZE, tm.getText());
+         
+         for (int i = 0; i < BATCH_SIZE - 1; i++)
+         {
+            tm = (TextMessage)cons.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         
+         //Make sure no messages are left in the source dest
+         
+         MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
+         
+         connSource.start();
+         
+         m = cons2.receive(1000);
+         
+         assertNull(m);
+         
+         connSource.close();
+         
+         connDest.close();
+                  
       }
-      
-      //Make sure no messages are left in the source dest
-      
-      MessageConsumer cons2 = sessSend.createConsumer(sourceQueue);
-      
-      connSource.start();
-      
-      Message m = cons2.receive(1000);
-      
-      assertNull(m);
-      
-      connSource.close();
-      
-      connDest.close();
-      
-      bridge.stop();
-      
-      ServerManagement.stop(0);
-      
-      ServerManagement.stop(1);
-      
+      finally
+      {      
+         if (connSource != null)
+         {
+            try
+            {
+               connSource.close();
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to close connection", e);
+            }
+         }
+         
+         if (connDest != null)
+         {
+            try
+            {
+               connDest.close();
+            }
+            catch (Exception e)
+            {
+              log.error("Failed to close connection", e);
+            }
+         }
+         
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+         
+         try
+         {
+            ServerManagement.undeployQueue("sourceQueue", 0);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to undeploy", e);
+         }
+         
+         try
+         {
+            ServerManagement.undeployQueue("destQueue", 1);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to undeploy", e);
+         }
+         
+         ServerManagement.stop(0);
+         
+         ServerManagement.stop(1);
+      }                  
    }
-
 }




More information about the jboss-cvs-commits mailing list