[jboss-cvs] JBoss Messaging SVN: r3180 - in trunk: tests/src/org/jboss/test/messaging/jms/bridge and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Oct 8 07:58:49 EDT 2007


Author: timfox
Date: 2007-10-08 07:58:49 -0400 (Mon, 08 Oct 2007)
New Revision: 3180

Modified:
   trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
   trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1098


Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-10-08 02:43:15 UTC (rev 3179)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java	2007-10-08 11:58:49 UTC (rev 3180)
@@ -1041,7 +1041,7 @@
             enlistResources(tx);                  
          }
          
-         producer = sess.createProducer(targetDestination);
+         producer = sess.createProducer(null);
                           
          consumer.setMessageListener(new SourceListener());
          
@@ -1152,10 +1152,6 @@
       return false;      
    }
     
-   /*
-    * If one of the JMS operations fail, then we try and lookup the connection factories, create
-    * the connections and retry, up to a certain number of times
-    */
    private void sendBatch() 
    {
       if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
@@ -1197,8 +1193,20 @@
             
             if (trace) { log.trace("Sending message " + msg); }
             
-            producer.send(msg);
+            long timeToLive = msg.getJMSExpiration();
             
+   			if (timeToLive != 0)
+   			{
+   				timeToLive -=  System.currentTimeMillis();
+   				
+   				if (timeToLive <= 0)
+   				{
+   					timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
+   				}
+   			}
+            
+   			producer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
+   			
             if (trace) { log.trace("Sent message " + msg); }                    
          }
          

Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-10-08 02:43:15 UTC (rev 3179)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java	2007-10-08 11:58:49 UTC (rev 3180)
@@ -720,7 +720,17 @@
       }                  
    }
       
-   public void testMessageIDInHeader() throws Exception
+   public void testMessageIDInHeaderOn() throws Exception
+   {
+   	messageIDInHeader(true);
+   }
+   
+   public void testMessageIDInHeaderOff() throws Exception
+   {
+   	messageIDInHeader(false);
+   }
+   
+   private void messageIDInHeader(boolean on) throws Exception
    { 
       Bridge bridge = null;
       
@@ -736,7 +746,7 @@
                   null, null, null, null,
                   null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
                   1, -1,
-                  null, null, true);
+                  null, null, on);
          
          bridge.start();
          
@@ -796,54 +806,199 @@
             assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
             assertEquals(777, tm.getIntProperty("JMSXGroupSeq"));
             
-            String header = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
-            
-            assertNotNull(header);
-            
-            assertEquals(ids1.get(i), header);
-            
-            msgs.add(tm);
+            if (on)
+            {            
+	            String header = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+	            
+	            assertNotNull(header);
+	            
+	            assertEquals(ids1.get(i), header);
+	            
+	            msgs.add(tm);
+            }
          }
          
-         //Now we send them again back to the source
+         if (on)
+         {	         
+	         //Now we send them again back to the source
+	         
+	         Iterator iter = msgs.iterator();
+	         
+	         List ids2 = new ArrayList();
+	         
+	         while (iter.hasNext())
+	         {
+	         	Message msg = (Message)iter.next();
+	         	
+	         	prod.send(msg);
+	            
+	            ids2.add(msg.getJMSMessageID());
+	         }
+	                               
+	         //And consume them again
+	         
+	         for (int i = 0; i < NUM_MESSAGES; i++)
+	         {
+	            TextMessage tm = (TextMessage)cons.receive(5000);
+	            
+	            assertNotNull(tm);
+	            
+	            assertEquals("message" + i, tm.getText());
+	            
+	            assertEquals("uhuh", tm.getStringProperty("wib"));
+	            assertTrue(tm.getBooleanProperty("cheese"));
+	            assertEquals(23, tm.getIntProperty("Sausages"));
+	            
+	            assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
+	            assertEquals(777, tm.getIntProperty("JMSXGroupSeq"));            
+	            
+	            String header = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+	            
+	            assertNotNull(header);
+	            
+	            assertEquals(ids1.get(i) + "," + ids2.get(i), header);
+	         }
+         }
          
