[jboss-cvs] JBoss Messaging SVN: r3608 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP: src/main/org/jboss/jms/message and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 21 17:22:08 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-01-21 17:22:08 -0500 (Mon, 21 Jan 2008)
New Revision: 3608

Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/message/JBossMessage.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1216 - fix on Destination Topics transferred through MessageSucker

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-01-21 21:34:07 UTC (rev 3607)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-01-21 22:22:08 UTC (rev 3608)
@@ -35,6 +35,7 @@
 import org.jboss.jms.delegate.DefaultCancel;
 import org.jboss.jms.delegate.DeliveryInfo;
 import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.wireformat.ClientDelivery;
@@ -269,10 +270,18 @@
    {
       ClientDelivery del = (ClientDelivery)message;
       
-      Message msg = del.getMessage();
+      JBossMessage msg = (JBossMessage)del.getMessage();
       
       MessageProxy proxy = JBossMessage.
          createThinDelegate(del.getDeliveryId(), (JBossMessage)msg, del.getDeliveryCount());
+      
+      JBossDestination dest =(JBossDestination) proxy.getJMSDestination();
+      
+      // If the message received has a direct destination (MessageSucker) and this consumer is not a MessageSucker, then we need to replace the destination with the original one
+      if (dest.isDirect() && !this.consumerDelegate.getDestination().isDirect())
+      {
+         proxy.setJMSDestination(msg.getOriginalSuckerDestination());
+      }
 
       //TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
       //       failover where a message is sent then the valve is locked, and the message send cause

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/message/JBossMessage.java	2008-01-21 21:34:07 UTC (rev 3607)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/message/JBossMessage.java	2008-01-21 22:22:08 UTC (rev 3608)
@@ -103,6 +103,9 @@
    //Used when sending a message to the DLQ
    public static final String JBOSS_MESSAGING_ORIG_MESSAGE_ID = "JBM_ORIG_MESSAGE_ID";
    
+   // Header key holding the original destination of a message transferred through MessageSucker
+   public static final String JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER = "JBM_ORIG_DESTINATION_SUCKER";
+   
    //Used when sending a mesage to the DLQ
    public static final String JBOSS_MESSAGING_ACTUAL_EXPIRY_TIME = "JBM_ACTUAL_EXPIRY";
    
@@ -490,6 +493,16 @@
       
       return headers;
    }
