Author: jmesnil
Date: 2010-02-15 12:23:38 -0500 (Mon, 15 Feb 2010)
New Revision: 8875
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added support for durable-subscription-name header
* temporarily disabled deadlocking tests
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-02-15
14:51:59 UTC (rev 8874)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-02-15
17:23:38 UTC (rev 8875)
@@ -13,10 +13,8 @@
package org.hornetq.core.protocol.stomp;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
@@ -179,6 +177,11 @@
this.clientID = clientID;
}
+ public String getClientID()
+ {
+ return clientID;
+ }
+
public boolean isValid()
{
return valid;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-15
14:51:59 UTC (rev 8874)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-15
17:23:38 UTC (rev 8875)
@@ -34,6 +34,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -153,6 +154,7 @@
try
{
request = marshaller.unmarshal(buffer);
+ System.out.println("received " + request);
if (log.isTraceEnabled())
{
log.trace("received " + request);
@@ -229,6 +231,7 @@
}
catch (Exception e)
{
+ e.printStackTrace();
StompFrame error = createError(e, request);
if (error != null)
{
@@ -252,6 +255,7 @@
String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
String id = (String)headers.get(Stomp.Headers.Subscribe.ID);
+ String durableSubscriptionName =
(String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
boolean noLocal = false;
if (headers.containsKey(Stomp.Headers.Subscribe.NO_LOCAL))
{
@@ -294,7 +298,8 @@
}
server.getStorageManager().setContext(stompSession.getContext());
long consumerID = server.getStorageManager().generateUniqueID();
- stompSession.addSubscription(consumerID, subscriptionID, destination, selector,
ack);
+ String clientID = (connection.getClientID() != null) ? connection.getClientID() :
null;
+ stompSession.addSubscription(consumerID, subscriptionID, clientID,
durableSubscriptionName, destination, selector, ack);
return null;
}
@@ -586,46 +591,53 @@
{
connection.setValid(false);
- StompSession session = sessions.remove(connection);
- if (session != null)
- {
- try
+ try {
+ StompSession session = sessions.remove(connection);
+ if (session != null)
{
- session.getSession().rollback(true);
- session.getSession().close();
- session.getSession().runConnectionFailureRunners();
- }
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- }
- }
-
- // removed the transacted session belonging to the connection
- Iterator<Entry<String, StompSession>> iterator =
transactedSessions.entrySet().iterator();
- while (iterator.hasNext())
- {
- Map.Entry<String, StompSession> entry = (Map.Entry<String,
StompSession>)iterator.next();
- if (entry.getValue().getConnection() == connection)
- {
- ServerSession serverSession = entry.getValue().getSession();
try
{
- serverSession.rollback(true);
- serverSession.close();
- serverSession.runConnectionFailureRunners();
+ session.getSession().rollback(true);
+ session.getSession().close();
+ session.getSession().runConnectionFailureRunners();
}
catch (Exception e)
{
log.warn(e.getMessage(), e);
}
- iterator.remove();
}
+
+ // removed the transacted session belonging to the connection
+ Iterator<Entry<String, StompSession>> iterator =
transactedSessions.entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<String, StompSession> entry = (Map.Entry<String,
StompSession>)iterator.next();
+ if (entry.getValue().getConnection() == connection)
+ {
+ ServerSession serverSession = entry.getValue().getSession();
+ try
+ {
+ serverSession.rollback(true);
+ serverSession.close();
+ serverSession.runConnectionFailureRunners();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ iterator.remove();
+ }
+ }
}
+ finally
+ {
+ server.getStorageManager().clearContext();
+ }
}
-
+
private void doSend(final StompConnection connection, final StompFrame frame)
{
+ System.out.println("sent " + frame);
if (log.isTraceEnabled())
{
log.trace("sent " + frame);
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-02-15 14:51:59
UTC (rev 8874)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-02-15 17:23:38
UTC (rev 8875)
@@ -17,12 +17,15 @@
import java.util.Map;
import java.util.Map.Entry;
+import javax.jms.IllegalStateException;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -153,14 +156,43 @@
session.commit();
}
- public void addSubscription(long consumerID, String subscriptionID, String
destination, String selector, String ack) throws Exception
+ public void addSubscription(long consumerID,
+ String subscriptionID,
+ String clientID,
+ String durableSubscriptionName,
+ String destination,
+ String selector,
+ String ack) throws Exception
{
SimpleString queue = SimpleString.toSimpleString(destination);
if (destination.startsWith("jms.topic"))
{
// subscribes to a topic
- queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
- session.createQueue(SimpleString.toSimpleString(destination), queue, null, true,
false);
+ if (durableSubscriptionName != null)
+ {
+ if (clientID == null)
+ {
+ throw new IllegalStateException("Cannot create a subscriber on the
durable subscription if the client-id of the connection is not set");
+ }
+ queue = SimpleString.toSimpleString(clientID + "." +
durableSubscriptionName);
+ QueueQueryResult query = session.executeQueueQuery(queue);
+ if (!query.isExists())
+ {
+ session.createQueue(SimpleString.toSimpleString(destination), queue, null,
false, true);
+ }
+ else
+ {
+ // Already exists
+ if (query.getConsumerCount() > 0)
+ {
+ throw new IllegalStateException("Cannot create a subscriber on the
durable subscription since it already has subscriber(s)");
+ }
+ }
+ } else
+ {
+ queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
+ session.createQueue(SimpleString.toSimpleString(destination), queue, null,
true, false);
+ }
}
session.createConsumer(consumerID, queue, SimpleString.toSimpleString(selector),
false);
session.receiveConsumerCredits(consumerID, -1);
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-15 14:51:59
UTC (rev 8874)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-15 17:23:38
UTC (rev 8875)
@@ -88,15 +88,15 @@
public void testDisconnectAndError() throws Exception {
- String connect_frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n" + "request-id: 1\n" + "\n" +
Stomp.NULL;
- sendFrame(connect_frame);
+ String connectFrame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n" + "request-id: 1\n" + "\n" +
Stomp.NULL;
+ sendFrame(connectFrame);
String f = receiveFrame(10000);
Assert.assertTrue(f.startsWith("CONNECTED"));
Assert.assertTrue(f.indexOf("response-id:1") >= 0);
- connect_frame = "DISCONNECT\n\n" + Stomp.NULL;
- sendFrame(connect_frame);
+ String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
+ sendFrame(disconnectFrame);
waitForFrameToTakeEffect();
@@ -665,11 +665,11 @@
Assert.assertTrue(message.getJMSRedelivered());
}
- public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception {
+ public void
_testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
}
- public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws
Exception {
+ public void
_testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws
Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
}
@@ -710,6 +710,7 @@
}
else {
reconnect(1000);
+ waitForFrameToTakeEffect();
}
@@ -1143,6 +1144,108 @@
sendFrame(frame);
}
+ public void testDurableSubscriberWithReconnection() throws Exception {
+
+ String connectFame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n" +
+ "client-id: myclientid\n\n" +
+ Stomp.NULL;
+ sendFrame(connectFame);
+
+ String frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ String subscribeFrame =
+ "SUBSCRIBE\n" +
+ "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
+ "receipt: 12\n" +
+ "durable-subscription-name: " + getName() +
"\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(subscribeFrame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ String disconnectFrame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(disconnectFrame);
+
+ // send the message when the durable subscriber is disconnected
+ sendMessage(getName(), topic);
+
+
+ reconnect(1000);
+ sendFrame(connectFame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ sendFrame(subscribeFrame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ // we must have received the message
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ String unsubscribeFrame =
+ "UNSUBSCRIBE\n" +
+ "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
+ "receipt: 1234\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(unsubscribeFrame);
+ // wait for UNSUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ sendFrame(disconnectFrame);
+ }
+
+ public void testDurableSubscriber() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n" +
+ "client-id: myclientid\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ String subscribeFrame =
+ "SUBSCRIBE\n" +
+ "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
+ "receipt: 12\n" +
+ "durable-subscription-name: " + getName() +
"\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(subscribeFrame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ // creating a subscriber with the same durable-subscriber-name must fail
+ sendFrame(subscribeFrame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("ERROR"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
public void testSubscribeToTopicWithNoLocal() throws Exception {
String frame =