Author: jmesnil
Date: 2010-02-15 09:51:59 -0500 (Mon, 15 Feb 2010)
New Revision: 8874
Modified:
trunk/docs/user-manual/en/interoperability.xml
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added missing calls to coordinate with the storageManager
* added support for no-local header
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2010-02-15 11:07:59 UTC (rev 8873)
+++ trunk/docs/user-manual/en/interoperability.xml 2010-02-15 14:51:59 UTC (rev 8874)
@@ -89,7 +89,7 @@
</programlisting>
</listitem>
<listitem>
- <para>send or subscribe to a JMS
<emphasis>Queue</emphasis> by prepending the topic name by
<literal>jms.topic.</literal>.</para>
+ <para>send or subscribe to a JMS
<emphasis>Topic</emphasis> by prepending the topic name by
<literal>jms.topic.</literal>.</para>
<para>For example to subscribe to the
<literal>stocks</literal> JMS Topic, the Stomp client must send the
frame:</para>
<programlisting>
SUBSCRIBE
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-02-15 11:07:59 UTC
(rev 8873)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-02-15 14:51:59 UTC
(rev 8874)
@@ -35,6 +35,8 @@
private byte[] content = StompFrame.NO_DATA;
+ private int size = -1;
+
public StompFrame()
{
this.headers = new HashMap<String, Object>();
@@ -62,9 +64,25 @@
return headers;
}
+ public int getEncodedSize()
+ {
+ if (size == -1)
+ {
+ throw new IllegalStateException("Frame has not been encoded yet");
+ }
+
+ return size ;
+ }
+
+ public void setEncodedSize(int size)
+ {
+ this.size = size;
+ }
+
@Override
public String toString()
{
return "StompFrame[command=" + command + ", headers=" + headers
+ ", content-length=" + content.length + "]";
}
+
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-02-15
11:07:59 UTC (rev 8873)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-02-15
14:51:59 UTC (rev 8874)
@@ -62,7 +62,9 @@
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
dos.close();
- return baos.toByteArray();
+ byte[] bytes = baos.toByteArray();
+ command.setEncodedSize(bytes.length);
+ return bytes;
}
public void marshal(StompFrame stomp, DataOutput os) throws IOException
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-15
11:07:59 UTC (rev 8873)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-15
14:51:59 UTC (rev 8874)
@@ -27,10 +27,12 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerSession;
@@ -52,18 +54,21 @@
private static final Logger log = Logger.getLogger(StompProtocolManager.class);
+ // TODO use same value than HornetQConnection
+ private static final String CONNECTION_ID_PROP = "__HQ_CID";
+
// Attributes ----------------------------------------------------
private final HornetQServer server;
private final StompMarshaller marshaller;
+ private final Executor executor;
+
private final Map<String, StompSession> transactedSessions = new
HashMap<String, StompSession>();
private final Map<RemotingConnection, StompSession> sessions = new
HashMap<RemotingConnection, StompSession>();
- private Executor executor;
-
// Static --------------------------------------------------------
private static StompFrame createError(Exception e, StompFrame request)
@@ -129,7 +134,14 @@
{
public void run()
{
- doHandleBuffer(connection, buffer);
+ try
+ {
+ doHandleBuffer(connection, buffer);
+ }
+ finally
+ {
+ server.getStorageManager().clearContext();
+ }
}
});
}
@@ -240,7 +252,22 @@
String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
String id = (String)headers.get(Stomp.Headers.Subscribe.ID);
-
+ boolean noLocal = false;
+ if (headers.containsKey(Stomp.Headers.Subscribe.NO_LOCAL))
+ {
+ noLocal =
Boolean.parseBoolean((String)headers.get(Stomp.Headers.Subscribe.NO_LOCAL));
+ }
+ if (noLocal)
+ {
+ String noLocalFilter = CONNECTION_ID_PROP + " <> '" +
connection.getID().toString() + "'";
+ if (selector == null)
+ {
+ selector = noLocalFilter;
+ } else
+ {
+ selector += " AND " + noLocalFilter;
+ }
+ }
if (ack == null)
{
ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
@@ -259,11 +286,13 @@
subscriptionID = "subscription/" + destination;
}
StompSession stompSession = getSession(connection);
+ stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID))
{
throw new StompException("There already is a subscription for: " +
subscriptionID +
". Either use unique subscription IDs or do not
create multiple subscriptions for the same destination");
}
+ server.getStorageManager().setContext(stompSession.getContext());
long consumerID = server.getStorageManager().generateUniqueID();
stompSession.addSubscription(consumerID, subscriptionID, destination, selector,
ack);
@@ -291,6 +320,7 @@
}
StompSession stompSession = getSession(connection);
+ server.getStorageManager().setContext(stompSession.getContext());
boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
if (!unsubscribed)
{
@@ -310,6 +340,7 @@
log.warn("Transactional acknowledgement is not supported");
}
stompSession = getSession(connection);
+ server.getStorageManager().setContext(stompSession.getContext());
stompSession.acknowledge(messageID);
return null;
@@ -386,7 +417,7 @@
StompSession stompSession = sessions.get(connection);
if (stompSession == null)
{
- stompSession = new StompSession(connection, this);
+ stompSession = new StompSession(connection, this,
server.getStorageManager().newContext(executor));
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name,
connection.getLogin(),
@@ -409,7 +440,7 @@
StompSession stompSession = transactedSessions.get(txID);
if (stompSession == null)
{
- stompSession = new StompSession(connection, this);
+ stompSession = new StompSession(connection, this,
server.getStorageManager().newContext(executor));
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name,
connection.getLogin(),
@@ -490,17 +521,21 @@
message.getBodyBuffer().writeBytes(content);
}
- ServerSession session = null;
+ StompSession stompSession = null;
if (txID == null)
{
- session = getSession(connection).getSession();
+ stompSession = getSession(connection);
}
else
{
- session = transactedSessions.get(txID).getSession();
+ stompSession = transactedSessions.get(txID);
}
-
- session.send(message);
+ server.getStorageManager().setContext(stompSession.getContext());
+ if (stompSession.isNoLocal())
+ {
+ message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
+ }
+ stompSession.getSession().send(message);
return null;
}
@@ -528,32 +563,23 @@
return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
}
- public int send(StompConnection connection, StompFrame frame)
+ public void send(final StompConnection connection, final StompFrame frame)
{
- if (log.isTraceEnabled())
+ server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
{
- log.trace("sent " + frame);
- }
- synchronized (connection)
- {
- if (connection.isDestroyed() || !connection.isValid())
+ public void onError(final int errorCode, final String errorMessage)
{
- log.warn("Connection closed " + connection);
- return 0;
+ log.warn("Error processing IOCallback code = " + errorCode + "
message = " + errorMessage);
+
+ StompFrame error = createError(new HornetQException(errorCode, errorMessage),
frame);
+ doSend(connection, error);
}
- try
+
+ public void done()
{
- byte[] bytes = marshaller.marshal(frame);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- connection.getTransportConnection().write(buffer, true);
- return bytes.length;
+ doSend(connection, frame);
}
- catch (IOException e)
- {
- log.error("Unable to send frame " + frame, e);
- return 0;
- }
- }
+ });
}
public void cleanup(StompConnection connection)
@@ -597,6 +623,31 @@
}
}
}
+
+ private void doSend(final StompConnection connection, final StompFrame frame)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("sent " + frame);
+ }
+ synchronized (connection)
+ {
+ if (connection.isDestroyed() || !connection.isValid())
+ {
+ log.warn("Connection closed " + connection);
+ }
+ try
+ {
+ byte[] bytes = marshaller.marshal(frame);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ connection.getTransportConnection().write(buffer, true);
+ }
+ catch (IOException e)
+ {
+ log.error("Unable to send frame " + frame, e);
+ }
+ }
+ }
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-02-15 11:07:59
UTC (rev 8873)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-02-15 14:51:59
UTC (rev 8874)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.protocol.stomp.Stomp.Headers;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
@@ -41,15 +42,20 @@
private ServerSession session;
+ private final OperationContext sessionContext;
+
private final Map<Long, StompSubscription> subscriptions = new HashMap<Long,
StompSubscription>();
// key = message ID, value = consumer ID
private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
- StompSession(final StompConnection connection, final StompProtocolManager manager)
+ private boolean noLocal = false;
+
+ StompSession(final StompConnection connection, final StompProtocolManager manager,
OperationContext sessionContext)
{
this.connection = connection;
this.manager = manager;
+ this.sessionContext = sessionContext;
}
void setServerSession(ServerSession session)
@@ -102,8 +108,9 @@
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
- int length = manager.send(connection, frame);
-
+ manager.send(connection, frame);
+ int size = frame.getEncodedSize();
+
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -113,7 +120,7 @@
{
messagesToAck.put(serverMessage.getMessageID(), consumerID);
}
- return length;
+ return size;
}
catch (Exception e)
@@ -201,4 +208,19 @@
{
return connection;
}
+
+ public OperationContext getContext()
+ {
+ return sessionContext;
+ }
+
+ public boolean isNoLocal()
+ {
+ return noLocal;
+ }
+
+ public void setNoLocal(boolean noLocal)
+ {
+ this.noLocal = noLocal;
+ }
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-15 11:07:59
UTC (rev 8873)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-15 14:51:59
UTC (rev 8874)
@@ -1143,6 +1143,59 @@
sendFrame(frame);
}
+ public void testSubscribeToTopicWithNoLocal() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
+ "receipt: 12\n" +
+ "no-local: true\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ // send a message on the same connection => it should not be received
+ frame = "SEND\n" +
+ "destination:" + getTopicPrefix() + getTopicName() + "\n\n"
+
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ try {
+ frame = receiveFrame(2000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription is
noLocal");
+ }
+ catch (SocketTimeoutException e) {
+ }
+
+ // send message on another JMS connection => it should be received
+ sendMessage(getName(), topic);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
public void testClientAckNotPartOfTransaction() throws Exception {
String frame =