-         Iterator iter = msgs.iterator();
+      }
+      finally
+      {                        
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
          
-         List ids2 = new ArrayList();
-         
-         while (iter.hasNext())
+         if (connSource != null)
          {
-         	Message msg = (Message)iter.next();
-         	
-         	prod.send(msg);
-            
-            ids2.add(msg.getJMSMessageID());
+            connSource.close();
          }
-                               
-         //And consume them again
          
-         for (int i = 0; i < NUM_MESSAGES; i++)
+         if (connTarget != null)
          {
-            TextMessage tm = (TextMessage)cons.receive(5000);
+            connTarget.close();
+         }
+      }                  
+   }
+   
+   
+   public void testPropertiesPreservedPOn() throws Exception
+   {
+   	propertiesPreserved(true, true);
+   }
+   
+   public void testPropertiesPreservedNPoff() throws Exception
+   {
+   	propertiesPreserved(false, true);
+   }
+   
+   public void testPropertiesPreservedNPOn() throws Exception
+   {
+   	propertiesPreserved(false, true);
+   }
+   
+   public void testPropertiesPreservedPoff() throws Exception
+   {
+   	propertiesPreserved(true, true);
+   }
+   
+   private void propertiesPreserved(boolean persistent, boolean messageIDInHeader) throws Exception
+   { 
+      Bridge bridge = null;
+      
+      Connection connSource = null;
+      
+      Connection connTarget = null;
             
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-            
-            assertEquals("uhuh", tm.getStringProperty("wib"));
-            assertTrue(tm.getBooleanProperty("cheese"));
-            assertEquals(23, tm.getIntProperty("Sausages"));
-            
-            assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
-            assertEquals(777, tm.getIntProperty("JMSXGroupSeq"));            
-            
-            String header = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
-            
-            assertNotNull(header);
-            
-            assertEquals(ids1.get(i) + "," + ids2.get(i), header);
-         }
+      try
+      {
+         final int NUM_MESSAGES = 10;
          
+         bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+                  null, null, null, null,
+                  null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+                  1, -1,
+                  null, null, messageIDInHeader);
+         
+         bridge.start();
+         
+         connSource = cf0.createConnection();
+         
+         connTarget = cf1.createConnection();
+                    
+         log.trace("Sending " + NUM_MESSAGES + " messages");
+         
+         Session sessSource = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sessTarget = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons = sessTarget.createConsumer(destQueue);
+         
+         connTarget.start();
+         
+         MessageProducer prod = sessSource.createProducer(sourceQueue);
+         
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);  
+         
+         
+         
+         TextMessage tm = sessSource.createTextMessage("blahmessage");
+         
+         prod.setPriority(7);
+         
+         prod.setTimeToLive(1 * 60 * 60 * 1000);
+
+         prod.send(tm);
+         
+         long expiration = tm.getJMSExpiration();
+         
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+                                 
+         tm = (TextMessage)cons.receive(1000);
+         
+         assertNotNull(tm);
+         
+         assertEquals("blahmessage", tm.getText());
+
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+         assertEquals(7, tm.getJMSPriority());
+        
+         assertTrue(Math.abs(expiration - tm.getJMSExpiration()) < 100);
+                  
+         Message m = cons.receive(5000);
+         
+         assertNull(m);
+         
+         
+         //Now do one with expiration = 0
+         
+         
+         tm = sessSource.createTextMessage("blahmessage2");
+         
+         prod.setPriority(7);
+         
+         prod.setTimeToLive(0);
+
+         prod.send(tm);
+         
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+                                 
+         tm = (TextMessage)cons.receive(1000);
+         
+         assertNotNull(tm);
+         
+         assertEquals("blahmessage2", tm.getText());
+
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+         assertEquals(7, tm.getJMSPriority());
+        
+         assertEquals(0, tm.getJMSExpiration());
+                  
+         m = cons.receive(5000);
+         
+         assertNull(m);                          
+         
       }
       finally
       {                        




More information about the jboss-cvs-commits mailing list