[hornetq-commits] JBoss hornetq SVN: r11305 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v10 and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 8 11:35:08 EDT 2011


Author: gaohoward
Date: 2011-09-08 11:35:08 -0400 (Thu, 08 Sep 2011)
New Revision: 11305

Added:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
char escaping


Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java	                        (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/SimpleBytes.java	2011-09-08 15:35:08 UTC (rev 11305)
@@ -0,0 +1,44 @@
+package org.hornetq.core.protocol.stomp;
+
+import java.io.UnsupportedEncodingException;
+
+
+/**
+ * A minimal growable byte accumulator whose contents can be read back as a
+ * UTF-8 string.
+ * <p>
+ * Bytes are appended one at a time; when the backing array is full it is
+ * enlarged by a fixed step. The class performs no synchronization.
+ */
+public class SimpleBytes
+{
+   /** Growth step used when the requested initial capacity is zero. */
+   private static final int DEFAULT_STEP = 16;
+
+   /** Number of bytes added to the backing array each time it fills up; always positive. */
+   private final int step;
+
+   /** Backing storage; only the first {@code index} bytes are meaningful. */
+   private byte[] contents;
+
+   /** Number of bytes appended since construction or the last {@link #reset()}. */
+   private int index;
+
+   /**
+    * Creates an empty buffer.
+    *
+    * @param initCapacity initial size of the backing array, also used as the
+    *                     growth step; zero is allowed and falls back to a small
+    *                     default step so that {@link #append(byte)} can still grow
+    * @throws IllegalArgumentException if {@code initCapacity} is negative
+    */
+   public SimpleBytes(int initCapacity)
+   {
+      if (initCapacity < 0)
+      {
+         throw new IllegalArgumentException("initCapacity must not be negative: " + initCapacity);
+      }
+      // A zero step would make append() "grow" by nothing and then overflow the array.
+      this.step = initCapacity > 0 ? initCapacity : DEFAULT_STEP;
+      contents = new byte[initCapacity];
+      index = 0;
+   }
+
+   /**
+    * Decodes the appended bytes as UTF-8.
+    *
+    * @return the decoded text, or the empty string when nothing has been appended
+    * @throws UnsupportedEncodingException if the UTF-8 charset name is not supported
+    */
+   public String getString() throws UnsupportedEncodingException
+   {
+      if (index == 0)
+      {
+         return "";
+      }
+      // Decode the used prefix directly; no intermediate copy is required.
+      return new String(contents, 0, index, "UTF-8");
+   }
+
+   /**
+    * Discards the appended bytes; the backing array is kept for reuse.
+    */
+   public void reset()
+   {
+      index = 0;
+   }
+
+   /**
+    * Appends one byte, enlarging the backing array by the growth step when it is full.
+    *
+    * @param b the byte to append
+    */
+   public void append(byte b)
+   {
+      if (index >= contents.length)
+      {
+         //grow
+         byte[] newBuffer = new byte[contents.length + step];
+         System.arraycopy(contents, 0, newBuffer, 0, contents.length);
+         contents = newBuffer;
+      }
+      contents[index++] = b;
+   }
+}

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-08 15:35:08 UTC (rev 11305)
@@ -61,7 +61,7 @@
    
    private final long creationTime;
 
-   private StompDecoder decoder = new StompDecoder();
+   private StompDecoder decoder;
 
    private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
 
@@ -90,6 +90,8 @@
 
       this.manager = manager;
       
+      this.decoder = new StompDecoder(this);
+      
       this.creationTime = System.currentTimeMillis();
    }
 
@@ -697,4 +699,9 @@
       }
 
    }
