Author: jmesnil
Date: 2010-01-20 12:11:18 -0500 (Wed, 20 Jan 2010)
New Revision: 8817
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added SUBSCRIBE command
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java 2010-01-20 17:07:39 UTC (rev 8816)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java 2010-01-20 17:11:18 UTC (rev 8817)
@@ -31,8 +31,8 @@
String CONNECT = "CONNECT";
String SEND = "SEND";
String DISCONNECT = "DISCONNECT";
- String SUBSCRIBE = "SUB";
- String UNSUBSCRIBE = "UNSUB";
+ String SUBSCRIBE = "SUBSCRIBE";
+ String UNSUBSCRIBE = "UNSUBSCRIBE";
String BEGIN_TRANSACTION = "BEGIN";
String COMMIT_TRANSACTION = "COMMIT";
String ABORT_TRANSACTION = "ABORT";
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 17:07:39 UTC (rev 8816)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 17:11:18 UTC (rev 8817)
@@ -43,6 +43,9 @@
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerMessage;
@@ -587,7 +590,7 @@
{
response = onConnect(frame, server, connection);
}
- if (Stomp.Commands.DISCONNECT.equals(command))
+ else if (Stomp.Commands.DISCONNECT.equals(command))
{
response = onDisconnect(frame, server, connection);
}
@@ -595,10 +598,16 @@
{
response = onSend(frame, server, connection);
}
+ else if (Stomp.Commands.SUBSCRIBE.equals(command))
+ {
+ response = onSubscribe(frame, server, connection);
+ }
else
{
log.error("Unsupported Stomp frame: " + frame);
+ response = new StompFrame(Stomp.Responses.ERROR, new HashMap<String, Object>(), ("Unsupported frame: " + command).getBytes());
}
+
if (response != null)
{
System.out.println(">>> will reply " + response);
@@ -638,19 +647,44 @@
}
}
- private void checkConnected(RemotingConnection connection) throws StompException
+ /**
+ * @param frame
+ * @param server
+ * @param connection
+ * @return
+ * @throws StompException
+ * @throws HornetQException
+ */
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException, HornetQException
{
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ SimpleString queueName = StompDestinationConverter.convertDestination(queue);
+
+ ServerSession session = checkAndGetSession(connection);
+ long id = server.getStorageManager().generateUniqueID();
+ SessionCreateConsumerMessage packet = new SessionCreateConsumerMessage(id , queueName, null, false, false);
+ session.handleCreateConsumer(packet);
+ SessionConsumerFlowCreditMessage credits = new SessionConsumerFlowCreditMessage(id, -1);
+ session.handleReceiveConsumerCredits(credits );
+ session.handleStart(new PacketImpl(PacketImpl.SESS_START));
+
+ return null;
+ }
+
+ private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+ {
ServerSession session = sessions.get(connection);
if (session == null)
{
throw new StompException("Not connected");
}
+ return session;
}
+
private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
{
- checkConnected(connection);
-
- ServerSession session = sessions.get(connection);
+ ServerSession session = checkAndGetSession(connection);
if (session != null)
{
try
@@ -668,7 +702,7 @@
private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws HornetQException, StompException
{
- checkConnected(connection);
+ ServerSession session = checkAndGetSession(connection);
Map<String, Object> headers = frame.getHeaders();
String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
@@ -703,7 +737,6 @@
message.getBodyBuffer().writeBytes(content);
}
- ServerSession session = sessions.get(connection);
SessionSendMessage packet = new SessionSendMessage(message, false);
session.handleSend(packet);
if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20 17:07:39 UTC (rev 8816)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20 17:11:18 UTC (rev 8817)
@@ -728,6 +728,10 @@
Stomp.NULL;
sendFrame(frame);
+ waitForFrameToTakeEffect();
+ // check the message is not committed
+ assertNull(consumer.receive(100));
+
frame =
"COMMIT\n" +
"transaction: tx1\n" +
@@ -899,7 +903,6 @@
int c = 0;
for (; ;) {
c = is.read();
- System.out.println(c);
if (c < 0) {
throw new IOException("socket closed.");
}