Author: jmesnil
Date: 2010-01-27 10:15:42 -0500 (Wed, 27 Jan 2010)
New Revision: 8848
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* create a HornetQ queue when a subcription to a topic is made and create
a consumer on this queue.
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-27
12:23:11 UTC (rev 8847)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-27
15:15:42 UTC (rev 8848)
@@ -189,15 +189,20 @@
if (response != null)
{
- send(connection, response);
+ send(conn, response);
}
+
+ if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ conn.destroy();
+ }
}
catch (Exception e)
{
StompFrame error = createError(e, request);
if (error != null)
{
- send(connection, error);
+ send(conn, error);
}
}
}
@@ -235,6 +240,7 @@
}
subscriptionID = "subscription/" + destination;
}
+ String hornetqDestination = StompUtils.toHornetQAddress(destination);
StompSession stompSession = getSession(connection);
if (stompSession.containsSubscription(subscriptionID))
{
@@ -242,7 +248,7 @@
". Either use unique subscription IDs or do not
create multiple subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateUniqueID();
- stompSession.addSubscription(consumerID, subscriptionID, destination, selector,
ack);
+ stompSession.addSubscription(consumerID, subscriptionID, hornetqDestination,
selector, ack);
return null;
}
@@ -284,12 +290,9 @@
StompSession stompSession = null;
if (txID != null)
{
- throw new StompException("transactional ACK are not supported");
+ log.warn("Transactional acknowledgement is not supported");
}
- else
- {
- stompSession = getSession(connection);
- }
+ stompSession = getSession(connection);
stompSession.acknowledge(messageID);
return null;
@@ -508,12 +511,12 @@
return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
}
- public int send(RemotingConnection connection, StompFrame frame)
+ public int send(StompConnection connection, StompFrame frame)
{
System.out.println(">>> " + frame);
synchronized (connection)
{
- if (connection.isDestroyed())
+ if (connection.isDestroyed() || !connection.isValid())
{
log.warn("Connection closed " + connection);
return 0;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-27 12:23:11
UTC (rev 8847)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-27 15:15:42
UTC (rev 8848)
@@ -26,6 +26,7 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.utils.UUIDGenerator;
/**
* A StompSession
@@ -146,17 +147,23 @@
public void addSubscription(long consumerID, String subscriptionID, String
destination, String selector, String ack) throws Exception
{
- String queue = StompUtils.toHornetQAddress(destination);
- session.createConsumer(consumerID,
- SimpleString.toSimpleString(queue),
- SimpleString.toSimpleString(selector),
- false);
- session.receiveConsumerCredits(consumerID, -1);
- StompSubscription subscription = new StompSubscription(subscriptionID,
destination, ack);
- subscriptions.put(consumerID, subscription);
- // FIXME not very smart: since we can't start the consumer, we start the
session
- // everytime to start the new consumer (and all previous consumers...)
- session.start();
+ SimpleString queue = SimpleString.toSimpleString(destination);
+ if (destination.startsWith(StompUtils.HQ_TOPIC_PREFIX))
+ {
+ //subscribes to a topic
+ queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
+ session.createQueue(SimpleString.toSimpleString(destination), queue, null, true,
false);
+ }
+ session.createConsumer(consumerID,
+ queue,
+ SimpleString.toSimpleString(selector),
+ false);
+ session.receiveConsumerCredits(consumerID, -1);
+ StompSubscription subscription = new StompSubscription(subscriptionID, destination,
ack);
+ subscriptions.put(consumerID, subscription);
+ // FIXME not very smart: since we can't start the consumer, we start the
session
+ // every time to start the new consumer (and all previous consumers...)
+ session.start();
}
public boolean unsubscribe(String id) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-27 12:23:11
UTC (rev 8847)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-27 15:15:42
UTC (rev 8848)
@@ -32,15 +32,16 @@
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import junit.framework.Assert;
-import junit.framework.TestCase;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
@@ -58,6 +59,7 @@
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
+import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.tests.util.UnitTestCase;
@@ -71,6 +73,7 @@
private Connection connection;
private Session session;
private Queue queue;
+ private Topic topic;
private JMSServerManager server;
public void testConnect() throws Exception {
@@ -101,10 +104,12 @@
"destination:/queue/" + getQueueName() + "\n\n" +
"Hello World" +
Stomp.NULL;
- sendFrame(frame);
-
- f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("ERROR"));
+ try {
+ sendFrame(frame);
+ Assert.fail("the socket must have been closed when the server handled the
DISCONNECT");
+ } catch (IOException e)
+ {
+ }
}
@@ -599,7 +604,7 @@
sendMessage(getName());
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
- Pattern cl = Pattern.compile("message-id:\\s*(\\d+)",
Pattern.CASE_INSENSITIVE);
+ Pattern cl = Pattern.compile("message-id:\\s*(\\S+)",
Pattern.CASE_INSENSITIVE);
Matcher cl_matcher = cl.matcher(frame);
Assert.assertTrue(cl_matcher.find());
String messageID = cl_matcher.group(1);
@@ -1069,7 +1074,137 @@
Assert.assertNotNull(message);
Assert.assertEquals("second message", message.getText().trim());
}
+
+ public void testSubscribeToTopic() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/topic/" + getTopicName() + "\n"
+
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName(), topic);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame =
+ "UNSUBSCRIBE\n" +
+ "destination:/topic/" + getTopicName() + "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName(), topic);
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was
removed");
+ }
+ catch (SocketTimeoutException e) {
+
+ }
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void testClientAckNotPartOfTransaction() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "ack:client\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ System.out.println(frame);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+ Assert.assertTrue(frame.indexOf("message-id:") > 0);
+ Pattern cl = Pattern.compile("message-id:\\s*(\\S+)",
Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ String messageID = cl_matcher.group(1);
+ System.out.println(messageID);
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "ACK\n" +
+ "message-id:" + messageID + "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "second message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "ABORT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received as the message was
acked even though the transaction has been aborted");
+ }
+ catch (SocketTimeoutException e) {
+ }
+
+ frame =
+ "UNSUBSCRIBE\n" +
+ "destination:/topic/" + getQueueName() + "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
protected void setUp() throws Exception {
@@ -1083,6 +1218,7 @@
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(getQueueName());
+ topic = session.createTopic(getTopicName());
connection.start();
}
@@ -1106,6 +1242,7 @@
JMSConfiguration jmsConfig = new JMSConfigurationImpl();
jmsConfig.getQueueConfigurations().add(new QueueConfigurationImpl(getQueueName(),
null, false, getQueueName()));
+ jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(),
getTopicName()));
server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
server.setContext(null);
return server;
@@ -1145,6 +1282,10 @@
return "test";
}
+ protected String getTopicName() {
+ return "testtopic";
+ }
+
public void sendFrame(String data) throws Exception {
byte[] bytes = data.getBytes("UTF-8");
OutputStream outputStream = stompSocket.getOutputStream();
@@ -1182,11 +1323,19 @@
}
public void sendMessage(String msg) throws Exception {
- sendMessage(msg, "foo", "xyz");
+ sendMessage(msg, "foo", "xyz", queue);
+ }
+
+ public void sendMessage(String msg, Destination destination) throws Exception {
+ sendMessage(msg, "foo", "xyz", destination);
}
public void sendMessage(String msg, String propertyName, String propertyValue) throws
JMSException {
- MessageProducer producer = session.createProducer(queue);
+ sendMessage(msg, propertyName, propertyValue, queue);
+ }
+
+ public void sendMessage(String msg, String propertyName, String propertyValue,
Destination destination) throws JMSException {
+ MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(msg);
message.setStringProperty(propertyName, propertyValue);
producer.send(message);