+
+   /**
+    * Exposes the versioned frame handler held by this connection.
+    *
+    * @return the current value of the {@code frameHandler} field
+    */
+   public VersionedStompFrameHandler getFrameHandler()
+   {
+      return frameHandler;
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java	2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java	2011-09-08 15:35:08 UTC (rev 11305)
@@ -30,118 +30,131 @@
 {
    private static final Logger log = Logger.getLogger(StompDecoder.class);
 
-   private static final boolean TRIM_LEADING_HEADER_VALUE_WHITESPACE = true;
+   public static final boolean TRIM_LEADING_HEADER_VALUE_WHITESPACE = true;
 
-   private static final String COMMAND_ABORT = "ABORT";
+   public static final String COMMAND_ABORT = "ABORT";
 
-   private static final int COMMAND_ABORT_LENGTH = COMMAND_ABORT.length();
+   public static final int COMMAND_ABORT_LENGTH = COMMAND_ABORT.length();
 
-   private static final String COMMAND_ACK = "ACK";
+   public static final String COMMAND_ACK = "ACK";
 
-   private static final int COMMAND_ACK_LENGTH = COMMAND_ACK.length();
+   public static final int COMMAND_ACK_LENGTH = COMMAND_ACK.length();
 
-   private static final String COMMAND_BEGIN = "BEGIN";
+   public static final String COMMAND_NACK = "NACK";
 
-   private static final int COMMAND_BEGIN_LENGTH = COMMAND_BEGIN.length();
+   public static final int COMMAND_NACK_LENGTH = COMMAND_NACK.length();
 
-   private static final String COMMAND_COMMIT = "COMMIT";
+   public static final String COMMAND_BEGIN = "BEGIN";
 
-   private static final int COMMAND_COMMIT_LENGTH = COMMAND_COMMIT.length();
+   public static final int COMMAND_BEGIN_LENGTH = COMMAND_BEGIN.length();
 
-   private static final String COMMAND_CONNECT = "CONNECT";
+   public static final String COMMAND_COMMIT = "COMMIT";
 
-   private static final int COMMAND_CONNECT_LENGTH = COMMAND_CONNECT.length();
+   public static final int COMMAND_COMMIT_LENGTH = COMMAND_COMMIT.length();
 
-   private static final String COMMAND_DISCONNECT = "DISCONNECT";
+   public static final String COMMAND_CONNECT = "CONNECT";
 
-   private static final int COMMAND_DISCONNECT_LENGTH = COMMAND_DISCONNECT.length();
+   public static final int COMMAND_CONNECT_LENGTH = COMMAND_CONNECT.length();
 
-   private static final String COMMAND_SEND = "SEND";
+   public static final String COMMAND_DISCONNECT = "DISCONNECT";
 
-   private static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
+   public static final int COMMAND_DISCONNECT_LENGTH = COMMAND_DISCONNECT.length();
 
-   private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
+   public static final String COMMAND_SEND = "SEND";
 
-   private static final int COMMAND_SUBSCRIBE_LENGTH = COMMAND_SUBSCRIBE.length();
+   public static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
 
-   private static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
+   public static final String COMMAND_STOMP = "STOMP";
 
-   private static final int COMMAND_UNSUBSCRIBE_LENGTH = COMMAND_UNSUBSCRIBE.length();
+   public static final int COMMAND_STOMP_LENGTH = COMMAND_STOMP.length();
 
+   public static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
+
+   public static final int COMMAND_SUBSCRIBE_LENGTH = COMMAND_SUBSCRIBE.length();
+
+   public static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
+
+   public static final int COMMAND_UNSUBSCRIBE_LENGTH = COMMAND_UNSUBSCRIBE.length();
+
    /**** added by meddy, 27 april 2011, handle header parser for reply to websocket protocol ****/
-   private static final String COMMAND_CONNECTED = "CONNECTED";
+   public static final String COMMAND_CONNECTED = "CONNECTED";
 
-   private static final int COMMAND_CONNECTED_LENGTH = COMMAND_CONNECTED.length();
+   public static final int COMMAND_CONNECTED_LENGTH = COMMAND_CONNECTED.length();
    
-   private static final String COMMAND_MESSAGE = "MESSAGE";
+   public static final String COMMAND_MESSAGE = "MESSAGE";
 
-   private static final int COMMAND_MESSAGE_LENGTH = COMMAND_MESSAGE.length();
+   public static final int COMMAND_MESSAGE_LENGTH = COMMAND_MESSAGE.length();
 
-   private static final String COMMAND_ERROR = "ERROR";
+   public static final String COMMAND_ERROR = "ERROR";
 
-   private static final int COMMAND_ERROR_LENGTH = COMMAND_ERROR.length();
+   public static final int COMMAND_ERROR_LENGTH = COMMAND_ERROR.length();
 
-   private static final String COMMAND_RECEIPT = "RECEIPT";
+   public static final String COMMAND_RECEIPT = "RECEIPT";
 
-   private static final int COMMAND_RECEIPT_LENGTH = COMMAND_RECEIPT.length();
+   public static final int COMMAND_RECEIPT_LENGTH = COMMAND_RECEIPT.length();
    /**** end  ****/
 
-   private static final byte A = (byte)'A';
+   public static final byte A = (byte)'A';
 
-   private static final byte B = (byte)'B';
+   public static final byte B = (byte)'B';
 
-   private static final byte C = (byte)'C';
+   public static final byte C = (byte)'C';
 
-   private static final byte D = (byte)'D';
+   public static final byte D = (byte)'D';
 
-   private static final byte E = (byte)'E';
+   public static final byte E = (byte)'E';
 
-   private static final byte M = (byte)'M';
+   public static final byte M = (byte)'M';
 
-   private static final byte S = (byte)'S';
+   public static final byte S = (byte)'S';
    
-   private static final byte R = (byte)'R';
+   public static final byte R = (byte)'R';
 
-   private static final byte U = (byte)'U';
+   public static final byte U = (byte)'U';
 
-   private static final byte HEADER_SEPARATOR = (byte)':';
+   public static final byte N = (byte)'N';
 
-   private static final byte NEW_LINE = (byte)'\n';
+   public static final byte HEADER_SEPARATOR = (byte)':';
 
-   private static final byte SPACE = (byte)' ';
+   public static final byte NEW_LINE = (byte)'\n';
 
-   private static final byte TAB = (byte)'\t';
+   public static final byte SPACE = (byte)' ';
 
-   private static String CONTENT_LENGTH_HEADER_NAME = "content-length";
+   public static final byte TAB = (byte)'\t';
 
-   private byte[] workingBuffer = new byte[1024];
+   public static String CONTENT_LENGTH_HEADER_NAME = "content-length";
 
-   private int pos;
+   public byte[] workingBuffer = new byte[1024];
 
-   private int data;
+   public int pos;
 
-   private String command;
+   public int data;
 
-   private Map<String, Object> headers;
+   public String command;
 
-   private int headerBytesCopyStart;
+   public Map<String, String> headers;
 
-   private boolean readingHeaders;
+   public int headerBytesCopyStart;
 
-   private boolean headerValueWhitespace;
+   public boolean readingHeaders;
 
-   private boolean inHeaderName;
+   public boolean headerValueWhitespace;
 
-   private String headerName;
+   public boolean inHeaderName;
 
-   private boolean whiteSpaceOnly;
+   public String headerName;
 
-   private int contentLength;
+   public boolean whiteSpaceOnly;
 
-   private int bodyStart;
+   public int contentLength;
 
-   public StompDecoder()
+   public int bodyStart;
+   
+   public StompConnection connection;
+
+   public StompDecoder(StompConnection stompConnection)
    {
+      this.connection = stompConnection;
       init();
    }
 
@@ -156,13 +169,25 @@
     * followed by an empty line
     * followed by an optional message body
     * terminated with a null character
+    * 
+    * Note: to support both 1.0 and 1.1, we just assemble a
+    * standard StompFrame and let the versioned handler do the more
+    * spec-specific work (like trimming, escaping etc).
     */
    public synchronized StompFrame decode(final HornetQBuffer buffer) throws Exception
    {
-      //log.info("got buff " + buffer.readableBytes());
+      if (connection.isValid())
+      {
+         VersionedStompFrameHandler handler = connection.getFrameHandler();
+         return handler.decode(this, buffer);
+      }
       
-      long start = System.nanoTime();
-      
+      return defaultDecode(buffer);
+   }
+   
+   public StompFrame defaultDecode(final HornetQBuffer buffer) throws HornetQStompException
+   {
+
       int readable = buffer.readableBytes();
 
       if (data + readable >= workingBuffer.length)
@@ -375,8 +400,6 @@
             throwInvalid();
          }
       }
-      
-      long commandTime = System.nanoTime() - start;
 
       if (readingHeaders)
       {
@@ -482,8 +505,6 @@
             }
          }
       }
