Author: gaohoward
Date: 2011-09-09 08:42:03 -0400 (Fri, 09 Sep 2011)
New Revision: 11309
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
Log:
more code
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-08
16:13:30 UTC (rev 11308)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-09-09
12:42:03 UTC (rev 11309)
@@ -122,6 +122,8 @@
public static final byte TAB = (byte)'\t';
+ public static final String CONTENT_TYPE_HEADER_NAME = "content-type";
+
public static String CONTENT_LENGTH_HEADER_NAME = "content-length";
public byte[] workingBuffer = new byte[1024];
@@ -147,6 +149,8 @@
public boolean whiteSpaceOnly;
public int contentLength;
+
+ public String contentType;
public int bodyStart;
@@ -599,6 +603,8 @@
contentLength = -1;
+ contentType = null;
+
bodyStart = -1;
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-08
16:13:30 UTC (rev 11308)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-09
12:42:03 UTC (rev 11309)
@@ -37,28 +37,25 @@
*/
public class StompFrame
{
- private static final Logger log = Logger.getLogger(StompFrame.class);
+ protected static final Logger log = Logger.getLogger(StompFrame.class);
- public static final byte[] NO_DATA = new byte[] {};
+ protected static final byte[] NO_DATA = new byte[] {};
- private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
+ protected static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
- private String command;
+ protected String command;
- private Map<String, String> headers;
-
- //stomp 1.1 talks about repetitive headers.
- private List<Header> allHeaders = new ArrayList<Header>();
+ protected Map<String, String> headers;
- private String body;
+ protected String body;
- private byte[] bytesBody;
+ protected byte[] bytesBody;
- private HornetQBuffer buffer = null;
+ protected HornetQBuffer buffer = null;
- private int size;
+ protected int size;
- private boolean disconnect;
+ protected boolean disconnect;
public StompFrame(String command)
{
@@ -111,7 +108,6 @@
out += body;
return out;
}
-
public HornetQBuffer toHornetQBuffer() throws Exception
{
@@ -149,11 +145,7 @@
+ /**
+ * Sets a header on this frame. A later call with the same key replaces the
+ * earlier value; StompFrameV11 overrides this method to keep the first value
+ * and to record every occurrence in its own list.
+ *
+ * @param key the header name
+ * @param val the header value
+ */
public void addHeader(String key, String val)
{
- if (!headers.containsKey(key))
- {
- headers.put(key, val);
- }
- allHeaders.add(new Header(key, val));
+ headers.put(key, val);
}
public Map<String, String> getHeadersMap()
@@ -171,11 +163,31 @@
this.key = key;
this.val = val;
}
+
+ public String getEscapedKey()
+ {
+ return escape(key);
+ }
+
+ public String getEscapedValue()
+ {
+ return escape(val);
+ }
+
+ /**
+ * Escapes a header key or value before it is written to the wire: a
+ * backslash is doubled, a newline becomes backslash + 'n', and a colon
+ * becomes backslash + ':'.
+ *
+ * @param str the raw text; must not be null
+ * @return the escaped text
+ */
+ private String escape(String str)
+ {
+ // String.replace works on literal text. replaceAll cannot be used here:
+ // it compiles "\\" as a regex (a lone backslash, which is rejected) and it
+ // strips backslashes from the replacement text.
+ // Backslashes go first so the ones inserted below are not escaped again.
+ str = str.replace("\\", "\\\\");
+ str = str.replace("\n", "\\n");
+ // NOTE(review): the STOMP 1.1 spec encodes ':' as backslash + 'c'; the
+ // backslash + ':' form is kept because that is what this code aimed for.
+ // Confirm against the header decoder.
+ str = str.replace(":", "\\:");
+
+ return str;
+ }
}
- public void setBody(String body)
+ /**
+ * Sets the text body and stores its UTF-8 encoding as the byte body.
+ *
+ * @param body the body text; null leaves the frame with an empty byte body
+ * @throws UnsupportedEncodingException if the UTF-8 charset is unavailable
+ */
+ public void setBody(String body) throws UnsupportedEncodingException
{
this.body = body;
+ // The previous version accepted a null body, so do not dereference it here.
+ this.bytesBody = body == null ? NO_DATA : body.getBytes("UTF-8");
}
public boolean hasHeader(String key)
@@ -191,11 +203,7 @@
//Since 1.1, there is a content-type header that needs to take care of
+ /**
+ * Returns the frame body as bytes.
+ *
+ * @return the byte body, or an empty array when none has been set; never null
+ */
public byte[] getBodyAsBytes() throws UnsupportedEncodingException
{
- if (body != null)
- {
- return body.getBytes("UTF-8");
- }
- return new byte[0];
+ // Callers used to get an empty array when no body was set. The bytesBody
+ // field has no initializer, so fall back to NO_DATA instead of returning null.
+ return bytesBody == null ? NO_DATA : bytesBody;
}
public boolean needsDisconnect()
@@ -203,11 +211,6 @@
return disconnect;
}
- public List<Header> getHeaders()
- {
- return this.allHeaders;
- }
-
public void setByteBody(byte[] content)
{
this.bytesBody = content;
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-08
16:13:30 UTC (rev 11308)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-09
12:42:03 UTC (rev 11309)
@@ -46,6 +46,8 @@
public static void copyStandardHeadersFromFrameToMessage(StompFrame frame,
ServerMessageImpl msg) throws Exception
{
+ Map<String, String> headers = new HashMap<String,
String>(frame.getHeadersMap());
+
String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
if (priority != null)
{
@@ -80,10 +82,10 @@
}
// now the general headers
- for (Iterator<Map.Entry<String, Object>> iter =
headers.entrySet().iterator(); iter.hasNext();)
+ for (Iterator<Map.Entry<String, String>> iter =
headers.entrySet().iterator(); iter.hasNext();)
{
- Map.Entry<String, Object> entry = iter.next();
- String name = (String)entry.getKey();
+ Map.Entry<String, String> entry = iter.next();
+ String name = entry.getKey();
Object value = entry.getValue();
msg.putObjectProperty(name, value);
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-08
16:13:30 UTC (rev 11308)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-09
12:42:03 UTC (rev 11309)
@@ -122,14 +122,7 @@
+ /**
+ * Builds the RECEIPT response frame for the given receipt id.
+ *
+ * @param receiptID value stored under the Stomp.Headers.Response.RECEIPT_ID header
+ * @return a new frame whose command is Stomp.Responses.RECEIPT
+ */
public StompFrame handleReceipt(String receiptID)
{
StompFrame receipt = new StompFrame(Stomp.Responses.RECEIPT);
- try
- {
- receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
- }
- catch (HornetQStompException e)
- {
- return e.getFrame();
- }
+ receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
return receipt;
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-08
16:13:30 UTC (rev 11308)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-09
12:42:03 UTC (rev 11309)
@@ -75,9 +75,17 @@
else
{
//not valid
- response = new StompFrame(Stomp.Responses.ERROR);
+ response = new StompFrameV10(Stomp.Responses.ERROR);
response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
- response.setBody("The login account is not valid.");
+ try
+ {
+ response.setBody("The login account is not valid.");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ log.error("Encoding problem", e);
+ //then we will send a null body message.
+ }
connection.sendFrame(response);
connection.destroy();
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java 2011-09-08
16:13:30 UTC (rev 11308)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameV10.java 2011-09-09
12:42:03 UTC (rev 11309)
@@ -1,22 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
package org.hornetq.core.protocol.stomp.v10;
-import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.StompFrame;
+/**
+ * Frame type created by the STOMP 1.0 handler; it adds only a constructor to StompFrame.
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
public class StompFrameV10 extends StompFrame
{
public StompFrameV10(String command)
{
super(command);
}
-
- @Override
- public void addHeader(String key, String val) throws HornetQStompException
- {
- //trimming
- String newKey = key.trim();
- String newVal = val.trim();
- super.addHeader(newKey, newVal);
- }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-08
16:13:30 UTC (rev 11308)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-09
12:42:03 UTC (rev 11309)
@@ -17,12 +17,14 @@
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.protocol.stomp.FrameEventListener;
import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.SimpleBytes;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
import org.hornetq.core.protocol.stomp.StompDecoder;
@@ -69,7 +71,7 @@
connection.setClientID(clientID);
connection.setValid(true);
- response = new StompFrame(Stomp.Responses.CONNECTED);
+ response = new StompFrameV11(Stomp.Responses.CONNECTED);
// version
response.addHeader(Stomp.Headers.Connected.VERSION,
@@ -113,6 +115,10 @@
{
response = e.getFrame();
}
+ catch (UnsupportedEncodingException e)
+ {
+ response = new HornetQStompException("Encoding error.",
e).getFrame();
+ }
return response;
}
@@ -437,7 +443,7 @@
}
}
- public StompFrame createPingFrame()
+ public StompFrame createPingFrame() throws UnsupportedEncodingException
{
StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
frame.setBody("\n");
@@ -480,7 +486,14 @@
public void run()
{
lastAccepted.set(System.currentTimeMillis());
- pingFrame = createPingFrame();
+ try
+ {
+ pingFrame = createPingFrame();
+ }
+ catch (UnsupportedEncodingException e1)
+ {
+ log.error("Cannot create ping frame due to encoding problem.",
e1);
+ }
synchronized (this)
{
@@ -880,6 +893,11 @@
{
decoder.contentLength = Integer.parseInt(headerValue);
}
+
+ if (decoder.headerName.equals(StompDecoder.CONTENT_TYPE_HEADER_NAME))
+ {
+ decoder.contentType = headerValue;
+ }
decoder.whiteSpaceOnly = true;
@@ -974,4 +992,5 @@
return null;
}
}
+
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-08
16:13:30 UTC (rev 11308)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-09
12:42:03 UTC (rev 11309)
@@ -1,14 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
package org.hornetq.core.protocol.stomp.v11;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.SimpleBytes;
+import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompFrame.Header;
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
public class StompFrameV11 extends StompFrame
{
- public static final char ESC_CHAR = '\\';
- public static final char COLON = ':';
+ //stomp 1.1 talks about repetitive headers.
+ private List<Header> allHeaders = new ArrayList<Header>();
+ private String contentType;
public StompFrameV11(String command, Map<String, String> headers, byte[]
content)
{
@@ -19,76 +43,46 @@
{
super(command);
}
-
- public static String escaping(String rawString) throws HornetQStompException
+
+ @Override
+ public HornetQBuffer toHornetQBuffer() throws Exception
{
- int len = rawString.length();
-
- SimpleBytes sb = new SimpleBytes(1024);
-
- boolean beginEsc = false;
- for (int i = 0; i < len; i++)
+ if (buffer == null)
{
- char k = rawString.charAt(i);
+ buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
- if (k == ESC_CHAR)
+ StringBuffer head = new StringBuffer();
+ head.append(command);
+ head.append(Stomp.NEWLINE);
+ // Output the headers.
+ for (Header h : allHeaders)
{
- if (beginEsc)
- {
- //it is a backslash
- sb.append('\\');
- beginEsc = false;
- }
- else
- {
- beginEsc = true;
- }
+ head.append(h.getEscapedKey());
+ head.append(Stomp.Headers.SEPARATOR);
+ head.append(h.getEscapedValue());
+ head.append(Stomp.NEWLINE);
}
- else if (k == 'n')
- {
- if (beginEsc)
- {
- //it is a newline
- sb.append('\n');
- beginEsc = false;
- }
- else
- {
- sb.append(k);
- }
- }
- else if (k == ':')
- {
- if (beginEsc)
- {
- sb.append(k);
- beginEsc = false;
- }
- else
- {
- //error
- throw new HornetQStompException("Colon not escaped!");
- }
- }
- else
- {
- if (beginEsc)
- {
- //error, no other escape defined.
- throw new HornetQStompException("Bad escape char found: " + k);
- }
- else
- {
- sb.append(k);
- }
- }
+ // Add a newline to separate the headers from the content.
+ head.append(Stomp.NEWLINE);
+
+ buffer.writeBytes(head.toString().getBytes("UTF-8"));
+ buffer.writeBytes(bytesBody);
+ buffer.writeBytes(END_OF_FRAME);
+
+ size = buffer.writerIndex();
}
- return sb.toString();
+ return buffer;
}
-
- public static void main(String[] args)
+
+ @Override
+ public void addHeader(String key, String val)
{
- String rawStr = "hello world\\n\\:"
+ if (!headers.containsKey(key))
+ {
+ headers.put(key, val);
+ }
+ allHeaders.add(new Header(key, val));
}
+
}