Author: gaohoward
Date: 2011-09-15 10:34:00 -0400 (Thu, 15 Sep 2011)
New Revision: 11351
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -27,6 +27,7 @@
private List<Header> headers = new ArrayList<Header>(10);
private String body;
private VersionedStompFrameHandler handler;
+ private boolean disconnect;
public HornetQStompException(StompConnection connection, String msg)
{
@@ -85,6 +86,7 @@
frame = handler.createStompFrame("ERROR");
frame.addHeader("message", this.getMessage());
}
+ frame.setNeedsDisconnect(disconnect);
return frame;
}
@@ -99,4 +101,9 @@
this.val = val;
}
}
+
+ public void setDisconnect(boolean b)
+ {
+ disconnect = b;
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -371,8 +371,6 @@
{
String acceptVersion = frame.getHeader(Stomp.Headers.ACCEPT_VERSION);
- log.error("----------------- acceptVersion: " + acceptVersion);
-
if (acceptVersion == null)
{
this.version = StompVersions.V1_0;
@@ -401,6 +399,7 @@
error.addHeader("version", acceptVersion);
error.addHeader("content-type", "text/plain");
error.setBody("Supported protocol version are " +
manager.getSupportedVersionsAsString());
+ error.setDisconnect(true);
throw error;
}
log.error("------------------ negotiated version is " +
this.version);
@@ -438,11 +437,11 @@
stompListener.requestAccepted(request);
}
+ String cmd = request.getCommand();
try
{
if (!initialized)
{
- String cmd = request.getCommand();
if ( ! (Stomp.Commands.CONNECT.equals(cmd) ||
Stomp.Commands.STOMP.equals(cmd)))
{
throw new HornetQStompException("Connection hasn't been
established.");
@@ -461,6 +460,11 @@
{
sendFrame(reply);
}
+
+ if (Stomp.Commands.DISCONNECT.equals(cmd))
+ {
+ this.disconnect();
+ }
}
public void sendFrame(StompFrame frame)
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -115,6 +115,8 @@
public static final byte U = (byte)'U';
public static final byte N = (byte)'N';
+
+ public static final byte LN = (byte)'n';
public static final byte HEADER_SEPARATOR = (byte)':';
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -177,11 +177,15 @@
public String getEscapedKey()
{
+ log.error("----------------key is : |" + key + "|");
+ log.error("----------------esc'd: |" + escape(key) +
"|");
return escape(key);
}
public String getEscapedValue()
{
+ log.error("----------------val is : |" + val + "|");
+ log.error("----------------esc'd v: |" + escape(val) +
"|");
return escape(val);
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -134,10 +134,7 @@
return receipt;
}
- public StompFrame postprocess(StompFrame request)
- {
- return null;
- }
+ public abstract StompFrame postprocess(StompFrame request);
public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
StompSubscription subscription, int deliveryCount) throws Exception;
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -104,7 +104,6 @@
@Override
public StompFrame onDisconnect(StompFrame frame)
{
- connection.destroy();
return null;
}
@@ -398,5 +397,20 @@
// TODO Auto-generated method stub
}
+
+ @Override
+ public StompFrame postprocess(StompFrame request)
+ {
+ StompFrame response = null;
+ if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ response.setNeedsDisconnect(true);
+ }
+ }
+ return response;
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -387,7 +387,7 @@
StompSubscription subscription, int deliveryCount)
throws Exception
{
- StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
+ StompFrame frame = new StompFrameV11(Stomp.Responses.MESSAGE);
if (subscription.getID() != null)
{
@@ -403,7 +403,7 @@
byte[] data = new byte[size];
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE)
{
- frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length > 0
? (data.length - 1) : data.length));
buffer.readBytes(data);
}
else
@@ -417,7 +417,6 @@
{
data = new byte[0];
}
- frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
}
frame.setByteBody(data);
@@ -426,6 +425,8 @@
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
+ log.error("-------------------- frame created: " + frame);
+
return frame;
}
@@ -888,6 +889,19 @@
break;
}
+ case StompDecoder.LN:
+ {
+ if (isEscaping)
+ {
+ holder.append(StompDecoder.NEW_LINE);
+ isEscaping = false;
+ }
+ else
+ {
+ holder.append(b);
+ }
+ break;
+ }
case StompDecoder.NEW_LINE:
{
if (decoder.whiteSpaceOnly)
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -71,6 +71,8 @@
}
// Add a newline to separate the headers from the content.
head.append(Stomp.NEWLINE);
+
+ log.error("------------------------_______now head: " + head);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
if (bytesBody != null)
@@ -91,8 +93,12 @@
if (!headers.containsKey(key))
{
headers.put(key, val);
+ allHeaders.add(new Header(key, val));
}
- allHeaders.add(new Header(key, val));
+ else if (!key.equals(Stomp.Headers.CONTENT_LENGTH))
+ {
+ allHeaders.add(new Header(key, val));
+ }
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -94,9 +94,15 @@
public void disconnect() throws IOException, InterruptedException
{
ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+ frame.addHeader("receipt", "1");
ClientStompFrame result = this.sendFrame(frame);
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) ||
(!"1".equals(result.getHeader("receipt-id"))))
+ {
+ throw new IOException("Disconnect failed! " + result);
+ }
+
close();
connected = false;
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -12,8 +12,12 @@
*/
package org.hornetq.tests.integration.stomp.util;
+import java.io.UnsupportedEncodingException;
import java.util.StringTokenizer;
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.StompDecoder;
+
/**
*
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
@@ -40,11 +44,17 @@
@Override
- public ClientStompFrame createFrame(String data)
+ public ClientStompFrame createFrame(final String data)
{
+ System.out.println("Data: |" + data + "|");
//split the string at "\n\n"
String[] dataFields = data.split("\n\n");
+ System.out.println("DataFields[0] |" + dataFields[0]);
+ if (dataFields.length > 1)
+ {
+ System.out.println("DataFields[1] |" + dataFields[1]);
+ }
StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
String command = tokenizer.nextToken();
@@ -53,7 +63,8 @@
while (tokenizer.hasMoreTokens())
{
String header = tokenizer.nextToken();
- String[] fields = header.split(":");
+ System.out.println("header is: " + header);
+ String[] fields = splitHeader(header);
frame.addHeader(fields[0], fields[1]);
}
@@ -64,7 +75,111 @@
}
return frame;
}
+
+ //find true :
+ private String[] splitHeader(String header)
+ {
+ StringBuffer sbKey = new StringBuffer();
+ StringBuffer sbVal = new StringBuffer();
+ boolean isEsc = false;
+ boolean isKey = true;
+
+ for (int i = 0; i < header.length(); i++)
+ {
+ char b = header.charAt(i);
+ switch (b)
+ {
+ //escaping
+ case '\\':
+ {
+ if (isEsc)
+ {
+ //this is a backslash
+ if (isKey)
+ {
+ sbKey.append(b);
+ }
+ else
+ {
+ sbVal.append(b);
+ }
+ isEsc = false;
+ }
+ else
+ {
+ //begin escaping
+ isEsc = true;
+ }
+ break;
+ }
+ case ':':
+ {
+ if (isEsc)
+ {
+ if (isKey)
+ {
+ sbKey.append(b);
+ }
+ else
+ {
+ sbVal.append(b);
+ }
+ isEsc = false;
+ }
+ else
+ {
+ isKey = false;
+ }
+ break;
+ }
+ case 'n':
+ {
+ if (isEsc)
+ {
+ if (isKey)
+ {
+ sbKey.append('\n');
+ }
+ else
+ {
+ sbVal.append('\n');
+ }
+ isEsc = false;
+ }
+ else
+ {
+ if (isKey)
+ {
+ sbKey.append(b);
+ }
+ else
+ {
+ sbVal.append(b);
+ }
+ }
+ break;
+ }
+ default:
+ {
+ if (isKey)
+ {
+ sbKey.append(b);
+ }
+ else
+ {
+ sbVal.append(b);
+ }
+ }
+ }
+ }
+ String[] result = new String[2];
+ result[0] = sbKey.toString();
+ result[1] = sbVal.toString();
+
+ return result;
+ }
+
@Override
public ClientStompFrame newFrame(String command)
{
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-15
06:25:53 UTC (rev 11350)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-15
14:34:00 UTC (rev 11351)
@@ -96,7 +96,7 @@
//reply headers: version, session, server
assertEquals(null, reply.getHeader("version"));
-
+
connV11.disconnect();
// case 2 accept-version=1.0, result: 1.0
@@ -164,7 +164,6 @@
System.out.println("Got error frame " + reply);
- connV11.disconnect();
}
public void testSendAndReceive() throws Exception
@@ -303,6 +302,54 @@
newConn.disconnect();
}
+ public void testHeaderEncoding() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ String body = "Hello World 1!";
+ String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.addHeader("content-length", cLen);
+ String hKey = "special-header\\\\\\n\\:";
+ String hVal = "\\:\\\\\\ngood";
+ frame.addHeader(hKey, hVal);
+
+ System.out.println("key: |" + hKey + "| val: |" + hVal);
+
+ frame.setBody(body);
+
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ String value = frame.getHeader("special-header" + "\\" +
"\n" + ":");
+
+ assertEquals(":" + "\\" + "\n" + "good",
value);
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+ }
}