Author: jmesnil
Date: 2010-01-21 10:40:24 -0500 (Thu, 21 Jan 2010)
New Revision: 8828
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* pass testSubscribeWithAutoAck
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java 2010-01-21
14:46:21 UTC (rev 8827)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java 2010-01-21
15:40:24 UTC (rev 8828)
@@ -70,6 +70,35 @@
}
}
+   /**
+    * Converts a HornetQ JMS address into the corresponding Stomp destination name.
+    * <p>
+    * The JMS address prefix is stripped and replaced by the matching Stomp prefix:
+    * queue -> "/queue/", temporary queue -> "/temp-queue/",
+    * topic -> "/topic/", temporary topic -> "/temp-topic/".
+    * The prefixes are tested in that order.
+    *
+    * @param address the HornetQ address to convert
+    * @return the Stomp destination name
+    * @throws HornetQException with code ILLEGAL_STATE if the address is null
+    *         or does not start with any of the JMS address prefixes
+    */
+   public static String toStomp(String address) throws HornetQException
+   {
+      if (address == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (address.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+      {
+         return "/queue/" + address.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length());
+      }
+      else if (address.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+      {
+         return "/temp-queue/" + address.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
+      }
+      else if (address.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
+      {
+         return "/topic/" + address.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length());
+      }
+      else if (address.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+      {
+         return "/temp-topic/" + address.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE,
+                                    "Illegal address name: [" + address +
+                                    "] -- Acceptable address must comply to JMS semantics");
+      }
+   }
+
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-21
14:46:21 UTC (rev 8827)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-21
15:40:24 UTC (rev 8828)
@@ -46,6 +46,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
@@ -325,9 +326,7 @@
else
{
ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
- pipeline.addLast("handler", new
HornetQServerChannelHandler(channelGroup,
- handler,
- new
Listener()));
+ pipeline.addLast("handler", new
HornetQServerChannelHandler(channelGroup, handler, new Listener()));
}
return pipeline;
@@ -595,9 +594,11 @@
else
{
log.error("Unsupported Stomp frame: " + frame);
- response = new StompFrame(Stomp.Responses.ERROR, new HashMap<String,
Object>(), ("Unsupported frame: " + command).getBytes());
+ response = new StompFrame(Stomp.Responses.ERROR,
+ new HashMap<String, Object>(),
+ ("Unsupported frame: " +
command).getBytes());
}
-
+
if (response != null)
{
System.out.println(">>> will reply " + response);
@@ -613,15 +614,15 @@
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,
"UTF-8"));
ex.printStackTrace(stream);
- stream.append(Stomp.NULL + Stomp.NEWLINE);
stream.close();
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
- final String receiptId = (String)
frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
- if (receiptId != null) {
- headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ final String receiptId =
(String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null)
+ {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
}
StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers,
baos.toByteArray());
@@ -645,7 +646,9 @@
* @throws StompException
* @throws HornetQException
*/
- private StompFrame onSubscribe(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws Exception, StompException, HornetQException
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws Exception,
+
StompException,
+
HornetQException
{
Map<String, Object> headers = frame.getHeaders();
String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
@@ -669,7 +672,7 @@
}
return session;
}
-
+
private StompFrame onDisconnect(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws StompException
{
ServerSession session = checkAndGetSession(connection);
@@ -691,7 +694,7 @@
private StompFrame onSend(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws Exception
{
ServerSession session = checkAndGetSession(connection);
-
+
Map<String, Object> headers = frame.getHeaders();
String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
/*
@@ -738,7 +741,7 @@
}
}
- private StompFrame onConnect(StompFrame frame, HornetQServer server,
CoreRemotingConnection connection) throws Exception
+ private StompFrame onConnect(StompFrame frame, HornetQServer server, final
CoreRemotingConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -756,6 +759,62 @@
false,
false);
ServerSession session = server.getSession(name);
+      // Callback through which the server session pushes messages back to the Stomp client.
+      session.setCallback(new SessionCallback()
+      {
+         // Not implemented for Stomp: nothing is sent to the client.
+         public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+         {
+         }
+
+         /**
+          * Wraps the server message in a Stomp MESSAGE frame and writes it on the
+          * connection's transport.
+          *
+          * @return the size in bytes of the marshalled frame, or 0 if it could not be sent
+          */
+         public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+         {
+            try
+            {
+               Map<String, Object> headers = new HashMap<String, Object>();
+               headers.put(Stomp.Headers.Message.DESTINATION,
+                           StompDestinationConverter.toStomp(serverMessage.getAddress().toString()));
+               // Only text messages carry a body; any other type is sent with an empty body.
+               byte[] data = new byte[] {};
+               if (serverMessage.getType() == HornetQTextMessage.TYPE)
+               {
+                  SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+                  if (text != null)
+                  {
+                     // Encode explicitly as UTF-8 rather than with the platform default charset,
+                     // consistent with the error-frame path of this class.
+                     data = text.toString().getBytes("UTF-8");
+                  }
+               }
+               StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+               System.out.println("SENDING : " + msg);
+               byte[] bytes = marshaller.marshal(msg);
+               HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+               connection.getTransportConnection().write(buffer, true);
+
+               return bytes.length;
+            }
+            catch (Exception e)
+            {
+               // Report the failure through the class logger instead of stderr;
+               // 0 tells the caller that nothing was written.
+               log.error("Unable to send message " + serverMessage + " to Stomp client", e);
+               return 0;
+            }
+         }
+
+         // Large messages are not handled for Stomp: nothing is sent.
+         public int sendLargeMessageContinuation(long consumerID,
+                                                 byte[] body,
+                                                 boolean continues,
+                                                 boolean requiresResponse)
+         {
+            return 0;
+         }
+
+         // Large messages are not handled for Stomp: nothing is sent.
+         public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+         {
+            return 0;
+         }
+
+         // No cleanup is performed when the session is closed.
+         public void closed()
+         {
+         }
+      });
sessions.put(connection, session);
System.out.println(">>> created session " + session);
HashMap<String, Object> h = new HashMap<String, Object>();
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-21
14:46:21 UTC (rev 8827)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-21
15:40:24 UTC (rev 8828)
@@ -341,6 +341,8 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
frame =
"DISCONNECT\n" +
@@ -408,7 +410,7 @@
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello World");
- message.setStringProperty("s", "value");
+ message.setStringProperty("S", "value");
message.setBooleanProperty("n", false);
message.setByteProperty("byte", (byte) 9);
message.setDoubleProperty("d", 2.0);
@@ -420,6 +422,15 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("S:") > 0);
+ Assert.assertTrue(frame.indexOf("n:") > 0);
+ Assert.assertTrue(frame.indexOf("byte:") > 0);
+ Assert.assertTrue(frame.indexOf("d:") > 0);
+ Assert.assertTrue(frame.indexOf("f:") > 0);
+ Assert.assertTrue(frame.indexOf("i:") > 0);
+ Assert.assertTrue(frame.indexOf("l:") > 0);
+ Assert.assertTrue(frame.indexOf("s:") > 0);
+ Assert.assertTrue(frame.indexOf("Hello World") > 0);
// System.out.println("out: "+frame);
@@ -908,6 +919,11 @@
}
else if (c == 0) {
c = is.read();
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
+ }
Assert.assertEquals("Expecting stomp frame to terminate with
\0\n", c, '\n');
byte[] ba = inputBuffer.toByteArray();
inputBuffer.reset();