+   
+   public Destination getOriginalSuckerDestination()
+   {
+      return (Destination)headers.get(JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER);
+   }
+   
+   public void setOriginalSuckerDestination(Destination destination)
+   {
+      headers.put(JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, destination);
+   }
 
    public int getJMSDeliveryMode() throws JMSException
    {

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2008-01-21 21:34:07 UTC (rev 3607)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2008-01-21 22:22:08 UTC (rev 3608)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.impl.clusterconnection;
 
+import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -37,6 +38,7 @@
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
+import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Queue;
@@ -247,7 +249,6 @@
 	{
 		Transaction tx = null;
 		
-		if (trace) { log.trace(this + " sucked message " + msg); }
 		
 		try
 		{
@@ -319,7 +320,11 @@
 			}
 			*/
 
-         org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
+	      if (trace) { log.trace(this + " sucked message " + msg + " JMSDestination - " + msg.getJMSDestination()); }
+
+	      Destination originalDestination = msg.getJMSDestination();
+	      
+	      org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
                   
          if (preserveOrdering)
          {
@@ -344,8 +349,10 @@
          //First we ack it - this ack only occurs in memory even if it is a persistent message
          msg.acknowledge();
          
-         if (trace) { log.trace("Acknowledged message"); }     
+         if (trace) { log.trace("Acknowledged message"); }
          
+         coreMessage.getHeaders().put(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, originalDestination);
+         
          //Then we send - this causes the ref to be moved (SQL UPDATE) in the database        
          producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
          

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2008-01-21 21:34:07 UTC (rev 3607)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2008-01-21 22:22:08 UTC (rev 3608)
@@ -23,9 +23,12 @@
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.Destination;
 import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -247,6 +250,140 @@
       }
    }
    
+   // http://jira.jboss.org/jira/browse/JBMESSAGING-1216
+   public void testDestinationTypeOnMessage() throws Exception
+   {
+      Connection connection0 = null;
+      Connection connection1 = null;
+      Connection connection2 = null;
+      
+      Session session0 = null;
+      Session session1 = null;
+      Session session2 = null;
+
+      try
+      {
+         connection0 = createConnectionOnServer(cf, 0);
+         connection1 = createConnectionOnServer(cf, 1);
+         connection2 = createConnectionOnServer(cf, 1);
+
+         connection0.setClientID("aCustomer");
+         connection1.setClientID("aCustomer");
+         connection2.setClientID("aCustomer");
+
+
+         session0 = connection0.createSession(true,
+               Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer subscriber0 = session0.createDurableSubscriber(
+               topic[0], "sub");
+         TestListener messageListener0 = new TestListener();
+         subscriber0.setMessageListener(messageListener0);
+
+         MessageProducer publisher = session0.createProducer(topic[0]);
+
+         session1 = connection1.createSession(true,
+               Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer subscriber1 = session1.createDurableSubscriber(
+               topic[0], "sub2");
+         
+         
+         TestListener messageListener1 = new TestListener();
+         subscriber1.setMessageListener(messageListener1);
+
+         session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer subscriber2 = session2.createDurableSubscriber(topic[0], "sub3");
+         TestListener messageListener2 = new TestListener();
+         subscriber2.setMessageListener(messageListener2);
+
+
+         connection0.start();
+         connection1.start();
+         connection2.start();
+
+
+         TextMessage message = session0.createTextMessage("Hello!");
+         publisher.send(message);
+         session0.commit();
+
+         Message m0 = messageListener0.waitForMessage();
+
+         message = (TextMessage) m0;
+         assertTrue(m0.getJMSDestination() instanceof Topic);
+         
+         Topic topicDest = (Topic)m0.getJMSDestination();
+         assertEquals(topic[0].getTopicName(), topicDest.getTopicName());
+         assertEquals("Hello!", message.getText());
+
+         Message m1 = messageListener1.waitForMessage();
+         message = (TextMessage) m1;
+         Destination d1 = m1.getJMSDestination();
+         assertTrue(m1.getJMSDestination() instanceof Topic);
+         topicDest = (Topic)m1.getJMSDestination();
+         log.info("******** topicName = " + topicDest.getTopicName());
+         assertEquals(topic[1].getTopicName(), topicDest.getTopicName());
+
+         assertEquals("Hello!", message.getText());
+         
+         
+         Message m2 = messageListener2.waitForMessage();
+         message = (TextMessage) m2;
+         Destination d2 = m2.getJMSDestination();
+         assertTrue(d2 instanceof Topic);
+         assertEquals(topic[0].getTopicName(),((Topic)d2).getTopicName());
+         
+         session0.commit();
+         session1.commit();
+         session2.commit();
+         
+         subscriber0.close();
+         subscriber1.close();
+         subscriber2.close();
+         
+         session0.unsubscribe("sub");
+         session1.unsubscribe("sub2");
+         session2.unsubscribe("sub3");
+
+
+      } finally
+      {
+         try
+         {
+            if (connection0 != null)
+            {
+               connection0.close();
+            }
+         } catch (JMSException e)
+         {
+            throw e;
+         }
+
+         try
+         {
+            if (connection1 != null)
+            {
+               connection1.close();
+            }
+         } catch (JMSException e)
+         {
+            throw e;
+         }
+
+         try
+         {
+            if (connection2 != null)
+            {
+               connection2.close();
+            }
+         } catch (JMSException e)
+         {
+            throw e;
+         }
+
+      }
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -1683,5 +1820,65 @@
   
    // Inner classes -------------------------------------------------
    
+   
+   public class TestListener implements MessageListener
+   {
+      // Constants -----------------------------------------------------
 
+      // Static --------------------------------------------------------
+
+      // Attributes ----------------------------------------------------
+
+      private Message message;
+
+      // Constructors --------------------------------------------------
+
+      public TestListener()
+      {
+      }
+
+      // MessageListener implementation --------------------------------
+
+      public synchronized void onMessage(Message message)
+      {
+         this.message = message;
+         notifyAll();
+      }
+
+      // Public --------------------------------------------------------
+
+      public synchronized Message waitForMessage()
+      {
+         if (message != null)
+         {
+            return message;
+         }
+         else
+         {
+   
+            try
+            {
+               wait(5000);
+            }
+            catch(InterruptedException e)
+            {
+               // OK
+            }
+   
+            return message;
+         }
+
+      }
+
+      // Package protected ---------------------------------------------
+
+      // Protected -----------------------------------------------------
+
+      // Private -------------------------------------------------------
+
+      // Inner classes -------------------------------------------------
+
+   }
+
+
 }




More information about the jboss-cvs-commits mailing list