[jboss-cvs] JBoss Messaging SVN: r3607 - in branches/Branch_Stable: src/main/org/jboss/jms/message and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 21 16:34:08 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-01-21 16:34:07 -0500 (Mon, 21 Jan 2008)
New Revision: 3607
Modified:
branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_Stable/src/main/org/jboss/jms/message/JBossMessage.java
branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1216 - fix on Destination on Topics transfered through MessageSucker
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-01-21 21:01:36 UTC (rev 3606)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-01-21 21:34:07 UTC (rev 3607)
@@ -35,6 +35,7 @@
import org.jboss.jms.delegate.DefaultCancel;
import org.jboss.jms.delegate.DeliveryInfo;
import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.wireformat.ClientDelivery;
@@ -269,10 +270,18 @@
{
ClientDelivery del = (ClientDelivery)message;
- Message msg = del.getMessage();
+ JBossMessage msg = (JBossMessage)del.getMessage();
MessageProxy proxy = JBossMessage.
createThinDelegate(del.getDeliveryId(), (JBossMessage)msg, del.getDeliveryCount());
+
+ JBossDestination dest =(JBossDestination) proxy.getJMSDestination();
+
+ // If the message received is a direct destination (MessageSucker), and this Consumer is not a MessageSucker.. then we need to replace the destination
+ if (dest.isDirect() && !this.consumerDelegate.getDestination().isDirect())
+ {
+ proxy.setJMSDestination(msg.getOriginalSuckerDestination());
+ }
//TODO - we temporarily need to execute on a different thread to avoid a deadlock situation in
// failover where a message is sent then the valve is locked, and the message send cause
Modified: branches/Branch_Stable/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/message/JBossMessage.java 2008-01-21 21:01:36 UTC (rev 3606)
+++ branches/Branch_Stable/src/main/org/jboss/jms/message/JBossMessage.java 2008-01-21 21:34:07 UTC (rev 3607)
@@ -103,6 +103,9 @@
//Used when sending a message to the DLQ
public static final String JBOSS_MESSAGING_ORIG_MESSAGE_ID = "JBM_ORIG_MESSAGE_ID";
+ // Used to cache messages when transfered through MessageSucker
+ public static final String JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER = "JBM_ORIG_DESTINATION_SUCKER";
+
//Used when sending a mesage to the DLQ
public static final String JBOSS_MESSAGING_ACTUAL_EXPIRY_TIME = "JBM_ACTUAL_EXPIRY";
@@ -490,6 +493,16 @@
return headers;
}
+
+ public Destination getOriginalSuckerDestination()
+ {
+ return (Destination)headers.get(JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER);
+ }
+
+ public void setOriginalSuckerDestination(Destination destination)
+ {
+ headers.put(JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, destination);
+ }
public int getJMSDeliveryMode() throws JMSException
{
Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2008-01-21 21:01:36 UTC (rev 3606)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2008-01-21 21:34:07 UTC (rev 3607)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.impl.clusterconnection;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -37,6 +38,7 @@
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
+import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Queue;
@@ -247,7 +249,6 @@
{
Transaction tx = null;
- if (trace) { log.trace(this + " sucked message " + msg); }
try
{
@@ -319,7 +320,11 @@
}
*/
- org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
+ if (trace) { log.trace(this + " sucked message " + msg + " JMSDestination - " + msg.getJMSDestination()); }
+
+ Destination originalDestination = msg.getJMSDestination();
+
+ org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
if (preserveOrdering)
{
@@ -344,8 +349,10 @@
//First we ack it - this ack only occurs in memory even if it is a persistent message
msg.acknowledge();
- if (trace) { log.trace("Acknowledged message"); }
+ if (trace) { log.trace("Acknowledged message"); }
+ coreMessage.getHeaders().put(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION_SUCKER, originalDestination);
+
//Then we send - this causes the ref to be moved (SQL UPDATE) in the database
producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2008-01-21 21:01:36 UTC (rev 3606)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2008-01-21 21:34:07 UTC (rev 3607)
@@ -256,16 +256,21 @@
{
Connection connection0 = null;
Connection connection1 = null;
+ Connection connection2 = null;
Session session0 = null;
Session session1 = null;
+ Session session2 = null;
try
{
connection0 = createConnectionOnServer(cf, 0);
connection1 = createConnectionOnServer(cf, 1);
+ connection2 = createConnectionOnServer(cf, 1);
+
connection0.setClientID("aCustomer");
connection1.setClientID("aCustomer");
+ connection2.setClientID("aCustomer");
session0 = connection0.createSession(true,
@@ -283,13 +288,20 @@
MessageConsumer subscriber1 = session1.createDurableSubscriber(
topic[0], "sub2");
-
+
+
TestListener messageListener1 = new TestListener();
subscriber1.setMessageListener(messageListener1);
+ session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer subscriber2 = session2.createDurableSubscriber(topic[0], "sub3");
+ TestListener messageListener2 = new TestListener();
+ subscriber2.setMessageListener(messageListener2);
+
connection0.start();
connection1.start();
+ connection2.start();
TextMessage message = session0.createTextMessage("Hello!");
@@ -300,22 +312,41 @@
message = (TextMessage) m0;
assertTrue(m0.getJMSDestination() instanceof Topic);
+
+ Topic topicDest = (Topic)m0.getJMSDestination();
+ assertEquals(topic[0].getTopicName(), topicDest.getTopicName());
assertEquals("Hello!", message.getText());
Message m1 = messageListener1.waitForMessage();
message = (TextMessage) m1;
Destination d1 = m1.getJMSDestination();
assertTrue(m1.getJMSDestination() instanceof Topic);
+ topicDest = (Topic)m1.getJMSDestination();
+ log.info("******** topicName = " + topicDest.getTopicName());
+ assertEquals(topic[1].getTopicName(), topicDest.getTopicName());
assertEquals("Hello!", message.getText());
+
+ Message m2 = messageListener2.waitForMessage();
+ message = (TextMessage) m2;
+ Destination d2 = m2.getJMSDestination();
+ assertTrue(d2 instanceof Topic);
+ assertEquals(topic[0].getTopicName(),((Topic)d2).getTopicName());
+
+ session0.commit();
+ session1.commit();
+ session2.commit();
+
subscriber0.close();
subscriber1.close();
+ subscriber2.close();
+
session0.unsubscribe("sub");
session1.unsubscribe("sub2");
-
-
+ session2.unsubscribe("sub3");
+
} finally
{
try
@@ -340,6 +371,17 @@
throw e;
}
+ try
+ {
+ if (connection2 != null)
+ {
+ connection2.close();
+ }
+ } catch (JMSException e)
+ {
+ throw e;
+ }
+
}
}
More information about the jboss-cvs-commits
mailing list