[hornetq-commits] JBoss hornetq SVN: r8872 - in trunk: src/main/org/hornetq/core/protocol/stomp and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Feb 15 05:06:01 EST 2010
Author: jmesnil
Date: 2010-02-15 05:06:00 -0500 (Mon, 15 Feb 2010)
New Revision: 8872
Modified:
trunk/docs/user-manual/en/interoperability.xml
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* removed conversion from Stomp destinations to HornetQ addresses and queues. Stomp clients MUST
use HornetQ semantics (address & queue, jms prefix) when setting Stomp destinations
* added doc about Stomp/HornetQ/JMS destination mapping
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2010-02-11 20:16:38 UTC (rev 8871)
+++ trunk/docs/user-manual/en/interoperability.xml 2010-02-15 10:06:00 UTC (rev 8872)
@@ -44,14 +44,6 @@
<para>Message acknowledgements are not transactional. The ACK frame can not be part of a transaction
(it will be ignored if its <literal>transaction</literal> header is set).</para>
</section>
- <section>
- <title>Destination Mapping</title>
- <para>Stomp messages are sent and received by specifying "destinations".
- If the Stomp destinations starts with <literal>/queue/</literal>, <literal>/topic/</literal>,
- <literal>/temp-queue/</literal> or <literal>/temp-topic/</literal>, they will be mapped to corresponding
- JMS Destinations. Ohterwise, they will be treated as regular HornetQ addresses (for sent messages) and
- queues (for subscription and received messages).</para>
- </section>
</section>
<section id="stompconnect">
<title>StompConnect</title>
@@ -70,6 +62,46 @@
<para>Make sure this file is in the classpath along with the StompConnect jar and the
HornetQ jars and simply run <literal>java org.codehaus.stomp.jms.Main</literal>.</para>
</section>
+ <section>
+ <title>Mapping Stomp destinations to HornetQ addresses and queues</title>
+ <para>Stomp clients deal with <emphasis>destinations</emphasis> when sending messages and subscribing.
+ Destination names are simply strings which are mapped to some form of destination on the
+ server - how the server translates these is left to the server implementation.</para>
+ <para>In HornetQ, these destinations are mapped to <emphasis>addresses</emphasis> and <emphasis>queues</emphasis>.
+ When a Stomp client sends a message (using a <literal>SEND</literal> frame), the specified destination is mapped
+ to an address.
+ When a Stomp client subscribes to (or unsubscribes from) a destination (using a <literal>SUBSCRIBE</literal>
+ or <literal>UNSUBSCRIBE</literal> frame), the destination is mapped to a HornetQ queue.</para>
+ <section>
+ <title>Using JMS destinations</title>
+ <para>As explained in <xref linkend="jms-core-mapping" />, JMS destinations are also mapped to HornetQ addresses and queues.
+ If you want to use Stomp to send messages to JMS destinations, the Stomp destinations must follow the same convention:</para>
+ <itemizedlist>
+ <listitem>
+ <para>send or subscribe to a JMS <emphasis>Queue</emphasis> by prefixing the queue name with <literal>jms.queue.</literal>.</para>
+ <para>For example, to send a message to the <literal>orders</literal> JMS Queue, the Stomp client must send the frame:</para>
+ <programlisting>
+SEND
+destination:jms.queue.orders
+
+hello queue orders
+^@
+ </programlisting>
+ </listitem>
+ <listitem>
+ <para>send or subscribe to a JMS <emphasis>Topic</emphasis> by prefixing the topic name with <literal>jms.topic.</literal>.</para>
+ <para>For example, to subscribe to the <literal>stocks</literal> JMS Topic, the Stomp client must send the frame:</para>
+ <programlisting>
+SUBSCRIBE
+destination:jms.topic.stocks
+
+^@
+ </programlisting>
+ </listitem>
+ </itemizedlist>
+
+ </section>
+ </section>
</section>
<section>
<title>REST</title>
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-11 20:16:38 UTC (rev 8871)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-15 10:06:00 UTC (rev 8872)
@@ -258,7 +258,6 @@
}
subscriptionID = "subscription/" + destination;
}
- String hornetqDestination = StompUtils.toHornetQAddress(destination);
StompSession stompSession = getSession(connection);
if (stompSession.containsSubscription(subscriptionID))
{
@@ -266,7 +265,7 @@
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateUniqueID();
- stompSession.addSubscription(consumerID, subscriptionID, hornetqDestination, selector, ack);
+ stompSession.addSubscription(consumerID, subscriptionID, destination, selector, ack);
return null;
}
@@ -467,7 +466,7 @@
{
checkConnected(connection);
Map<String, Object> headers = frame.getHeaders();
- String queue = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
+ String destination = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
byte type = Message.TEXT_TYPE;
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
@@ -475,12 +474,11 @@
type = Message.BYTES_TYPE;
}
long timestamp = System.currentTimeMillis();
- SimpleString address = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
message.setType(type);
message.setTimestamp(timestamp);
- message.setAddress(address);
+ message.setAddress(SimpleString.toSimpleString(destination));
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
byte[] content = frame.getContent();
if (type == Message.TEXT_TYPE)
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-02-11 20:16:38 UTC (rev 8871)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-02-15 10:06:00 UTC (rev 8872)
@@ -73,8 +73,7 @@
StompSubscription subscription = subscriptions.get(consumerID);
Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
- .toString()));
+ headers.put(Stomp.Headers.Message.DESTINATION, serverMessage.getAddress().toString());
if (subscription.getID() != null)
{
headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
@@ -150,7 +149,7 @@
public void addSubscription(long consumerID, String subscriptionID, String destination, String selector, String ack) throws Exception
{
SimpleString queue = SimpleString.toSimpleString(destination);
- if (destination.startsWith(StompUtils.HQ_TOPIC_PREFIX))
+ if (destination.startsWith("jms.topic"))
{
// subscribes to a topic
queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-02-11 20:16:38 UTC (rev 8871)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-02-15 10:06:00 UTC (rev 8872)
@@ -18,7 +18,6 @@
import java.util.Map;
import java.util.Set;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.client.impl.ClientMessageImpl;
@@ -33,94 +32,12 @@
*/
class StompUtils
{
-
- public static String HQ_QUEUE_PREFIX = "jms.queue.";
-
- public static String STOMP_QUEUE_PREFIX = "/queue/";
-
- public static String HQ_TEMP_QUEUE_PREFIX = "jms.tempqueue.";
-
- public static String STOMP_TEMP_QUEUE_PREFIX = "/temp-queue/";
-
- public static String HQ_TOPIC_PREFIX = "jms.topic.";
-
- public static String STOMP_TOPIC_PREFIX = "/topic/";
-
- public static String HQ_TEMP_TOPIC_PREFIX = "jms.temptopic.";
-
- public static String STOMP_TEMP_TOPIC_PREFIX = "/temp-topic/";
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
- public static String toHornetQAddress(String stompDestination) throws HornetQException
- {
- if (stompDestination == null)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
- }
- else if (stompDestination.startsWith(STOMP_QUEUE_PREFIX))
- {
- return convert(stompDestination, STOMP_QUEUE_PREFIX, HQ_QUEUE_PREFIX);
- }
- else if (stompDestination.startsWith(STOMP_TOPIC_PREFIX))
- {
- return convert(stompDestination, STOMP_TOPIC_PREFIX, HQ_TOPIC_PREFIX);
- }
- else if (stompDestination.startsWith(STOMP_TEMP_QUEUE_PREFIX))
- {
- return convert(stompDestination, STOMP_TEMP_QUEUE_PREFIX, HQ_TEMP_QUEUE_PREFIX);
- }
- else if (stompDestination.startsWith(STOMP_TEMP_TOPIC_PREFIX))
- {
- return convert(stompDestination, STOMP_TEMP_TOPIC_PREFIX, HQ_TEMP_TOPIC_PREFIX);
- }
- else
- {
- // it is also possible the STOMP client send a message directly to a HornetQ address
- // in that case, we do nothing:
- return stompDestination;
- }
- }
-
- public static String toStompDestination(String hornetqAddress) throws HornetQException
- {
- if (hornetqAddress == null)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
- }
- else if (hornetqAddress.startsWith(HQ_QUEUE_PREFIX))
- {
- return convert(hornetqAddress, HQ_QUEUE_PREFIX, STOMP_QUEUE_PREFIX);
- }
- else if (hornetqAddress.startsWith(HQ_TOPIC_PREFIX))
- {
- return convert(hornetqAddress, HQ_TOPIC_PREFIX, STOMP_TOPIC_PREFIX);
- }
- else if (hornetqAddress.startsWith(HQ_TEMP_QUEUE_PREFIX))
- {
- return convert(hornetqAddress, HQ_TEMP_QUEUE_PREFIX, STOMP_TEMP_QUEUE_PREFIX);
- }
- else if (hornetqAddress.startsWith(HQ_TEMP_TOPIC_PREFIX))
- {
- return convert(hornetqAddress, HQ_TEMP_TOPIC_PREFIX, STOMP_TEMP_TOPIC_PREFIX);
- }
- else
- {
- // do nothing
- return hornetqAddress;
- }
- }
-
- private static String convert(String str, String oldPrefix, String newPrefix)
- {
- String sub = str.substring(oldPrefix.length(), str.length());
- return new String(newPrefix + sub);
- }
-
public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception
{
Map<String, Object> headers = new HashMap<String, Object>(frame.getHeaders());
@@ -144,12 +61,17 @@
{
msg.putStringProperty(Message.HDR_GROUP_ID, SimpleString.toSimpleString(groupID));
}
- Object o = headers.remove(Stomp.Headers.Send.REPLY_TO);
- if (o != null)
+ Object replyTo = headers.remove(Stomp.Headers.Send.REPLY_TO);
+ if (replyTo != null)
{
- msg.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, SimpleString.toSimpleString((String)o));
+ msg.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, SimpleString.toSimpleString((String)replyTo));
}
-
+ String expiration = (String)headers.remove(Stomp.Headers.Send.REPLY_TO);
+ if (expiration != null)
+ {
+ msg.setExpiration(Long.parseLong(expiration));
+ }
+
// now the general headers
for (Iterator<Map.Entry<String, Object>> iter = headers.entrySet().iterator(); iter.hasNext();)
{
@@ -163,7 +85,7 @@
public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception
{
final Map<String, Object> headers = command.getHeaders();
- headers.put(Stomp.Headers.Message.DESTINATION, toStompDestination(message.getAddress().toString()));
+ headers.put(Stomp.Headers.Message.DESTINATION, message.getAddress().toString());
headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
if (message.getObjectProperty("JMSCorrelationID") != null)
@@ -173,11 +95,10 @@
headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
-
if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null)
{
headers.put(Stomp.Headers.Message.REPLY_TO,
- toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
+ message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME));
}
headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-11 20:16:38 UTC (rev 8871)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-15 10:06:00 UTC (rev 8872)
@@ -103,7 +103,7 @@
// sending a message will result in an error
String frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
"Hello World" +
Stomp.NULL;
try {
@@ -131,7 +131,7 @@
frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
"Hello World" +
Stomp.NULL;
@@ -164,7 +164,7 @@
frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"receipt: 1234\n\n" +
"Hello World" +
Stomp.NULL;
@@ -204,7 +204,7 @@
frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"content-length:" + data.length + "\n\n" +
new String(data) +
Stomp.NULL;
@@ -242,7 +242,7 @@
frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"JMSXGroupID: TEST\n\n" +
"Hello World" +
Stomp.NULL;
@@ -273,7 +273,7 @@
"SEND\n" +
"foo:abc\n" +
"bar:123\n" +
- "destination:/queue/" + getQueueName() + "\n\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
"Hello World" +
Stomp.NULL;
@@ -309,7 +309,7 @@
"JMSXGroupID:abc\n" +
"foo:abc\n" +
"bar:123\n" +
- "destination:/queue/" + getQueueName() + "\n\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
"Hello World" +
Stomp.NULL;
@@ -344,7 +344,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
sendFrame(frame);
@@ -383,7 +383,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
sendFrame(frame);
@@ -423,7 +423,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
sendFrame(frame);
@@ -476,7 +476,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:auto\n" +
"id: mysubid\n\n" +
Stomp.NULL;
@@ -513,7 +513,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
sendFrame(frame);
@@ -562,7 +562,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"selector: foo = 'zzz'\n" +
"ack:auto\n\n" +
Stomp.NULL;
@@ -596,7 +596,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:client\n\n" +
Stomp.NULL;
@@ -642,7 +642,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:client\n\n" +
Stomp.NULL;
@@ -687,7 +687,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:client\n\n" +
Stomp.NULL;
@@ -726,7 +726,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
Stomp.NULL;
sendFrame(frame);
@@ -756,7 +756,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"receipt: 1234\n\n" +
Stomp.NULL;
@@ -786,7 +786,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:auto\n\n" +
Stomp.NULL;
sendFrame(frame);
@@ -801,7 +801,7 @@
//remove suscription
frame =
"UNSUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"receipt:567\n" +
"\n\n" +
Stomp.NULL;
@@ -834,7 +834,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"id: mysubid\n" +
"ack:auto\n\n" +
Stomp.NULL;
@@ -892,7 +892,7 @@
frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"transaction: tx1\n" +
"receipt: 123\n" +
"\n\n" +
@@ -940,7 +940,7 @@
frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"transaction: tx1\n" +
"\n\n" +
"Hello World" +
@@ -967,7 +967,7 @@
frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"transaction: tx1\n" +
"\n\n" +
"Hello World" +
@@ -1038,7 +1038,7 @@
frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"transaction: tx1\n" +
"\n" +
"first message" +
@@ -1062,7 +1062,7 @@
frame =
"SEND\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"transaction: tx1\n" +
"\n" +
"second message" +
@@ -1098,7 +1098,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/topic/" + getTopicName() + "\n" +
+ "destination:" + getTopicPrefix() + getTopicName() + "\n" +
"receipt: 12\n" +
"\n\n" +
Stomp.NULL;
@@ -1116,7 +1116,7 @@
frame =
"UNSUBSCRIBE\n" +
- "destination:/topic/" + getTopicName() + "\n" +
+ "destination:" + getTopicPrefix() + getTopicName() + "\n" +
"receipt: 1234\n" +
"\n\n" +
Stomp.NULL;
@@ -1157,7 +1157,7 @@
frame =
"SUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"ack:client\n" +
"\n\n" +
Stomp.NULL;
@@ -1208,7 +1208,7 @@
frame =
"UNSUBSCRIBE\n" +
- "destination:/queue/" + getQueueName() + "\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
@@ -1301,10 +1301,18 @@
return "test";
}
+ protected String getQueuePrefix() {
+ return "jms.queue.";
+ }
+
protected String getTopicName() {
return "testtopic";
}
-
+
+ protected String getTopicPrefix() {
+ return "jms.topic.";
+ }
+
public void sendFrame(String data) throws Exception {
byte[] bytes = data.getBytes("UTF-8");
OutputStream outputStream = stompSocket.getOutputStream();
More information about the hornetq-commits
mailing list