-      
-      long headersTime = System.nanoTime() - start - commandTime;
 
       // Now the body
 
@@ -526,8 +547,6 @@
          }
       }
       
-      
-
       if (content != null)
       {
          if (data > pos)
@@ -546,34 +565,26 @@
          StompFrame ret = new StompFrame(command, headers, content);
 
          init();
-         
-        // log.info("decoded");
-         
-         long bodyTime = System.nanoTime() - start - headersTime - commandTime;
-         
-        // log.info("command: "+ commandTime + " headers: " + headersTime + " body: " + bodyTime);
 
          return ret;
       }
       else
       {
          return null;
-      }
+      }      
    }
 
-   private void throwInvalid() throws StompException
+   public void throwInvalid() throws HornetQStompException
    {
-      throw new StompException("Invalid STOMP frame: " + this.dumpByteArray(workingBuffer));
+      throw new HornetQStompException("Invalid STOMP frame: " + this.dumpByteArray(workingBuffer));
    }
 
-   private void init()
+   public void init()
    {
       pos = 0;
 
       command = null;
 
-      headers = new HashMap<String, Object>();
-
       this.headerBytesCopyStart = -1;
 
       readingHeaders = true;
@@ -591,7 +602,7 @@
       bodyStart = -1;
    }
 
-   private void resizeWorking(final int newSize)
+   public void resizeWorking(final int newSize)
    {
       byte[] oldBuffer = workingBuffer;
 
@@ -600,7 +611,7 @@
       System.arraycopy(oldBuffer, 0, workingBuffer, 0, oldBuffer.length);
    }
 
