Author: gaohoward
Date: 2011-09-13 10:13:22 -0400 (Tue, 13 Sep 2011)
New Revision: 11341
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
more tests
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -20,6 +20,7 @@
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -184,13 +185,41 @@
return escape(val);
}
- private String escape(String str)
+ public static String escape(String str)
{
- str = str.replaceAll("\n", "\\n");
- str = str.replaceAll("\\", "\\\\");
- str = str.replaceAll(":", "\\:");
+ int len = str.length();
- return str;
+ char[] buffer = new char[2*len];
+ int iBuffer = 0;
+ for (int i = 0; i < len; i++)
+ {
+ char c = str.charAt(i);
+ if (c == '\n')
+ {
+ buffer[iBuffer++] = '\\';
+ buffer[iBuffer] = 'n';
+ }
+ else if (c == '\\')
+ {
+ buffer[iBuffer++] = '\\';
+ buffer[iBuffer] = '\\';
+ }
+ else if (c == ':')
+ {
+ buffer[iBuffer++] = '\\';
+ buffer[iBuffer] = ':';
+ }
+ else
+ {
+ buffer[iBuffer] = c;
+ }
+ iBuffer++;
+ }
+
+ char[] total = new char[iBuffer];
+ System.arraycopy(buffer, 0, total, 0, iBuffer);
+
+ return new String(total);
}
}
@@ -232,4 +261,9 @@
{
this.bytesBody = content;
}
+
+ public void setNeedsDisconnect(boolean b)
+ {
+ disconnect = b;
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -21,6 +21,15 @@
*/
public enum StompVersions
{
- V1_0,
- V1_1
+ V1_0,
+ V1_1;
+
+ public String toString()
+ {
+ if (this == V1_0)
+ {
+ return "1.0";
+ }
+ return "1.1";
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -15,6 +15,7 @@
import java.io.UnsupportedEncodingException;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.hornetq.core.server.ServerMessage;
@@ -24,7 +25,9 @@
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
*/
public abstract class VersionedStompFrameHandler
-{
+{
+ private static final Logger log = Logger.getLogger(VersionedStompFrameHandler.class);
+
protected StompConnection connection;
public static VersionedStompFrameHandler getHandler(StompConnection connection,
StompVersions version)
@@ -93,9 +96,13 @@
response = onUnknown(request.getCommand());
}
- if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED) && (response == null))
+ log.error("-------------------- handled " + request);
+
+ if (response == null)
{
- response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ response = postprocess(request);
+
+ log.error("---------------postprocessed response: " + response);
}
return response;
@@ -126,6 +133,11 @@
return receipt;
}
+
+ public StompFrame postprocess(StompFrame request)
+ {
+ return null;
+ }
public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
StompSubscription subscription, int deliveryCount) throws Exception;
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -146,9 +146,25 @@
@Override
public StompFrame onDisconnect(StompFrame frame)
{
- connection.destroy();
+ log.error("----------------- frame: " + frame);
+
return null;
}
+
+ @Override
+ public StompFrame postprocess(StompFrame request)
+ {
+ StompFrame response = null;
+ if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+ {
+ response.setNeedsDisconnect(true);
+ }
+ }
+ return response;
+ }
@Override
public StompFrame onSend(StompFrame frame)
@@ -415,6 +431,8 @@
@Override
public void replySent(StompFrame reply)
{
+ log.error("----------------------- reply sent notified: " + reply);
+
if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
{
//kick off the pinger
@@ -807,8 +825,10 @@
// Now the headers
boolean isEscaping = false;
- SimpleBytes holder = new SimpleBytes(1024);
+ SimpleBytes holder = new SimpleBytes(1024);
+ log.error("--------------------------------- Decoding command: " + decoder.command);
+
outer: while (true)
{
byte b = decoder.workingBuffer[decoder.pos++];
@@ -887,6 +907,8 @@
}
holder.reset();
+ log.error("---------- A new header decoded: " + decoder.headerName + " : " + headerValue);
+
decoder.headers.put(decoder.headerName, headerValue);
if (decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
@@ -914,6 +936,8 @@
decoder.whiteSpaceOnly = false;
decoder.headerValueWhitespace = false;
+
+ holder.append(b);
}
}
if (decoder.pos == decoder.data)
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -49,7 +49,14 @@
{
if (buffer == null)
{
- buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+ if (bytesBody != null)
+ {
+ buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+ }
+ else
+ {
+ buffer = HornetQBuffers.dynamicBuffer(512);
+ }
StringBuffer head = new StringBuffer();
head.append(command);
@@ -66,7 +73,11 @@
head.append(Stomp.NEWLINE);
buffer.writeBytes(head.toString().getBytes("UTF-8"));
- buffer.writeBytes(bytesBody);
+ if (bytesBody != null)
+ {
+ buffer.writeBytes(bytesBody);
+ }
+
buffer.writeBytes(END_OF_FRAME);
size = buffer.writerIndex();
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -64,10 +64,15 @@
sb.append(headers.get(i).key + ":" + headers.get(i).val +
"\n");
}
sb.append("\n");
- sb.append(body);
+ if (body != null)
+ {
+ sb.append(body);
+ }
sb.append((char)0);
String data = new String(sb.toString());
+
+ System.out.println("---------------------------full frame is : " + data);
byte[] byteValue = data.getBytes("UTF-8");
ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -129,7 +129,6 @@
}
else
{
- System.out.println("Added to list: " + b);
receiveList.add(b);
}
}
@@ -180,11 +179,22 @@
public void connect() throws Exception
{
connect(null, null);
+ connected = true;
}
public void connect(String username, String password) throws Exception
{
throw new RuntimeException("connect method not implemented!");
}
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public String getVersion()
+ {
+ return version;
+ }
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -29,6 +29,14 @@
void connect() throws Exception;
void disconnect() throws IOException, InterruptedException;
+
+ void connect(String defUser, String defPass) throws Exception;
+
+ boolean isConnected();
+
+ String getVersion();
+
+ ClientStompFrame createFrame(String command);
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -36,6 +36,8 @@
ClientStompFrame response = this.sendFrame(frame);
System.out.println("Got response : " + response);
+
+ connected = true;
}
@Override
@@ -45,5 +47,15 @@
this.sendFrame(frame);
close();
+
+ connected = false;
}
+
+ @Override
+ public ClientStompFrame createFrame(
+ String command)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -55,6 +55,7 @@
this.passcode = passcode;
this.connected = true;
}
+ connected = true;
}
public void connect1(String username, String passcode) throws IOException,
InterruptedException
@@ -85,11 +86,18 @@
public void disconnect() throws IOException, InterruptedException
{
ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
- frame.addHeader(RECEIPT_HEADER, "77");
- this.sendFrame(frame);
+ ClientStompFrame result = this.sendFrame(frame);
close();
+
+ connected = false;
}
+ @Override
+ public ClientStompFrame createFrame(String command)
+ {
+ return new ClientStompFrameV11(command);
+ }
+
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-13 14:13:22 UTC (rev 11341)
@@ -18,14 +18,89 @@
package org.hornetq.tests.integration.stomp.v11;
import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
+
public class StompTestV11 extends StompTestBase2
{
private static final transient Logger log = Logger.getLogger(StompTestV11.class);
+ private StompClientConnection connV10;
+ private StompClientConnection connV11;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ connV10 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (connV10.isConnected())
+ {
+ connV10.disconnect();
+ }
+ if (connV11.isConnected())
+ {
+ connV11.disconnect();
+ }
+ super.tearDown();
+ }
+
public void testConnection() throws Exception
{
- StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+
+ connection.connect(defUser, defPass);
+
+ assertTrue(connection.isConnected());
+
+ assertEquals("1.0", connection.getVersion());
+
+ connection.disconnect();
+
+ connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+
+ connection.connect(defUser, defPass);
+
+ assertTrue(connection.isConnected());
+
+ assertEquals("1.1", connection.getVersion());
+
+ connection.disconnect();
}
+
+ public void testNegotiation() throws Exception
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.0,1.1");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.1", reply.getHeader("version"));
+
+ String sessionId = reply.getHeader("session");
+
+ log.info("session id: " + sessionId);
+
+ assertNotNull(sessionId);
+
+ String server = reply.getHeader("server");
+
+ log.info("server: " + server);
+
+ assertNotNull(server);
+
+ connV11.disconnect();
+
+ }
}