Author: jmesnil
Date: 2010-01-26 10:27:21 -0500 (Tue, 26 Jan 2010)
New Revision: 8845
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added checks and headers support
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-26
13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-26
15:27:21 UTC (rev 8845)
@@ -22,7 +22,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-26
13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-26
15:27:21 UTC (rev 8845)
@@ -19,12 +19,13 @@
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -81,7 +82,9 @@
headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
}
- return new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+ byte[] payload = baos.toByteArray();
+ headers.put(Stomp.Headers.CONTENT_LENGTH, payload.length);
+ return new StompFrame(Stomp.Responses.ERROR, headers, payload);
}
catch (UnsupportedEncodingException ex)
{
@@ -175,12 +178,13 @@
if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
{
- if (response == null)
+ if (response == null)
{
Map<String, Object> h = new HashMap<String, Object>();
response = new StompFrame(Stomp.Responses.RECEIPT, h,
StompMarshaller.NO_DATA);
}
- response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+ response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
+
request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
}
if (response != null)
@@ -206,52 +210,88 @@
// Private -------------------------------------------------------
- private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception,
-
StompException,
-
HornetQException
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
- String subID =
(String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
+ String id = (String)headers.get(Stomp.Headers.Subscribe.ID);
if (ack == null)
{
ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
}
+ String subscriptionID = null;
+ if (id != null)
+ {
+ subscriptionID = id;
+ }
+ else
+ {
+ if (destination == null)
+ {
+ throw new StompException("Client must set destination or id header to a
SUBSCRIBE command");
+ }
+ subscriptionID = "subscription/" + destination;
+ }
StompSession stompSession = getSession(connection);
+ if (stompSession.containsSubscription(subscriptionID))
+ {
+ throw new StompException("There already is a subscription for: " +
subscriptionID +
+ ". Either use unique subscription IDs or do not
create multiple subscriptions for the same destination");
+ }
long consumerID = server.getStorageManager().generateUniqueID();
- stompSession.addSubscription(consumerID, subID, destination, selector, ack);
+ stompSession.addSubscription(consumerID, subscriptionID, destination, selector,
ack);
return null;
}
- private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server,
StompConnection connection) throws Exception,
-
StompException,
-
HornetQException
+ private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server,
StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
String id = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
+ String subscriptionID = null;
+ if (id != null)
+ {
+ subscriptionID = id;
+ }
+ else
+ {
+ if (destination == null)
+ {
+ throw new StompException("Must specify the subscription's id or the
destination you are unsubscribing from");
+ }
+ subscriptionID = "subscription/" + destination;
+ }
+
StompSession stompSession = getSession(connection);
- stompSession.unsubscribe(destination);
-
+ boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
+ if (!unsubscribed)
+ {
+ throw new StompException("Cannot unsubscribe as o subscription exists for
id: " + subscriptionID);
+ }
return null;
}
- private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception,
-
StompException,
-
HornetQException
+ private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
- String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+ String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-
- StompSession stompSession = getSession(connection);
+ StompSession stompSession = null;
+ if (txID != null)
+ {
+ throw new StompException("transactional ACK are not supported");
+ }
+ else
+ {
+ stompSession = getSession(connection);
+ }
stompSession.acknowledge(messageID);
-
+
return null;
}
@@ -259,24 +299,30 @@
{
Map<String, Object> headers = frame.getHeaders();
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-
+ if (txID == null)
+ {
+ throw new StompException("transaction header is mandatory to BEGIN a
transaction");
+ }
if (transactedSessions.containsKey(txID))
{
throw new StompException("Transaction already started: " + txID);
}
- StompSession stompSession = getTransactedSession(connection, txID);
+ // create the transacted session
+ getTransactedSession(connection, txID);
+
return null;
}
- private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception,
-
StompException,
-
HornetQException
+ private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ throw new StompException("transaction header is mandatory to COMMIT a
transaction");
+ }
StompSession session = transactedSessions.remove(txID);
-
if (session == null)
{
throw new StompException("No transaction started: " + txID);
@@ -287,12 +333,14 @@
return null;
}
- private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception,
-
StompException,
-
HornetQException
+ private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ throw new StompException("transaction header is mandatory to ABORT a
transaction");
+ }
StompSession session = transactedSessions.remove(txID);
@@ -361,7 +409,7 @@
private StompFrame onDisconnect(StompFrame frame, HornetQServer server,
StompConnection connection) throws Exception
{
- StompSession session = getSession(connection);
+ StompSession session = sessions.remove(connection);
if (session != null)
{
try
@@ -373,9 +421,23 @@
{
throw new StompException(e.getMessage());
}
- sessions.remove(connection);
- connection.setValid(false);
}
+
+ // removed the transacted session belonging to the connection
+ Iterator<Entry<String, StompSession>> iterator =
transactedSessions.entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<String, StompSession> entry = (Map.Entry<String,
StompSession>)iterator.next();
+ if (entry.getValue().getConnection() == connection)
+ {
+ ServerSession serverSession = entry.getValue().getSession();
+ serverSession.rollback(true);
+ serverSession.close();
+ iterator.remove();
+ }
+ }
+ connection.setValid(false);
+
return null;
}
@@ -412,7 +474,8 @@
if (txID == null)
{
session = getSession(connection).getSession();
- } else
+ }
+ else
{
session = transactedSessions.get(txID).getSession();
}
@@ -461,7 +524,7 @@
}
}
- public synchronized void cleanup(StompConnection conn)
+ synchronized void cleanup(StompConnection conn)
{
StompSession session = sessions.remove(conn);
if (session != null)
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-26 13:31:50
UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-26 15:27:21
UTC (rev 8845)
@@ -70,9 +70,15 @@
{
try
{
+ StompSubscription subscription = subscriptions.get(consumerID);
+
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(Stomp.Headers.Message.DESTINATION,
StompUtils.toStompDestination(serverMessage.getAddress()
.toString()));
+ if (subscription.getID() != null)
+ {
+ headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
+ }
byte[] data = new byte[] {};
if (serverMessage.getType() == Message.TEXT_TYPE)
{
@@ -91,15 +97,15 @@
buffer.readBytes(data);
headers.put(Headers.CONTENT_LENGTH, data.length);
}
+
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
+
System.out.println(">>> " + frame);
byte[] bytes = marshaller.marshal(frame);
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
connection.getTransportConnection().write(buffer, true);
- StompSubscription subscription = subscriptions.get(consumerID);
-
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -142,7 +148,7 @@
session.commit();
}
- public void addSubscription(long consumerID, String clientID, String destination,
String selector, String ack) throws Exception
+ public void addSubscription(long consumerID, String subscriptionID, String
destination, String selector, String ack) throws Exception
{
String queue = StompUtils.toHornetQAddress(destination);
synchronized (session)
@@ -152,7 +158,7 @@
SimpleString.toSimpleString(selector),
false);
session.receiveConsumerCredits(consumerID, -1);
- StompSubscription subscription = new StompSubscription(consumerID, clientID,
destination, ack);
+ StompSubscription subscription = new StompSubscription(subscriptionID,
destination, ack);
subscriptions.put(consumerID, subscription);
// FIXME not very smart: since we can't start the consumer, we start the
session
// everytime to start the new consumer (and all previous consumers...)
@@ -160,7 +166,7 @@
}
}
- public void unsubscribe(String destination) throws Exception
+ public boolean unsubscribe(String id) throws Exception
{
Iterator<Entry<Long, StompSubscription>> iterator =
subscriptions.entrySet().iterator();
while (iterator.hasNext())
@@ -168,11 +174,33 @@
Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long,
StompSubscription>)iterator.next();
long consumerID = entry.getKey();
StompSubscription sub = entry.getValue();
- if (sub.getDestination().equals(destination))
+ if (id != null && id.equals(sub.getID()))
{
iterator.remove();
session.closeConsumer(consumerID);
+ return true;
}
}
+ return false;
}
+
+ boolean containsSubscription(String subscriptionID)
+ {
+ Iterator<Entry<Long, StompSubscription>> iterator =
subscriptions.entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<Long, StompSubscription> entry = (Map.Entry<Long,
StompSubscription>)iterator.next();
+ StompSubscription sub = entry.getValue();
+ if (sub.getID().equals(subscriptionID))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public RemotingConnection getConnection()
+ {
+ return connection;
+ }
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-26
13:31:50 UTC (rev 8844)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-26
15:27:21 UTC (rev 8845)
@@ -26,8 +26,6 @@
// Attributes ----------------------------------------------------
- private final long consumerID;
-
private final String subID;
private final String destination;
@@ -38,9 +36,8 @@
// Constructors --------------------------------------------------
- public StompSubscription(long consumerID, String subID, String destination, String
ack)
+ public StompSubscription(String subID, String destination, String ack)
{
- this.consumerID = consumerID;
this.subID = subID;
this.destination = destination;
this.ack = ack;
@@ -57,6 +54,11 @@
{
return destination;
}
+
+ public String getID()
+ {
+ return subID;
+ }
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-26 13:31:50
UTC (rev 8844)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-26 15:27:21
UTC (rev 8845)
@@ -454,7 +454,42 @@
Stomp.NULL;
sendFrame(frame);
}
+
+ public void testSubscribeWithID() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "ack:auto\n" +
+ "id: mysubid\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf("subscription:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
public void testMessagesAreInOrder() throws Exception {
int ctr = 10;
String[] data = new String[ctr];
@@ -622,7 +657,7 @@
Assert.assertNotNull(message);
Assert.assertTrue(message.getJMSRedelivered());
}
-
+
public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
}
@@ -771,6 +806,55 @@
}
}
+ public void testUnsubscribeWithID() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "id: mysubid\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //send a message to our queue
+ sendMessage("first message");
+
+ //receive message from socket
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ //remove suscription
+ frame =
+ "UNSUBSCRIBE\n" +
+ "id:mysubid\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ //send a message to our queue
+ sendMessage("second message");
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was
removed");
+ }
+ catch (SocketTimeoutException e) {
+
+ }
+ }
+
public void testTransactionCommit() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -816,7 +900,106 @@
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
}
+
+ public void testSuccessiveTransactionsWithSameID() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ // first tx
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ // 2nd tx with same tx ID
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+}
+
+ public void testBeginSameTransactionTwice() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // begin the tx a 2nd time
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("ERROR"));
+
+ }
+
public void testTransactionRollback() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);