Author: gaohoward
Date: 2011-09-08 11:35:08 -0400 (Thu, 08 Sep 2011)
New Revision: 11305
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
char escaping
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java
(rev 0)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java 2011-09-08
15:35:08 UTC (rev 11305)
@@ -0,0 +1,44 @@
+package org.hornetq.core.protocol.stomp;
+
+import java.io.UnsupportedEncodingException;
+
+
+public class SimpleBytes
+{
+ private int step;
+ private byte[] contents;
+ private int index;
+
+ public SimpleBytes(int initCapacity)
+ {
+ this.step = initCapacity;
+ contents = new byte[initCapacity];
+ index = 0;
+ }
+
+ public String getString() throws UnsupportedEncodingException
+ {
+ if (index == 0) return "";
+ byte[] realData = new byte[index];
+ System.arraycopy(contents, 0, realData, 0, realData.length);
+
+ return new String(realData, "UTF-8");
+ }
+
+ public void reset()
+ {
+ index = 0;
+ }
+
+ public void append(byte b)
+ {
+ if (index >= contents.length)
+ {
+ //grow
+ byte[] newBuffer = new byte[contents.length + step];
+ System.arraycopy(contents, 0, newBuffer, 0, contents.length);
+ contents = newBuffer;
+ }
+ contents[index++] = b;
+ }
+}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-08
15:06:11 UTC (rev 11304)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-08
15:35:08 UTC (rev 11305)
@@ -61,7 +61,7 @@
private final long creationTime;
- private StompDecoder decoder = new StompDecoder();
+ private StompDecoder decoder;
private final List<FailureListener> failureListeners = new
CopyOnWriteArrayList<FailureListener>();
@@ -90,6 +90,8 @@
this.manager = manager;
+ this.decoder = new StompDecoder(this);
+
this.creationTime = System.currentTimeMillis();
}
@@ -697,4 +699,9 @@
}
}
+
+ public VersionedStompFrameHandler getFrameHandler()
+ {
+ return this.frameHandler;
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-08
15:06:11 UTC (rev 11304)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-08
15:35:08 UTC (rev 11305)
@@ -30,118 +30,131 @@
{
private static final Logger log = Logger.getLogger(StompDecoder.class);
- private static final boolean TRIM_LEADING_HEADER_VALUE_WHITESPACE = true;
+ public static final boolean TRIM_LEADING_HEADER_VALUE_WHITESPACE = true;
- private static final String COMMAND_ABORT = "ABORT";
+ public static final String COMMAND_ABORT = "ABORT";
- private static final int COMMAND_ABORT_LENGTH = COMMAND_ABORT.length();
+ public static final int COMMAND_ABORT_LENGTH = COMMAND_ABORT.length();
- private static final String COMMAND_ACK = "ACK";
+ public static final String COMMAND_ACK = "ACK";
- private static final int COMMAND_ACK_LENGTH = COMMAND_ACK.length();
+ public static final int COMMAND_ACK_LENGTH = COMMAND_ACK.length();
- private static final String COMMAND_BEGIN = "BEGIN";
+ public static final String COMMAND_NACK = "NACK";
- private static final int COMMAND_BEGIN_LENGTH = COMMAND_BEGIN.length();
+ public static final int COMMAND_NACK_LENGTH = COMMAND_NACK.length();
- private static final String COMMAND_COMMIT = "COMMIT";
+ public static final String COMMAND_BEGIN = "BEGIN";
- private static final int COMMAND_COMMIT_LENGTH = COMMAND_COMMIT.length();
+ public static final int COMMAND_BEGIN_LENGTH = COMMAND_BEGIN.length();
- private static final String COMMAND_CONNECT = "CONNECT";
+ public static final String COMMAND_COMMIT = "COMMIT";
- private static final int COMMAND_CONNECT_LENGTH = COMMAND_CONNECT.length();
+ public static final int COMMAND_COMMIT_LENGTH = COMMAND_COMMIT.length();
- private static final String COMMAND_DISCONNECT = "DISCONNECT";
+ public static final String COMMAND_CONNECT = "CONNECT";
- private static final int COMMAND_DISCONNECT_LENGTH = COMMAND_DISCONNECT.length();
+ public static final int COMMAND_CONNECT_LENGTH = COMMAND_CONNECT.length();
- private static final String COMMAND_SEND = "SEND";
+ public static final String COMMAND_DISCONNECT = "DISCONNECT";
- private static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
+ public static final int COMMAND_DISCONNECT_LENGTH = COMMAND_DISCONNECT.length();
- private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
+ public static final String COMMAND_SEND = "SEND";
- private static final int COMMAND_SUBSCRIBE_LENGTH = COMMAND_SUBSCRIBE.length();
+ public static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
- private static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
+ public static final String COMMAND_STOMP = "STOMP";
- private static final int COMMAND_UNSUBSCRIBE_LENGTH = COMMAND_UNSUBSCRIBE.length();
+ public static final int COMMAND_STOMP_LENGTH = COMMAND_STOMP.length();
+ public static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
+
+ public static final int COMMAND_SUBSCRIBE_LENGTH = COMMAND_SUBSCRIBE.length();
+
+ public static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
+
+ public static final int COMMAND_UNSUBSCRIBE_LENGTH = COMMAND_UNSUBSCRIBE.length();
+
/**** added by meddy, 27 april 2011, handle header parser for reply to websocket
protocol ****/
- private static final String COMMAND_CONNECTED = "CONNECTED";
+ public static final String COMMAND_CONNECTED = "CONNECTED";
- private static final int COMMAND_CONNECTED_LENGTH = COMMAND_CONNECTED.length();
+ public static final int COMMAND_CONNECTED_LENGTH = COMMAND_CONNECTED.length();
- private static final String COMMAND_MESSAGE = "MESSAGE";
+ public static final String COMMAND_MESSAGE = "MESSAGE";
- private static final int COMMAND_MESSAGE_LENGTH = COMMAND_MESSAGE.length();
+ public static final int COMMAND_MESSAGE_LENGTH = COMMAND_MESSAGE.length();
- private static final String COMMAND_ERROR = "ERROR";
+ public static final String COMMAND_ERROR = "ERROR";
- private static final int COMMAND_ERROR_LENGTH = COMMAND_ERROR.length();
+ public static final int COMMAND_ERROR_LENGTH = COMMAND_ERROR.length();
- private static final String COMMAND_RECEIPT = "RECEIPT";
+ public static final String COMMAND_RECEIPT = "RECEIPT";
- private static final int COMMAND_RECEIPT_LENGTH = COMMAND_RECEIPT.length();
+ public static final int COMMAND_RECEIPT_LENGTH = COMMAND_RECEIPT.length();
/**** end ****/
- private static final byte A = (byte)'A';
+ public static final byte A = (byte)'A';
- private static final byte B = (byte)'B';
+ public static final byte B = (byte)'B';
- private static final byte C = (byte)'C';
+ public static final byte C = (byte)'C';
- private static final byte D = (byte)'D';
+ public static final byte D = (byte)'D';
- private static final byte E = (byte)'E';
+ public static final byte E = (byte)'E';
- private static final byte M = (byte)'M';
+ public static final byte M = (byte)'M';
- private static final byte S = (byte)'S';
+ public static final byte S = (byte)'S';
- private static final byte R = (byte)'R';
+ public static final byte R = (byte)'R';
- private static final byte U = (byte)'U';
+ public static final byte U = (byte)'U';
- private static final byte HEADER_SEPARATOR = (byte)':';
+ public static final byte N = (byte)'N';
- private static final byte NEW_LINE = (byte)'\n';
+ public static final byte HEADER_SEPARATOR = (byte)':';
- private static final byte SPACE = (byte)' ';
+ public static final byte NEW_LINE = (byte)'\n';
- private static final byte TAB = (byte)'\t';
+ public static final byte SPACE = (byte)' ';
- private static String CONTENT_LENGTH_HEADER_NAME = "content-length";
+ public static final byte TAB = (byte)'\t';
- private byte[] workingBuffer = new byte[1024];
+ public static String CONTENT_LENGTH_HEADER_NAME = "content-length";
- private int pos;
+ public byte[] workingBuffer = new byte[1024];
- private int data;
+ public int pos;
- private String command;
+ public int data;
- private Map<String, Object> headers;
+ public String command;
- private int headerBytesCopyStart;
+ public Map<String, String> headers;
- private boolean readingHeaders;
+ public int headerBytesCopyStart;
- private boolean headerValueWhitespace;
+ public boolean readingHeaders;
- private boolean inHeaderName;
+ public boolean headerValueWhitespace;
- private String headerName;
+ public boolean inHeaderName;
- private boolean whiteSpaceOnly;
+ public String headerName;
- private int contentLength;
+ public boolean whiteSpaceOnly;
- private int bodyStart;
+ public int contentLength;
- public StompDecoder()
+ public int bodyStart;
+
+ public StompConnection connection;
+
+ public StompDecoder(StompConnection stompConnection)
{
+ this.connection = stompConnection;
init();
}
@@ -156,13 +169,25 @@
* followed by an empty line
* followed by an optional message body
* terminated with a null character
+ *
+ * Note: to support both 1.0 and 1.1, we just assemble a
+ * standard StompFrame and let the versioned handler to do more
+ * spec specific job (like trimming, escaping etc).
*/
public synchronized StompFrame decode(final HornetQBuffer buffer) throws Exception
{
- //log.info("got buff " + buffer.readableBytes());
+ if (connection.isValid())
+ {
+ VersionedStompFrameHandler handler = connection.getFrameHandler();
+ return handler.decode(this, buffer);
+ }
- long start = System.nanoTime();
-
+ return defaultDecode(buffer);
+ }
+
+ public StompFrame defaultDecode(final HornetQBuffer buffer) throws
HornetQStompException
+ {
+
int readable = buffer.readableBytes();
if (data + readable >= workingBuffer.length)
@@ -375,8 +400,6 @@
throwInvalid();
}
}
-
- long commandTime = System.nanoTime() - start;
if (readingHeaders)
{
@@ -482,8 +505,6 @@
}
}
}
-
- long headersTime = System.nanoTime() - start - commandTime;
// Now the body
@@ -526,8 +547,6 @@
}
}
-
-
if (content != null)
{
if (data > pos)
@@ -546,34 +565,26 @@
StompFrame ret = new StompFrame(command, headers, content);
init();
-
- // log.info("decoded");
-
- long bodyTime = System.nanoTime() - start - headersTime - commandTime;
-
- // log.info("command: "+ commandTime + " headers: " +
headersTime + " body: " + bodyTime);
return ret;
}
else
{
return null;
- }
+ }
}
- private void throwInvalid() throws StompException
+ public void throwInvalid() throws HornetQStompException
{
- throw new StompException("Invalid STOMP frame: " +
this.dumpByteArray(workingBuffer));
+ throw new HornetQStompException("Invalid STOMP frame: " +
this.dumpByteArray(workingBuffer));
}
- private void init()
+ public void init()
{
pos = 0;
command = null;
- headers = new HashMap<String, Object>();
-
this.headerBytesCopyStart = -1;
readingHeaders = true;
@@ -591,7 +602,7 @@
bodyStart = -1;
}
- private void resizeWorking(final int newSize)
+ public void resizeWorking(final int newSize)
{
byte[] oldBuffer = workingBuffer;
@@ -600,7 +611,7 @@
System.arraycopy(oldBuffer, 0, workingBuffer, 0, oldBuffer.length);
}
- private boolean tryIncrement(final int length)
+ public boolean tryIncrement(final int length)
{
if (pos + length >= data)
{
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-08
15:06:11 UTC (rev 11304)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-08
15:35:08 UTC (rev 11305)
@@ -51,6 +51,8 @@
private List<Header> allHeaders = new ArrayList<Header>();
private String body;
+
+ private byte[] bytesBody;
private HornetQBuffer buffer = null;
@@ -70,6 +72,14 @@
this.disconnect = disconnect;
}
+ public StompFrame(String command, Map<String, String> headers,
+ byte[] content)
+ {
+ this.command = command;
+ this.headers = headers;
+ this.bytesBody = content;
+ }
+
public String getCommand()
{
return command;
@@ -107,7 +117,7 @@
{
if (buffer == null)
{
- buffer = HornetQBuffers.dynamicBuffer(content.length + 512);
+ buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
StringBuffer head = new StringBuffer();
head.append(command);
@@ -124,7 +134,7 @@
head.append(Stomp.NEWLINE);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
- buffer.writeBytes(content);
+ buffer.writeBytes(bytesBody);
buffer.writeBytes(END_OF_FRAME);
size = buffer.writerIndex();
@@ -151,7 +161,7 @@
return headers;
}
- private class Header
+ public static class Header
{
public String key;
public String val;
@@ -192,4 +202,14 @@
{
return disconnect;
}
+
+ public List<Header> getHeaders()
+ {
+ return this.allHeaders;
+ }
+
+ public void setByteBody(byte[] content)
+ {
+ this.bytesBody = content;
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-08
15:06:11 UTC (rev 11304)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-08
15:35:08 UTC (rev 11305)
@@ -14,6 +14,7 @@
import java.io.UnsupportedEncodingException;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.hornetq.core.server.ServerMessage;
@@ -121,11 +122,23 @@
public StompFrame handleReceipt(String receiptID)
{
StompFrame receipt = new StompFrame(Stomp.Responses.RECEIPT);
- receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
+ try
+ {
+ receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
+ }
+ catch (HornetQStompException e)
+ {
+ return e.getFrame();
+ }
return receipt;
}
public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
StompSubscription subscription, int deliveryCount) throws Exception;
+
+ public abstract StompFrame createStompFrame(String command);
+
+ public abstract StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer)
throws HornetQStompException;
+
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-08
15:06:11 UTC (rev 11304)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-08
15:35:08 UTC (rev 11305)
@@ -13,6 +13,7 @@
package org.hornetq.core.protocol.stomp.v10;
import java.io.UnsupportedEncodingException;
+import java.util.List;
import java.util.Map;
import org.hornetq.api.core.HornetQBuffer;
@@ -23,7 +24,9 @@
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompDecoder;
import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompFrame.Header;
import org.hornetq.core.protocol.stomp.StompSubscription;
import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
@@ -345,4 +348,15 @@
}
+ @Override
+ public StompFrame createStompFrame(String command)
+ {
+ return new StompFrameV10(command);
+ }
+
+ public StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws
HornetQStompException
+ {
+ return decoder.defaultDecode(buffer);
+ }
+
}
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
(rev 0)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java 2011-09-08
15:35:08 UTC (rev 11305)
@@ -0,0 +1,22 @@
+package org.hornetq.core.protocol.stomp.v10;
+
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.StompFrame;
+
+public class StompFrameV10 extends StompFrame
+{
+ public StompFrameV10(String command)
+ {
+ super(command);
+ }
+
+ @Override
+ public void addHeader(String key, String val) throws HornetQStompException
+ {
+ //trimming
+ String newKey = key.trim();
+ String newVal = val.trim();
+ super.addHeader(newKey, newVal);
+ }
+
+}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-08
15:06:11 UTC (rev 11304)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-08
15:35:08 UTC (rev 11305)
@@ -12,6 +12,7 @@
*/
package org.hornetq.core.protocol.stomp.v11;
+import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
@@ -24,6 +25,7 @@
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompDecoder;
import org.hornetq.core.protocol.stomp.StompFrame;
import org.hornetq.core.protocol.stomp.StompSubscription;
import org.hornetq.core.protocol.stomp.StompUtils;
@@ -40,6 +42,8 @@
{
private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
+ private static final char ESC_CHAR = '\\';
+
private HeartBeater heartBeater;
public StompFrameHandlerV11(StompConnection connection)
@@ -538,4 +542,436 @@
}
}
+ @Override
+ public StompFrame createStompFrame(String command)
+ {
+ return new StompFrameV11(command);
+ }
+
+ //all frame except CONNECT are decoded here.
+ public StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws
HornetQStompException
+ {
+ int readable = buffer.readableBytes();
+
+ if (decoder.data + readable >= decoder.workingBuffer.length)
+ {
+ decoder.resizeWorking(decoder.data + readable);
+ }
+
+ buffer.readBytes(decoder.workingBuffer, decoder.data, readable);
+
+ decoder.data += readable;
+
+ if (decoder.command == null)
+ {
+ if (decoder.data < 4)
+ {
+ // Need at least four bytes to identify the command
+ // - up to 3 bytes for the command name + potentially another byte for a
leading \n
+
+ return null;
+ }
+
+ int offset;
+
+ if (decoder.workingBuffer[0] == StompDecoder.NEW_LINE)
+ {
+ // Yuck, some badly behaved STOMP clients add a \n *after* the terminating
NUL char at the end of the
+ // STOMP
+ // frame this can manifest as an extra \n at the beginning when the next
STOMP frame is read - we need to
+ // deal
+ // with this
+ offset = 1;
+ }
+ else
+ {
+ offset = 0;
+ }
+
+ byte b = decoder.workingBuffer[offset];
+
+ switch (b)
+ {
+ case StompDecoder.A:
+ {
+ if (decoder.workingBuffer[offset + 1] == StompDecoder.B)
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_ABORT_LENGTH +
1))
+ {
+ return null;
+ }
+
+ // ABORT
+ decoder.command = StompDecoder.COMMAND_ABORT;
+ }
+ else
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_ACK_LENGTH +
1))
+ {
+ return null;
+ }
+
+ // ACK
+ decoder.command = StompDecoder.COMMAND_ACK;
+ }
+ break;
+ }
+ case StompDecoder.B:
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_BEGIN_LENGTH +
1))
+ {
+ return null;
+ }
+
+ // BEGIN
+ decoder.command = StompDecoder.COMMAND_BEGIN;
+
+ break;
+ }
+ case StompDecoder.C:
+ {
+ if (decoder.workingBuffer[offset + 2] == StompDecoder.M)
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_COMMIT_LENGTH +
1))
+ {
+ return null;
+ }
+
+ // COMMIT
+ decoder.command = StompDecoder.COMMAND_COMMIT;
+ }
+ /**** added by meddy, 27 april 2011, handle header parser for reply to
websocket protocol ****/
+ else if (decoder.workingBuffer[offset+7] == StompDecoder.E)
+ {
+ if (!decoder.tryIncrement(offset +
StompDecoder.COMMAND_CONNECTED_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // CONNECTED
+ decoder.command = StompDecoder.COMMAND_CONNECTED;
+ }
+ /**** end ****/
+ else
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_CONNECT_LENGTH
+ 1))
+ {
+ return null;
+ }
+
+ // CONNECT
+ decoder.command = StompDecoder.COMMAND_CONNECT;
+ }
+ break;
+ }
+ case StompDecoder.D:
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_DISCONNECT_LENGTH
+ 1))
+ {
+ return null;
+ }
+
+ // DISCONNECT
+ decoder.command = StompDecoder.COMMAND_DISCONNECT;
+
+ break;
+ }
+ case StompDecoder.R:
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_RECEIPT_LENGTH +
1))
+ {
+ return null;
+ }
+
+ // RECEIPT
+ decoder.command = StompDecoder.COMMAND_RECEIPT;
+
+ break;
+ }
+ /**** added by meddy, 27 april 2011, handle header parser for reply to
websocket protocol ****/
+ case StompDecoder.E:
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_ERROR_LENGTH +
1))
+ {
+ return null;
+ }
+
+ // ERROR
+ decoder.command = StompDecoder.COMMAND_ERROR;
+
+ break;
+ }
+ case StompDecoder.M:
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_MESSAGE_LENGTH +
1))
+ {
+ return null;
+ }
+
+ // MESSAGE
+ decoder.command = StompDecoder.COMMAND_MESSAGE;
+
+ break;
+ }
+ /**** end ****/
+ case StompDecoder.S:
+ {
+ if (decoder.workingBuffer[offset + 1] == StompDecoder.E)
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_SEND_LENGTH +
1))
+ {
+ return null;
+ }
+
+ // SEND
+ decoder.command = StompDecoder.COMMAND_SEND;
+ }
+ else if (decoder.workingBuffer[offset + 1] == StompDecoder.U)
+ {
+ if (!decoder.tryIncrement(offset +
StompDecoder.COMMAND_SUBSCRIBE_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // SUBSCRIBE
+ decoder.command = StompDecoder.COMMAND_SUBSCRIBE;
+ }
+ else
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_STOMP_LENGTH +
1))
+ {
+ return null;
+ }
+
+ // SUBSCRIBE
+ decoder.command = StompDecoder.COMMAND_STOMP;
+ }
+ break;
+ }
+ case StompDecoder.U:
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_UNSUBSCRIBE_LENGTH
+ 1))
+ {
+ return null;
+ }
+
+ // UNSUBSCRIBE
+ decoder.command = StompDecoder.COMMAND_UNSUBSCRIBE;
+
+ break;
+ }
+ case StompDecoder.N:
+ {
+ if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_NACK_LENGTH + 1))
+ {
+ return null;
+ }
+ //NACK
+ decoder.command = StompDecoder.COMMAND_NACK;
+ break;
+ }
+ default:
+ {
+ decoder.throwInvalid();
+ }
+ }
+
+ // Sanity check
+
+ if (decoder.workingBuffer[decoder.pos - 1] != StompDecoder.NEW_LINE)
+ {
+ decoder.throwInvalid();
+ }
+ }
+
+ if (decoder.readingHeaders)
+ {
+ if (decoder.headerBytesCopyStart == -1)
+ {
+ decoder.headerBytesCopyStart = decoder.pos;
+ }
+
+ // Now the headers
+
+ boolean isEscaping = false;
+ SimpleBytes holder = new SimpleBytes(1024);
+
+ outer: while (true)
+ {
+ byte b = decoder.workingBuffer[decoder.pos++];
+
+ switch (b)
+ {
+ //escaping
+ case ESC_CHAR:
+ {
+ if (isEscaping)
+ {
+ //this is a backslash
+ holder.append(b);
+ isEscaping = false;
+ }
+ else
+ {
+ //begin escaping
+ isEscaping = true;
+ }
+ break;
+ }
+ case StompDecoder.HEADER_SEPARATOR:
+ {
+ if (isEscaping)
+ {
+ //a colon
+ holder.append(b);
+ isEscaping = false;
+ }
+ else
+ {
+ if (decoder.inHeaderName)
+ {
+ try
+ {
+ decoder.headerName = holder.getString();
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new HornetQStompException("Encoding
exception", e);
+ }
+
+ holder.reset();
+
+ decoder.inHeaderName = false;
+
+ decoder.headerBytesCopyStart = decoder.pos;
+
+ decoder.headerValueWhitespace = true;
+ }
+ }
+
+ decoder.whiteSpaceOnly = false;
+
+ break;
+ }
+ case StompDecoder.NEW_LINE:
+ {
+ if (decoder.whiteSpaceOnly)
+ {
+ // Headers are terminated by a blank line
+ decoder.readingHeaders = false;
+
+ break outer;
+ }
+
+ String headerValue;
+ try
+ {
+ headerValue = holder.getString();
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new HornetQStompException("Encoding exception.",
e);
+ }
+ holder.reset();
+
+ decoder.headers.put(decoder.headerName, headerValue);
+
+ if
(decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
+ {
+ decoder.contentLength = Integer.parseInt(headerValue);
+ }
+
+ decoder.whiteSpaceOnly = true;
+
+ decoder.headerBytesCopyStart = decoder.pos;
+
+ decoder.inHeaderName = true;
+
+ decoder.headerValueWhitespace = false;
+
+ break;
+ }
+ default:
+ {
+ decoder.whiteSpaceOnly = false;
+
+ decoder.headerValueWhitespace = false;
+ }
+ }
+ if (decoder.pos == decoder.data)
+ {
+ // Run out of data
+
+ return null;
+ }
+ }
+ }
+
+ // Now the body
+
+ byte[] content = null;
+
+ if (decoder.contentLength != -1)
+ {
+ if (decoder.pos + decoder.contentLength + 1 > decoder.data)
+ {
+ // Need more bytes
+ }
+ else
+ {
+ content = new byte[decoder.contentLength];
+
+ System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0,
decoder.contentLength);
+
+ decoder.pos += decoder.contentLength + 1;
+ }
+ }
+ else
+ {
+ // Need to scan for terminating NUL
+
+ if (decoder.bodyStart == -1)
+ {
+ decoder.bodyStart = decoder.pos;
+ }
+
+ while (decoder.pos < decoder.data)
+ {
+ if (decoder.workingBuffer[decoder.pos++] == 0)
+ {
+ content = new byte[decoder.pos - decoder.bodyStart - 1];
+
+ System.arraycopy(decoder.workingBuffer, decoder.bodyStart, content, 0,
content.length);
+
+ break;
+ }
+ }
+ }
+
+ if (content != null)
+ {
+ if (decoder.data > decoder.pos)
+ {
+ if (decoder.workingBuffer[decoder.pos] == StompDecoder.NEW_LINE)
decoder.pos++;
+
+ if (decoder.data > decoder.pos)
+ // More data still in the buffer from the next packet
+ System.arraycopy(decoder.workingBuffer, decoder.pos, decoder.workingBuffer,
0, decoder.data - decoder.pos);
+ }
+
+ decoder.data = decoder.data - decoder.pos;
+
+ // reset
+
+ StompFrame ret = new StompFrameV11(decoder.command, decoder.headers, content);
+
+ decoder.init();
+
+ return ret;
+ }
+ else
+ {
+ return null;
+ }
+ }
}
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
(rev 0)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-08
15:35:08 UTC (rev 11305)
@@ -0,0 +1,94 @@
+package org.hornetq.core.protocol.stomp.v11;
+
+import java.util.Map;
+
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.StompFrame;
+
+public class StompFrameV11 extends StompFrame
+{
+ public static final char ESC_CHAR = '\\';
+ public static final char COLON = ':';
+
+ public StompFrameV11(String command, Map<String, String> headers, byte[]
content)
+ {
+ super(command, headers, content);
+ }
+
+ public StompFrameV11(String command)
+ {
+ super(command);
+ }
+
+ public static String escaping(String rawString) throws HornetQStompException
+ {
+ int len = rawString.length();
+
+ SimpleBytes sb = new SimpleBytes(1024);
+
+ boolean beginEsc = false;
+ for (int i = 0; i < len; i++)
+ {
+ char k = rawString.charAt(i);
+
+ if (k == ESC_CHAR)
+ {
+ if (beginEsc)
+ {
+ //it is a backslash
+ sb.append('\\');
+ beginEsc = false;
+ }
+ else
+ {
+ beginEsc = true;
+ }
+ }
+ else if (k == 'n')
+ {
+ if (beginEsc)
+ {
+ //it is a newline
+ sb.append('\n');
+ beginEsc = false;
+ }
+ else
+ {
+ sb.append(k);
+ }
+ }
+ else if (k == ':')
+ {
+ if (beginEsc)
+ {
+ sb.append(k);
+ beginEsc = false;
+ }
+ else
+ {
+ //error
+ throw new HornetQStompException("Colon not escaped!");
+ }
+ }
+ else
+ {
+ if (beginEsc)
+ {
+ //error, no other escape defined.
+ throw new HornetQStompException("Bad escape char found: " + k);
+ }
+ else
+ {
+ sb.append(k);
+ }
+ }
+ }
+ return sb.toString();
+ }
+
+ public static void main(String[] args)
+ {
+ String rawStr = "hello world\\n\\:"
+ }
+
+}