-   private boolean tryIncrement(final int length)
+   public boolean tryIncrement(final int length)
    {
       if (pos + length >= data)
       {

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-08 15:35:08 UTC (rev 11305)
@@ -51,6 +51,8 @@
    private List<Header> allHeaders = new ArrayList<Header>();
 
    private String body;
+   
+   private byte[] bytesBody;
 
    private HornetQBuffer buffer = null;
 
@@ -70,6 +72,14 @@
       this.disconnect = disconnect;
    }
 
+   /**
+    * Builds a frame from an already decoded command, header map and raw body.
+    * The map and the byte array are stored by reference, not copied.
+    *
+    * @param command the STOMP command name
+    * @param headers the header map to attach to the frame
+    * @param content the body bytes
+    */
+   public StompFrame(String command, Map<String, String> headers, byte[] content)
+   {
+      this.command = command;
+      this.headers = headers;
+      bytesBody = content;
+   }
+
    public String getCommand()
    {
       return command;
@@ -107,7 +117,7 @@
    {
       if (buffer == null)
       {
-         buffer = HornetQBuffers.dynamicBuffer(content.length + 512);
+         buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
 
          StringBuffer head = new StringBuffer();
          head.append(command);
@@ -124,7 +134,7 @@
          head.append(Stomp.NEWLINE);
 
          buffer.writeBytes(head.toString().getBytes("UTF-8"));
-         buffer.writeBytes(content);
+         buffer.writeBytes(bytesBody);
          buffer.writeBytes(END_OF_FRAME);
 
          size = buffer.writerIndex();
@@ -151,7 +161,7 @@
       return headers;
    }
    
-   private class Header
+   public static class Header
    {
       public String key;
       public String val;
@@ -192,4 +202,14 @@
    {
       return disconnect;
    }
+
+   /**
+    * Returns the frame's header list.
+    *
+    * @return the {@code allHeaders} list itself (not a copy)
+    */
+   public List<Header> getHeaders()
+   {
+      return allHeaders;
+   }
+
+   /**
+    * Replaces the frame's raw body.
+    *
+    * @param content the new body bytes; stored by reference
+    */
+   public void setByteBody(byte[] content)
+   {
+      bytesBody = content;
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-09-08 15:35:08 UTC (rev 11305)
@@ -14,6 +14,7 @@
 
 import java.io.UnsupportedEncodingException;
 
+import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
 import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
 import org.hornetq.core.server.ServerMessage;
@@ -121,11 +122,23 @@
    public StompFrame handleReceipt(String receiptID)
    {
       StompFrame receipt = new StompFrame(Stomp.Responses.RECEIPT);
-      receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
+      try
+      {
+         receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
       
       return receipt;
    }
 
    public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
          StompSubscription subscription, int deliveryCount) throws Exception;
+
+   public abstract StompFrame createStompFrame(String command);
+
+   public abstract StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws HornetQStompException;
+
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	2011-09-08 15:35:08 UTC (rev 11305)
@@ -13,6 +13,7 @@
 package org.hornetq.core.protocol.stomp.v10;
 
 import java.io.UnsupportedEncodingException;
+import java.util.List;
 import java.util.Map;
 
 import org.hornetq.api.core.HornetQBuffer;
@@ -23,7 +24,9 @@
 import org.hornetq.core.protocol.stomp.HornetQStompException;
 import org.hornetq.core.protocol.stomp.Stomp;
 import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompDecoder;
 import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompFrame.Header;
 import org.hornetq.core.protocol.stomp.StompSubscription;
 import org.hornetq.core.protocol.stomp.StompUtils;
 import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
@@ -345,4 +348,15 @@
 
    }
 
+   /**
+    * Creates an empty STOMP 1.0 frame for the given command.
+    *
+    * @param command the STOMP command name
+    * @return a new {@link StompFrameV10}
+    */
+   @Override
+   public StompFrame createStompFrame(String command)
+   {
+      StompFrame frame = new StompFrameV10(command);
+      return frame;
+   }
+
+   /**
+    * Decodes a frame for STOMP 1.0 by delegating to the decoder's default,
+    * version-neutral parsing.
+    *
+    * @param decoder the decoder holding the partially received frame state
+    * @param buffer the newly received bytes
+    * @return whatever {@link StompDecoder#defaultDecode(HornetQBuffer)} returns
+    * @throws HornetQStompException if the default decoder rejects the frame
+    */
+   @Override
+   public StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws HornetQStompException
+   {
+      return decoder.defaultDecode(buffer);
+   }
+
 }

Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java	                        (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java	2011-09-08 15:35:08 UTC (rev 11305)
@@ -0,0 +1,22 @@
+package org.hornetq.core.protocol.stomp.v10;
+
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.StompFrame;
+
+/**
+ * STOMP 1.0 flavour of {@link StompFrame}: header keys and values are
+ * trimmed of surrounding whitespace before they are stored.
+ */
+public class StompFrameV10 extends StompFrame
+{
+   /**
+    * Creates an empty frame for the given command.
+    *
+    * @param command the STOMP command name
+    */
+   public StompFrameV10(String command)
+   {
+      super(command);
+   }
+
+   /**
+    * Adds a header after trimming both the key and the value.
+    *
+    * @param key the header name
+    * @param val the header value
+    * @throws HornetQStompException if the superclass rejects the header
+    */
+   @Override
+   public void addHeader(String key, String val) throws HornetQStompException
+   {
+      super.addHeader(key.trim(), val.trim());
+   }
+
+}

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-08 15:06:11 UTC (rev 11304)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-08 15:35:08 UTC (rev 11305)
@@ -12,6 +12,7 @@
  */
 package org.hornetq.core.protocol.stomp.v11;
 
+import java.io.UnsupportedEncodingException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -24,6 +25,7 @@
 import org.hornetq.core.protocol.stomp.HornetQStompException;
 import org.hornetq.core.protocol.stomp.Stomp;
 import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompDecoder;
 import org.hornetq.core.protocol.stomp.StompFrame;
 import org.hornetq.core.protocol.stomp.StompSubscription;
 import org.hornetq.core.protocol.stomp.StompUtils;
@@ -40,6 +42,8 @@
 {
    private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
    
+   private static final char ESC_CHAR = '\\';
+   
    private HeartBeater heartBeater;
 
    public StompFrameHandlerV11(StompConnection connection)
@@ -538,4 +542,436 @@
       }
    }
 
+   /**
+    * Creates an empty STOMP 1.1 frame for the given command.
+    *
+    * @param command the STOMP command name
+    * @return a new {@link StompFrameV11}
+    */
+   @Override
+   public StompFrame createStompFrame(String command)
+   {
+      StompFrame frame = new StompFrameV11(command);
+      return frame;
+   }
+   
+   /**
+    * Decodes one STOMP 1.1 frame from the bytes accumulated in the decoder,
+    * un-escaping header names and values on the way.
+    * <p>
+    * {@code StompDecoder.decode} delegates here once the connection reports itself
+    * valid; per the original author's note, all frames except CONNECT are decoded here.
+    * <p>
+    * The newly readable bytes of {@code buffer} are first appended to the decoder's
+    * working buffer. Parsing then proceeds in three resumable stages (command,
+    * headers, body) whose progress is kept in the decoder's public fields, so the
+    * method can be called again when more bytes arrive.
+    * <p>
+    * Header escapes: {@code \\} yields a backslash, {@code \n} a newline, and both
+    * {@code \c} (the STOMP 1.1 form) and {@code \:} yield a colon. Any other escape,
+    * or a backslash directly before the end of a header line, is rejected.
+    * <p>
+    * NOTE(review): {@code decoder.headers} must be non-null when a header is stored;
+    * the {@code init()} hunk of this revision no longer allocates the map - confirm
+    * it is created elsewhere.
+    *
+    * @param decoder the decoder holding the working buffer and the parse state
+    * @param buffer the newly received bytes
+    * @return the decoded frame, or {@code null} when more bytes are needed
+    * @throws HornetQStompException if the command is not recognised, is not followed
+    *         by a newline, or a header contains an invalid escape sequence
+    */
+   public StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws HornetQStompException
+   {
+      int readable = buffer.readableBytes();
+
+      if (decoder.data + readable >= decoder.workingBuffer.length)
+      {
+         decoder.resizeWorking(decoder.data + readable);
+      }
+
+      buffer.readBytes(decoder.workingBuffer, decoder.data, readable);
+
+      decoder.data += readable;
+
+      if (decoder.command == null)
+      {
+         if (decoder.data < 4)
+         {
+            // Need at least four bytes to identify the command
+            // - up to 3 bytes for the command name + potentially another byte for a leading \n
+
+            return null;
+         }
+
+         int offset;
+
+         if (decoder.workingBuffer[0] == StompDecoder.NEW_LINE)
+         {
+            // Yuck, some badly behaved STOMP clients add a \n *after* the terminating NUL char at the end of the
+            // STOMP
+            // frame this can manifest as an extra \n at the beginning when the next STOMP frame is read - we need to
+            // deal
+            // with this
+            offset = 1;
+         }
+         else
+         {
+            offset = 0;
+         }
+
+         byte b = decoder.workingBuffer[offset];
+
+         switch (b)
+         {
+            case StompDecoder.A:
+            {
+               if (decoder.workingBuffer[offset + 1] == StompDecoder.B)
+               {
+                  if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_ABORT_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // ABORT
+                  decoder.command = StompDecoder.COMMAND_ABORT;
+               }
+               else
+               {
+                  if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_ACK_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // ACK
+                  decoder.command = StompDecoder.COMMAND_ACK;
+               }
+               break;
+            }
+            case StompDecoder.B:
+            {
+               if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_BEGIN_LENGTH + 1))
+               {
+                  return null;
+               }
+
+               // BEGIN
+               decoder.command = StompDecoder.COMMAND_BEGIN;
+
+               break;
+            }
+            case StompDecoder.C:
+            {
+               if (decoder.workingBuffer[offset + 2] == StompDecoder.M)
+               {
+                  if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_COMMIT_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // COMMIT
+                  decoder.command = StompDecoder.COMMAND_COMMIT;
+               }
+               /**** added by meddy, 27 april 2011, handle header parser for reply to websocket protocol ****/
+               else if (decoder.workingBuffer[offset+7] == StompDecoder.E)
+               {
+                  if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_CONNECTED_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // CONNECTED
+                  decoder.command = StompDecoder.COMMAND_CONNECTED;
+               }
+               /**** end ****/
+               else
+               {
+                  if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_CONNECT_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // CONNECT
+                  decoder.command = StompDecoder.COMMAND_CONNECT;
+               }
+               break;
+            }
+            case StompDecoder.D:
+            {
+               if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_DISCONNECT_LENGTH + 1))
+               {
+                  return null;
+               }
+
+               // DISCONNECT
+               decoder.command = StompDecoder.COMMAND_DISCONNECT;
+
+               break;
+            }
+            case StompDecoder.R:
+            {
+               if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_RECEIPT_LENGTH + 1))
+               {
+                  return null;
+               }
+
+               // RECEIPT
+               decoder.command = StompDecoder.COMMAND_RECEIPT;
+
+               break;
+            }
+            /**** added by meddy, 27 april 2011, handle header parser for reply to websocket protocol ****/
+            case StompDecoder.E:
+            {
+               if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_ERROR_LENGTH + 1))
+               {
+                  return null;
+               }
+
+               // ERROR
+               decoder.command = StompDecoder.COMMAND_ERROR;
+
+               break;
+            }
+            case StompDecoder.M:
+            {
+               if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_MESSAGE_LENGTH + 1))
+               {
+                  return null;
+               }
+
+               // MESSAGE
+               decoder.command = StompDecoder.COMMAND_MESSAGE;
+
+               break;
+            }
+            /**** end ****/
+            case StompDecoder.S:
+            {
+               if (decoder.workingBuffer[offset + 1] == StompDecoder.E)
+               {
+                  if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_SEND_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // SEND
+                  decoder.command = StompDecoder.COMMAND_SEND;
+               }
+               else if (decoder.workingBuffer[offset + 1] == StompDecoder.U)
+               {
+                  if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_SUBSCRIBE_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // SUBSCRIBE
+                  decoder.command = StompDecoder.COMMAND_SUBSCRIBE;
+               }
+               else
+               {
+                  if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_STOMP_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // STOMP
+                  decoder.command = StompDecoder.COMMAND_STOMP;
+               }
+               break;
+            }
+            case StompDecoder.U:
+            {
+               if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_UNSUBSCRIBE_LENGTH + 1))
+               {
+                  return null;
+               }
+
+               // UNSUBSCRIBE
+               decoder.command = StompDecoder.COMMAND_UNSUBSCRIBE;
+
+               break;
+            }
+            case StompDecoder.N:
+            {
+               if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_NACK_LENGTH + 1))
+               {
+                  return null;
+               }
+               //NACK
+               decoder.command = StompDecoder.COMMAND_NACK;
+               break;
+            }
+            default:
+            {
+               decoder.throwInvalid();
+            }
+         }
+
+         // Sanity check
+
+         if (decoder.workingBuffer[decoder.pos - 1] != StompDecoder.NEW_LINE)
+         {
+            decoder.throwInvalid();
+         }
+      }
+
+      if (decoder.readingHeaders)
+      {
+         if (decoder.headerBytesCopyStart == -1)
+         {
+            decoder.headerBytesCopyStart = decoder.pos;
+         }
+
+         // Now the headers
+
+         boolean isEscaping = false;
+         // Fully qualified: SimpleBytes lives in the parent stomp package and this file does not import it.
+         org.hornetq.core.protocol.stomp.SimpleBytes holder = new org.hornetq.core.protocol.stomp.SimpleBytes(1024);
+
+         outer: while (true)
+         {
+            if (decoder.pos >= decoder.data)
+            {
+               // Run out of data.
+               // holder and isEscaping are locals and do not survive this return, so rewind to the
+               // start of the header name/value being read; the next call re-parses it from the
+               // working buffer. The two flags are restored to the values the NEW_LINE and
+               // separator branches below give them when they record such a start position.
+               decoder.pos = decoder.headerBytesCopyStart;
+               decoder.whiteSpaceOnly = decoder.inHeaderName;
+               decoder.headerValueWhitespace = !decoder.inHeaderName;
+
+               return null;
+            }
+
+            byte b = decoder.workingBuffer[decoder.pos++];
+
+            switch (b)
+            {
+               //escaping
+               case ESC_CHAR:
+               {
+                  if (isEscaping)
+                  {
+                     //this is a backslash
+                     holder.append(b);
+                     isEscaping = false;
+                  }
+                  else
+                  {
+                     //begin escaping
+                     isEscaping = true;
+                  }
+                  break;
+               }
+               case StompDecoder.HEADER_SEPARATOR:
+               {
+                  if (isEscaping)
+                  {
+                     //a colon
+                     holder.append(b);
+                     isEscaping = false;
+                  }
+                  else
+                  {
+                     if (decoder.inHeaderName)
+                     {
+                        try
+                        {
+                           decoder.headerName = holder.getString();
+                        }
+                        catch (UnsupportedEncodingException e)
+                        {
+                           throw new HornetQStompException("Encoding exception", e);
+                        }
+
+                        holder.reset();
+
+                        decoder.inHeaderName = false;
+
+                        decoder.headerBytesCopyStart = decoder.pos;
+
+                        decoder.headerValueWhitespace = true;
+                     }
+                  }
+
+                  decoder.whiteSpaceOnly = false;
+
+                  break;
+               }
+               case StompDecoder.NEW_LINE:
+               {
+                  if (isEscaping)
+                  {
+                     // A lone backslash at the end of a line must not leak its state into the next header.
+                     throw new HornetQStompException("Incomplete escape sequence at end of header line");
+                  }
+
+                  if (decoder.whiteSpaceOnly)
+                  {
+                     // Headers are terminated by a blank line
+                     decoder.readingHeaders = false;
+
+                     break outer;
+                  }
+
+                  String headerValue;
+                  try
+                  {
+                     headerValue = holder.getString();
+                  }
+                  catch (UnsupportedEncodingException e)
+                  {
+                     throw new HornetQStompException("Encoding exception.", e);
+                  }
+                  holder.reset();
+
+                  decoder.headers.put(decoder.headerName, headerValue);
+
+                  if (decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
+                  {
+                     decoder.contentLength = Integer.parseInt(headerValue);
+                  }
+
+                  decoder.whiteSpaceOnly = true;
+
+                  decoder.headerBytesCopyStart = decoder.pos;
+
+                  decoder.inHeaderName = true;
+
+                  decoder.headerValueWhitespace = false;
+
+                  break;
+               }
+               default:
+               {
+                  if (isEscaping)
+                  {
+                     if (b == 'n')
+                     {
+                        //an escaped newline
+                        holder.append(StompDecoder.NEW_LINE);
+                     }
+                     else if (b == 'c')
+                     {
+                        //an escaped colon, STOMP 1.1 form
+                        holder.append(StompDecoder.HEADER_SEPARATOR);
+                     }
+                     else
+                     {
+                        //error, no other escape defined.
+                        throw new HornetQStompException("Bad escape char found: " + (char)b);
+                     }
+                     isEscaping = false;
+                  }
+                  else
+                  {
+                     // An ordinary byte of the header name or value: it has to be collected,
+                     // otherwise every name and value would decode as empty.
+                     holder.append(b);
+                  }
+
+                  decoder.whiteSpaceOnly = false;
+
+                  decoder.headerValueWhitespace = false;
+               }
+            }
+         }
+      }
+
+      // Now the body
+
+      byte[] content = null;
+
+      if (decoder.contentLength != -1)
+      {
+         if (decoder.pos + decoder.contentLength + 1 > decoder.data)
+         {
+            // Need more bytes
+         }
+         else
+         {
+            content = new byte[decoder.contentLength];
+
+            System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0, decoder.contentLength);
+
+            // +1 skips the NUL that terminates the frame
+            decoder.pos += decoder.contentLength + 1;
+         }
+      }
+      else
+      {
+         // Need to scan for terminating NUL
+
+         if (decoder.bodyStart == -1)
+         {
+            decoder.bodyStart = decoder.pos;
+         }
+
+         while (decoder.pos < decoder.data)
+         {
+            if (decoder.workingBuffer[decoder.pos++] == 0)
+            {
+               content = new byte[decoder.pos - decoder.bodyStart - 1];
+
+               System.arraycopy(decoder.workingBuffer, decoder.bodyStart, content, 0, content.length);
+
+               break;
+            }
+         }
+      }
+
+      if (content != null)
+      {
+         if (decoder.data > decoder.pos)
+         {
+            if (decoder.workingBuffer[decoder.pos] == StompDecoder.NEW_LINE)
+            {
+               decoder.pos++;
+            }
+
+            if (decoder.data > decoder.pos)
+            {
+               // More data still in the buffer from the next packet
+               System.arraycopy(decoder.workingBuffer, decoder.pos, decoder.workingBuffer, 0, decoder.data - decoder.pos);
+            }
+         }
+
+         decoder.data = decoder.data - decoder.pos;
+
+         // reset
+
+         StompFrame ret = new StompFrameV11(decoder.command, decoder.headers, content);
+
+         decoder.init();
+
+         return ret;
+      }
+      else
+      {
+         return null;
+      }
+   }
 }

Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java	                        (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java	2011-09-08 15:35:08 UTC (rev 11305)
@@ -0,0 +1,94 @@
+package org.hornetq.core.protocol.stomp.v11;
+
+import java.util.Map;
+
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.StompFrame;
+
+public class StompFrameV11 extends StompFrame
+{
+   public static final char ESC_CHAR = '\\';
+   public static final char COLON = ':';
+
+   public StompFrameV11(String command, Map<String, String> headers, byte[] content)
+   {
+      super(command, headers, content);
+   }
+   
+   public StompFrameV11(String command)
+   {
+      super(command);
+   }
+
+   public static String escaping(String rawString) throws HornetQStompException
+   {
+      int len = rawString.length();
+
+      SimpleBytes sb = new SimpleBytes(1024);
+      
+      boolean beginEsc = false;
+      for (int i = 0; i < len; i++)
+      {
+         char k = rawString.charAt(i);
+
+         if (k == ESC_CHAR)
+         {
+            if (beginEsc)
+            {
+               //it is a backslash
+               sb.append('\\');
+               beginEsc = false;
+            }
+            else
+            {
+               beginEsc = true;
+            }
+         }
+         else if (k == 'n')
+         {
+            if (beginEsc)
+            {
+               //it is a newline
+               sb.append('\n');
+               beginEsc = false;
+            }
+            else
+            {
+               sb.append(k);
+            }
+         }
+         else if (k == ':')
+         {
+            if (beginEsc)
+            {
+               sb.append(k);
+               beginEsc = false;
+            }
+            else
+            {
+               //error
+               throw new HornetQStompException("Colon not escaped!");
+            }
+         }
+         else
+         {
+            if (beginEsc)
+            {
+               //error, no other escape defined.
+               throw new HornetQStompException("Bad escape char found: " + k);
+            }
+            else
+            {
+               sb.append(k);
+            }
+         }
+      }
+      return sb.toString();
+   }
+
+   public static void main(String[] args)
+   {
+      String rawStr = "hello world\\n\\:"
+   }
+
+}



More information about the hornetq-commits mailing list