Author: gaohoward
Date: 2011-09-15 02:25:53 -0400 (Thu, 15 Sep 2011)
New Revision: 11350
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-14 14:25:20 UTC (rev 11349)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-15 06:25:53 UTC (rev 11350)
@@ -86,6 +86,8 @@
StompFrame frame = connection.createStompMessage(serverMessage, subscription,
deliveryCount);
+ log.error("--------------lllll- Sending frame: " + frame);
+
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-14 14:25:20 UTC (rev 11349)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-15 06:25:53 UTC (rev 11350)
@@ -417,9 +417,11 @@
{
data = new byte[0];
}
+ frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
}
- frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
+ frame.setByteBody(data);
+
serverMessage.getBodyBuffer().resetReaderIndex();
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
@@ -961,11 +963,27 @@
}
else
{
- content = new byte[decoder.contentLength];
+ content = new byte[decoder.contentLength + 1];
System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0,
decoder.contentLength);
decoder.pos += decoder.contentLength + 1;
+
+ content[decoder.contentLength] = 0;
+
+ //drain all the rest
+ if (decoder.bodyStart == -1)
+ {
+ decoder.bodyStart = decoder.pos;
+ }
+
+ while (decoder.pos < decoder.data)
+ {
+ if (decoder.workingBuffer[decoder.pos++] == 0)
+ {
+ break;
+ }
+ }
}
}
else
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-14 14:25:20 UTC (rev 11349)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-15 06:25:53 UTC (rev 11350)
@@ -224,8 +224,85 @@
unsubFrame.addHeader("id", "a-sub");
newConn.disconnect();
+ }
+
+ public void testHeaderContentType() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.setBody("Hello World 1!");
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("application/xml", frame.getHeader("content-type"));
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
}
+
+ public void testHeaderContentLength() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ String body = "Hello World 1!";
+ String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.addHeader("content-length", cLen);
+ frame.setBody(body + "extra");
+
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals(cLen, frame.getHeader("content-length"));
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+ }
+
}