[hornetq-commits] JBoss hornetq SVN: r9747 - trunk/src/main/org/hornetq/core/protocol/stomp.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 5 05:51:50 EDT 2010


Author: timfox
Date: 2010-10-05 05:51:50 -0400 (Tue, 05 Oct 2010)
New Revision: 9747

Added:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
Log:
https://jira.jboss.org/browse/HORNETQ-544

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java	2010-10-05 09:51:50 UTC (rev 9747)
@@ -0,0 +1,558 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * A StompDecoder
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompDecoder
+{
+   private static final Logger log = Logger.getLogger(StompDecoder.class);
+
+   private static final boolean TRIM_LEADING_HEADER_VALUE_WHITESPACE = true;
+
+   private static final String COMMAND_ABORT = "ABORT";
+
+   private static final int COMMAND_ABORT_LENGTH = COMMAND_ABORT.length();
+
+   private static final String COMMAND_ACK = "ACK";
+
+   private static final int COMMAND_ACK_LENGTH = COMMAND_ACK.length();
+
+   private static final String COMMAND_BEGIN = "BEGIN";
+
+   private static final int COMMAND_BEGIN_LENGTH = COMMAND_BEGIN.length();
+
+   private static final String COMMAND_COMMIT = "COMMIT";
+
+   private static final int COMMAND_COMMIT_LENGTH = COMMAND_COMMIT.length();
+
+   private static final String COMMAND_CONNECT = "CONNECT";
+
+   private static final int COMMAND_CONNECT_LENGTH = COMMAND_CONNECT.length();
+
+   private static final String COMMAND_DISCONNECT = "DISCONNECT";
+
+   private static final int COMMAND_DISCONNECT_LENGTH = COMMAND_DISCONNECT.length();
+
+   private static final String COMMAND_SEND = "SEND";
+
+   private static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
+
+   private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
+
+   private static final int COMMAND_SUBSCRIBE_LENGTH = COMMAND_SUBSCRIBE.length();
+
+   private static final String COMMAND_UNSUBSCRIBE = "UNSUBSCRIBE";
+
+   private static final int COMMAND_UNSUBSCRIBE_LENGTH = COMMAND_UNSUBSCRIBE.length();
+
+   private static final byte A = (byte)'A';
+
+   private static final byte B = (byte)'B';
+
+   private static final byte C = (byte)'C';
+
+   private static final byte D = (byte)'D';
+
+   private static final byte E = (byte)'E';
+
+   private static final byte M = (byte)'M';
+
+   private static final byte S = (byte)'S';
+
+   private static final byte U = (byte)'U';
+
+   private static final byte HEADER_SEPARATOR = (byte)':';
+
+   private static final byte NEW_LINE = (byte)'\n';
+
+   private static final byte SPACE = (byte)' ';
+
+   private static final byte TAB = (byte)'\t';
+
+   private static String CONTENT_LENGTH_HEADER_NAME = "content-length";
+
+   private byte[] workingBuffer = new byte[1024];
+
+   private int pos;
+
+   private int data;
+
+   private String command;
+
+   private Map<String, Object> headers;
+
+   private int headerBytesCopyStart;
+
+   private boolean readingHeaders;
+
+   private boolean headerValueWhitespace;
+
+   private boolean inHeaderName;
+
+   private String headerName;
+
+   private boolean whiteSpaceOnly;
+
+   private int contentLength;
+
+   private int bodyStart;
+
+   public StompDecoder()
+   {
+      init();
+   }
+
+   public boolean hasBytes()
+   {
+      return data > pos;
+   }
+
+   /*
+    * Stomp format is a command on the first line
+    * followed by a set of headers NAME:VALUE
+    * followed by an empty line
+    * followed by an optional message body
+    * terminated with a null character
+    */
+   public synchronized StompFrame decode(final HornetQBuffer buffer) throws Exception
+   {
+      int readable = buffer.readableBytes();
+
+      if (data + readable >= workingBuffer.length)
+      {
+         resizeWorking(data + readable);
+      }
+
+      buffer.readBytes(workingBuffer, data, readable);
+
+      data += readable;
+
+      if (command == null)
+      {
+         if (data < 4)
+         {
+            // Need at least four bytes to identify the command
+            // - up to 3 bytes for the command name + potentially another byte for a leading \n
+
+            return null;
+         }
+
+         int offset;
+
+         if (workingBuffer[0] == NEW_LINE)
+         {
+            // Yuck, some badly behaved STOMP clients add a \n *after* the terminating NUL char at the end of the
+            // STOMP
+            // frame this can manifest as an extra \n at the beginning when the next STOMP frame is read - we need to
+            // deal
+            // with this
+            offset = 1;
+         }
+         else
+         {
+            offset = 0;
+         }
+
+         byte b = workingBuffer[offset];
+
+         switch (b)
+         {
+            case A:
+            {
+               if (workingBuffer[offset + 1] == B)
+               {
+                  if (!tryIncrement(offset + COMMAND_ABORT_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // ABORT
+                  command = COMMAND_ABORT;
+               }
+               else
+               {
+                  if (!tryIncrement(offset + COMMAND_ACK_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // ACK
+                  command = COMMAND_ACK;
+               }
+               break;
+            }
+            case B:
+            {
+               if (!tryIncrement(offset + COMMAND_BEGIN_LENGTH + 1))
+               {
+                  return null;
+               }
+
+               // BEGIN
+               command = COMMAND_BEGIN;
+
+               break;
+            }
+            case C:
+            {
+               if (workingBuffer[offset + 2] == M)
+               {
+                  if (!tryIncrement(offset + COMMAND_COMMIT_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // COMMIT
+                  command = COMMAND_COMMIT;
+               }
+               else
+               {
+                  if (!tryIncrement(offset + COMMAND_CONNECT_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // CONNECT
+                  command = COMMAND_CONNECT;
+               }
+               break;
+            }
+            case D:
+            {
+               if (!tryIncrement(offset + COMMAND_DISCONNECT_LENGTH + 1))
+               {
+                  return null;
+               }
+
+               // DISCONNECT
+               command = COMMAND_DISCONNECT;
+
+               break;
+            }
+            case S:
+            {
+               if (workingBuffer[offset + 1] == E)
+               {
+                  if (!tryIncrement(offset + COMMAND_SEND_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // SEND
+                  command = COMMAND_SEND;
+               }
+               else
+               {
+                  if (!tryIncrement(offset + COMMAND_SUBSCRIBE_LENGTH + 1))
+                  {
+                     return null;
+                  }
+
+                  // SUBSCRIBE
+                  command = COMMAND_SUBSCRIBE;
+               }
+               break;
+            }
+            case U:
+            {
+               if (!tryIncrement(offset + COMMAND_UNSUBSCRIBE_LENGTH + 1))
+               {
+                  return null;
+               }
+
+               // UNSUBSCRIBE
+               command = COMMAND_UNSUBSCRIBE;
+
+               break;
+            }
+            default:
+            {
+               throwInvalid();
+            }
+         }
+
+         // Sanity check
+
+         if (workingBuffer[pos - 1] != NEW_LINE)
+         {
+            throwInvalid();
+         }
+      }
+
+      if (readingHeaders)
+      {
+         if (headerBytesCopyStart == -1)
+         {
+            headerBytesCopyStart = pos;
+         }
+
+         // Now the headers
+
+         outer: while (true)
+         {
+            byte b = workingBuffer[pos++];
+
+            switch (b)
+            {
+               case HEADER_SEPARATOR:
+               {
+                  if (inHeaderName)
+                  {
+                     byte[] data = new byte[pos - headerBytesCopyStart - 1];
+
+                     System.arraycopy(workingBuffer, headerBytesCopyStart, data, 0, data.length);
+
+                     headerName = new String(data);
+
+                     inHeaderName = false;
+
+                     headerBytesCopyStart = pos;
+
+                     headerValueWhitespace = true;
+                  }
+
+                  whiteSpaceOnly = false;
+
+                  break;
+               }
+               case NEW_LINE:
+               {
+                  if (whiteSpaceOnly)
+                  {
+                     // Headers are terminated by a blank line
+                     readingHeaders = false;
+
+                     break outer;
+                  }
+
+                  byte[] data = new byte[pos - headerBytesCopyStart - 1];
+
+                  System.arraycopy(workingBuffer, headerBytesCopyStart, data, 0, data.length);
+
+                  String headerValue = new String(data);
+
+                  headers.put(headerName, headerValue);
+
+                  if (headerName.equals(CONTENT_LENGTH_HEADER_NAME))
+                  {
+                     contentLength = Integer.parseInt(headerValue.toString());
+                  }
+
+                  whiteSpaceOnly = true;
+
+                  headerBytesCopyStart = pos;
+
+                  inHeaderName = true;
+
+                  headerValueWhitespace = false;
+
+                  break;
+               }
+               case SPACE:
+               {
+               }
+               case TAB:
+               {
+                  if (TRIM_LEADING_HEADER_VALUE_WHITESPACE && headerValueWhitespace)
+                  {
+                     // trim off leading whitespace from header values.
+                     // The STOMP spec examples seem to imply that whitespace should be trimmed although it is not
+                     // explicit in the spec
+                     // ActiveMQ + StompConnect also seem to trim whitespace from header values.
+                     // Trimming is problematic though if the user has set a header with a value which deliberately
+                     // has
+                     // leading whitespace since
+                     // this will be removed
+                     headerBytesCopyStart++;
+                  }
+
+                  break;
+               }
+               default:
+               {
+                  whiteSpaceOnly = false;
+
+                  headerValueWhitespace = false;
+               }
+            }
+            if (pos == data)
+            {
+               // Run out of data
+
+               return null;
+            }
+         }
+      }
+
+      // Now the body
+
+      byte[] content = null;
+
+      if (contentLength != -1)
+      {
+         if (pos + contentLength > data)
+         {
+            // Need more bytes
+         }
+         else
+         {
+            content = new byte[contentLength];
+
+            System.arraycopy(workingBuffer, pos, content, 0, contentLength);
+
+            pos += contentLength + 1;
+         }
+      }
+      else
+      {
+         // Need to scan for terminating NUL
+
+         if (bodyStart == -1)
+         {
+            bodyStart = pos;
+         }
+
+         while (pos < data)
+         {
+            if (workingBuffer[pos++] == 0)
+            {
+               content = new byte[pos - bodyStart - 1];
+
+               System.arraycopy(workingBuffer, bodyStart, content, 0, content.length);
+
+               break;
+            }
+         }
+      }
+
+      if (content != null)
+      {
+         if (data > pos)
+         {
+            // More data still in the buffer from the next packet
+
+            System.arraycopy(workingBuffer, pos, workingBuffer, 0, data - pos);
+         }
+
+         data = data - pos;
+
+         // reset
+
+         StompFrame ret = new StompFrame(command, headers, content);
+
+         init();
+
+         return ret;
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   private void throwInvalid() throws StompException
+   {
+      throw new StompException("Invalid STOMP frame: " + this.dumpByteArray(workingBuffer));
+   }
+
+   private void init()
+   {
+      pos = 0;
+
+      command = null;
+
+      headers = new HashMap<String, Object>();
+
+      this.headerBytesCopyStart = -1;
+
+      readingHeaders = true;
+
+      inHeaderName = true;
+
+      headerValueWhitespace = false;
+
+      headerName = null;
+
+      whiteSpaceOnly = true;
+
+      contentLength = -1;
+
+      bodyStart = -1;
+   }
+
+   private void resizeWorking(final int newSize)
+   {
+      byte[] oldBuffer = workingBuffer;
+
+      workingBuffer = new byte[newSize];
+
+      System.arraycopy(oldBuffer, 0, workingBuffer, 0, oldBuffer.length);
+   }
+
+   private boolean tryIncrement(final int length)
+   {
+      if (pos + length >= data)
+      {
+         return false;
+      }
+      else
+      {
+         pos += length;
+
+         return true;
+      }
+   }
+
+   private String dumpByteArray(final byte[] bytes)
+   {
+      StringBuilder str = new StringBuilder();
+
+      for (int i = 0; i < data; i++)
+      {
+         char b = (char)bytes[i];
+
+         if (b == '\n')
+         {
+            str.append("\\n");
+         }
+         else if (b == 0)
+         {
+            str.append("NUL");
+         }
+         else
+         {
+            str.append(b);
+         }
+
+         if (i != bytes.length - 1)
+         {
+            str.append(",");
+         }
+      }
+
+      return str.toString();
+   }
+}



More information about the hornetq-commits mailing list