[jboss-cvs] JBoss Messaging SVN: r3608 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP: src/main/org/jboss/jms/message and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 21 17:22:08 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-01-21 17:22:08 -0500 (Mon, 21 Jan 2008)
New Revision: 3608
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/message/JBossMessage.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1216 - fix for the Destination of Topic messages transferred through MessageSucker
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-01-21 21:34:07 UTC (rev 3607)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-01-21 22:22:08 UTC (rev 3608)
@@ -35,6 +35,7 @@
import org.jboss.jms.delegate.DefaultCancel;
import org.jboss.jms.delegate.DeliveryInfo;
import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.wireformat.ClientDelivery;
@@ -269,10 +270,18 @@
{
ClientDelivery del = (ClientDelivery)message;
- Message msg = del.getMessage();
+ JBossMessage msg = (JBossMessage)del.getMessage();
MessageProxy proxy = JBossMessage.
createThinDelegate(del.getDeliveryId(), (JBossMessage)msg, del.getDeliveryCount());
+
+ JBossDestination dest =(JBossDestination) proxy.getJMSDestination();
+
+ // If the received message carries a direct destination (i.e. it came through a MessageSucker) and this consumer is not itself a MessageSucker, restore the message's original destination
+ if (dest.isDirect() && !this.consumerDelegate.getDestination().isDirect())
+ {
+ proxy.setJMSDestination(msg.getOriginalSuckerDestination());
+ }
//TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
// failover where a message is sent then the valve is locked, and the message send cause
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/message/JBossMessage.java 2008-01-21 21:34:07 UTC (rev 3607)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/jms/message/JBossMessage.java 2008-01-21 22:22:08 UTC (rev 3608)
@@ -103,6 +103,9 @@
//Used when sending a message to the DLQ
public static final String JBOSS_MESSAGING_ORIG_MESSAGE_ID = "JBM_ORIG_MESSAGE_ID";
+ // Used to store the original destination of a message transferred through MessageSucker
+ public static final String JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER = "JBM_ORIG_DESTINATION_SUCKER";
+
//Used when sending a message to the DLQ
public static final String JBOSS_MESSAGING_ACTUAL_EXPIRY_TIME = "JBM_ACTUAL_EXPIRY";
@@ -490,6 +493,16 @@
return headers;
}
+
+   /**
+    * Looks up the destination recorded under the
+    * JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER header.
+    *
+    * @return the header value cast to Destination; presumably null when the
+    *         header was never set (TODO confirm against the type of the
+    *         headers collection)
+    */
+   public Destination getOriginalSuckerDestination()
+   {
+      Object original = headers.get(JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER);
+
+      return (Destination)original;
+   }
+
+   /**
+    * Stores the given destination in this message's headers under the
+    * JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER key, replacing any value
+    * previously stored under that key.
+    *
+    * @param destination the destination to record as the original one
+    */
+   public void setOriginalSuckerDestination(Destination destination)
+   {
+      this.headers.put(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, destination);
+   }
public int getJMSDeliveryMode() throws JMSException
{
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2008-01-21 21:34:07 UTC (rev 3607)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2008-01-21 22:22:08 UTC (rev 3608)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.impl.clusterconnection;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -37,6 +38,7 @@
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
+import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Queue;
@@ -247,7 +249,6 @@
{
Transaction tx = null;
- if (trace) { log.trace(this + " sucked message " + msg); }
try
{
@@ -319,7 +320,11 @@
}
*/
- org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
+ if (trace) { log.trace(this + " sucked message " + msg + " JMSDestination - " + msg.getJMSDestination()); }
+
+ Destination originalDestination = msg.getJMSDestination();
+
+ org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
if (preserveOrdering)
{
@@ -344,8 +349,10 @@
//First we ack it - this ack only occurs in memory even if it is a persistent message
msg.acknowledge();
- if (trace) { log.trace("Acknowledged message"); }
+ if (trace) { log.trace("Acknowledged message"); }
+ coreMessage.getHeaders().put(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, originalDestination);
+
//Then we send - this causes the ref to be moved (SQL UPDATE) in the database
producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2008-01-21 21:34:07 UTC (rev 3607)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2008-01-21 22:22:08 UTC (rev 3608)
@@ -23,9 +23,12 @@
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -247,6 +250,140 @@
}
}
+ // http://jira.jboss.org/jira/browse/JBMESSAGING-1216
+   /**
+    * Publishes one text message to topic[0] through server 0 and verifies that
+    * each of three durable subscribers (one on server 0, two on server 1) sees a
+    * JMSDestination that is a Topic with the expected topic name.
+    *
+    * @throws Exception if any JMS operation or assertion fails
+    */
+   public void testDestinationTypeOnMessage() throws Exception
+   {
+      Connection connection0 = null;
+      Connection connection1 = null;
+      Connection connection2 = null;
+
+      Session session0 = null;
+      Session session1 = null;
+      Session session2 = null;
+
+      try
+      {
+         connection0 = createConnectionOnServer(cf, 0);
+         connection1 = createConnectionOnServer(cf, 1);
+         connection2 = createConnectionOnServer(cf, 1);
+
+         connection0.setClientID("aCustomer");
+         connection1.setClientID("aCustomer");
+         connection2.setClientID("aCustomer");
+
+         session0 = connection0.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer subscriber0 = session0.createDurableSubscriber(topic[0], "sub");
+         TestListener messageListener0 = new TestListener();
+         subscriber0.setMessageListener(messageListener0);
+
+         MessageProducer publisher = session0.createProducer(topic[0]);
+
+         session1 = connection1.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer subscriber1 = session1.createDurableSubscriber(topic[0], "sub2");
+         TestListener messageListener1 = new TestListener();
+         subscriber1.setMessageListener(messageListener1);
+
+         session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer subscriber2 = session2.createDurableSubscriber(topic[0], "sub3");
+         TestListener messageListener2 = new TestListener();
+         subscriber2.setMessageListener(messageListener2);
+
+         connection0.start();
+         connection1.start();
+         connection2.start();
+
+         TextMessage message = session0.createTextMessage("Hello!");
+         publisher.send(message);
+         session0.commit();
+
+         // Subscriber on the same server as the publisher
+         Message m0 = messageListener0.waitForMessage();
+         // Fail with a clear message instead of a NullPointerException on timeout
+         assertNotNull("no message received by subscriber 'sub'", m0);
+
+         message = (TextMessage) m0;
+         assertTrue(m0.getJMSDestination() instanceof Topic);
+
+         Topic topicDest = (Topic)m0.getJMSDestination();
+         assertEquals(topic[0].getTopicName(), topicDest.getTopicName());
+         assertEquals("Hello!", message.getText());
+
+         // First subscriber on server 1
+         Message m1 = messageListener1.waitForMessage();
+         assertNotNull("no message received by subscriber 'sub2'", m1);
+
+         message = (TextMessage) m1;
+         Destination d1 = m1.getJMSDestination();
+         assertTrue(d1 instanceof Topic);
+         topicDest = (Topic)d1;
+         log.info("******** topicName = " + topicDest.getTopicName());
+         assertEquals(topic[1].getTopicName(), topicDest.getTopicName());
+
+         assertEquals("Hello!", message.getText());
+
+         // Second subscriber on server 1
+         Message m2 = messageListener2.waitForMessage();
+         assertNotNull("no message received by subscriber 'sub3'", m2);
+
+         message = (TextMessage) m2;
+         Destination d2 = m2.getJMSDestination();
+         assertTrue(d2 instanceof Topic);
+         assertEquals(topic[0].getTopicName(), ((Topic)d2).getTopicName());
+         assertEquals("Hello!", message.getText());
+
+         session0.commit();
+         session1.commit();
+         session2.commit();
+
+         subscriber0.close();
+         subscriber1.close();
+         subscriber2.close();
+
+         session0.unsubscribe("sub");
+         session1.unsubscribe("sub2");
+         session2.unsubscribe("sub3");
+      }
+      finally
+      {
+         // Nested try/finally so that a failure while closing one connection
+         // does not prevent the remaining connections from being closed.
+         try
+         {
+            if (connection0 != null)
+            {
+               connection0.close();
+            }
+         }
+         finally
+         {
+            try
+            {
+               if (connection1 != null)
+               {
+                  connection1.close();
+               }
+            }
+            finally
+            {
+               if (connection2 != null)
+               {
+                  connection2.close();
+               }
+            }
+         }
+      }
+   }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -1683,5 +1820,65 @@
// Inner classes -------------------------------------------------
+
+ public class TestListener implements MessageListener
+ {
+ // Constants -----------------------------------------------------
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private Message message;
+
+ // Constructors --------------------------------------------------
+
+ public TestListener()
+ {
+ }
+
+ // MessageListener implementation --------------------------------
+
+ public synchronized void onMessage(Message message)
+ {
+ this.message = message;
+ notifyAll();
+ }
+
+ // Public --------------------------------------------------------
+
+ public synchronized Message waitForMessage()
+ {
+ if (message != null)
+ {
+ return message;
+ }
+ else
+ {
+
+ try
+ {
+ wait(5000);
+ }
+ catch(InterruptedException e)
+ {
+ // OK
+ }
+
+ return message;
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ }
+
+
}
More information about the jboss-cvs-commits
mailing list