[jboss-cvs] JBoss Messaging SVN: r3607 - in branches/Branch_Stable: src/main/org/jboss/jms/message and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 21 16:34:08 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-01-21 16:34:07 -0500 (Mon, 21 Jan 2008)
New Revision: 3607

Modified:
   branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_Stable/src/main/org/jboss/jms/message/JBossMessage.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1216 - fix on Destination on Topics transfered through MessageSucker

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-01-21 21:01:36 UTC (rev 3606)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-01-21 21:34:07 UTC (rev 3607)
@@ -35,6 +35,7 @@
 import org.jboss.jms.delegate.DefaultCancel;
 import org.jboss.jms.delegate.DeliveryInfo;
 import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.wireformat.ClientDelivery;
@@ -269,10 +270,18 @@
    {
       ClientDelivery del = (ClientDelivery)message;
       
-      Message msg = del.getMessage();
+      JBossMessage msg = (JBossMessage)del.getMessage();
       
       MessageProxy proxy = JBossMessage.
          createThinDelegate(del.getDeliveryId(), (JBossMessage)msg, del.getDeliveryCount());
+      
+      JBossDestination dest =(JBossDestination) proxy.getJMSDestination();
+      
+      // If the message received is a direct destination (MessageSucker), and this Consumer is not a MessageSucker.. then we need to replace the destination
+      if (dest.isDirect() && !this.consumerDelegate.getDestination().isDirect())
+      {
+         proxy.setJMSDestination(msg.getOriginalSuckerDestination());
+      }
 
       //TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
       //       failover where a message is sent then the valve is locked, and the message send cause

Modified: branches/Branch_Stable/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/message/JBossMessage.java	2008-01-21 21:01:36 UTC (rev 3606)
+++ branches/Branch_Stable/src/main/org/jboss/jms/message/JBossMessage.java	2008-01-21 21:34:07 UTC (rev 3607)
@@ -103,6 +103,9 @@
    //Used when sending a message to the DLQ
    public static final String JBOSS_MESSAGING_ORIG_MESSAGE_ID = "JBM_ORIG_MESSAGE_ID";
    
+   // Used to cache messages when transfered through MessageSucker
+   public static final String JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER = "JBM_ORIG_DESTINATION_SUCKER";
+   
    //Used when sending a mesage to the DLQ
    public static final String JBOSS_MESSAGING_ACTUAL_EXPIRY_TIME = "JBM_ACTUAL_EXPIRY";
    
@@ -490,6 +493,16 @@
       
       return headers;
    }
+   
+   public Destination getOriginalSuckerDestination()
+   {
+      return (Destination)headers.get(JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER);
+   }
+   
+   public void setOriginalSuckerDestination(Destination destination)
+   {
+      headers.put(JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, destination);
+   }
 
    public int getJMSDeliveryMode() throws JMSException
    {

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2008-01-21 21:01:36 UTC (rev 3606)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2008-01-21 21:34:07 UTC (rev 3607)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.impl.clusterconnection;
 
+import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -37,6 +38,7 @@
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.destination.JBossQueue;
+import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Queue;
@@ -247,7 +249,6 @@
 	{
 		Transaction tx = null;
 		
-		if (trace) { log.trace(this + " sucked message " + msg); }
 		
 		try
 		{
@@ -319,7 +320,11 @@
 			}
 			*/
 
-         org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
+	      if (trace) { log.trace(this + " sucked message " + msg + " JMSDestination - " + msg.getJMSDestination()); }
+
+	      Destination originalDestination = msg.getJMSDestination();
+	      
+	      org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
                   
          if (preserveOrdering)
          {
@@ -344,8 +349,10 @@
          //First we ack it - this ack only occurs in memory even if it is a persistent message
          msg.acknowledge();
          
-         if (trace) { log.trace("Acknowledged message"); }     
+         if (trace) { log.trace("Acknowledged message"); }
          
+         coreMessage.getHeaders().put(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, originalDestination);
+         
          //Then we send - this causes the ref to be moved (SQL UPDATE) in the database        
          producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
          

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2008-01-21 21:01:36 UTC (rev 3606)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2008-01-21 21:34:07 UTC (rev 3607)
@@ -256,16 +256,21 @@
    {
       Connection connection0 = null;
       Connection connection1 = null;
+      Connection connection2 = null;
       
       Session session0 = null;
       Session session1 = null;
+      Session session2 = null;
 
       try
       {
          connection0 = createConnectionOnServer(cf, 0);
          connection1 = createConnectionOnServer(cf, 1);
+         connection2 = createConnectionOnServer(cf, 1);
+
          connection0.setClientID("aCustomer");
          connection1.setClientID("aCustomer");
+         connection2.setClientID("aCustomer");
 
 
          session0 = connection0.createSession(true,
@@ -283,13 +288,20 @@
 
          MessageConsumer subscriber1 = session1.createDurableSubscriber(
                topic[0], "sub2");
-
+         
+         
          TestListener messageListener1 = new TestListener();
          subscriber1.setMessageListener(messageListener1);
 
+         session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer subscriber2 = session2.createDurableSubscriber(topic[0], "sub3");
+         TestListener messageListener2 = new TestListener();
+         subscriber2.setMessageListener(messageListener2);
 
+
          connection0.start();
          connection1.start();
+         connection2.start();
 
 
          TextMessage message = session0.createTextMessage("Hello!");
@@ -300,22 +312,41 @@
 
          message = (TextMessage) m0;
          assertTrue(m0.getJMSDestination() instanceof Topic);
+         
+         Topic topicDest = (Topic)m0.getJMSDestination();
+         assertEquals(topic[0].getTopicName(), topicDest.getTopicName());
          assertEquals("Hello!", message.getText());
 
          Message m1 = messageListener1.waitForMessage();
          message = (TextMessage) m1;
          Destination d1 = m1.getJMSDestination();
          assertTrue(m1.getJMSDestination() instanceof Topic);
+         topicDest = (Topic)m1.getJMSDestination();
+         log.info("******** topicName = " + topicDest.getTopicName());
+         assertEquals(topic[1].getTopicName(), topicDest.getTopicName());
 
          assertEquals("Hello!", message.getText());
          
+         
+         Message m2 = messageListener2.waitForMessage();
+         message = (TextMessage) m2;
+         Destination d2 = m2.getJMSDestination();
+         assertTrue(d2 instanceof Topic);
+         assertEquals(topic[0].getTopicName(),((Topic)d2).getTopicName());
+         
+         session0.commit();
+         session1.commit();
+         session2.commit();
+         
          subscriber0.close();
          subscriber1.close();
+         subscriber2.close();
+         
          session0.unsubscribe("sub");
          session1.unsubscribe("sub2");
-         
-          
+         session2.unsubscribe("sub3");
 
+
       } finally
       {
          try
@@ -340,6 +371,17 @@
             throw e;
          }
 
+         try
+         {
+            if (connection2 != null)
+            {
+               connection2.close();
+            }
+         } catch (JMSException e)
+         {
+            throw e;
+         }
+
       }
    }
    




More information about the jboss-cvs-commits mailing list