[hornetq-commits] JBoss hornetq SVN: r10142 - in branches/stomp_1_1: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Jan 24 22:17:51 EST 2011
Author: gurkapa
Date: 2011-01-24 22:17:50 -0500 (Mon, 24 Jan 2011)
New Revision: 10142
Modified:
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://issues.jboss.org/browse/HORNETQ-553 Implement STOMP 1.1 Specification
Implementation of the version negotiation introduced in 1.1 spec
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -28,9 +28,18 @@
String NEWLINE = "\n";
+ /**
+  * STOMP protocol version identifiers, as plain strings.
+  * {@code V10} is "1.0" and {@code V11} is "1.1".
+  */
+ public static interface Versions
+ {
+ String V10 = "1.0";
+
+ String V11 = "1.1";
+ }
+
public static interface Commands
{
String CONNECT = "CONNECT";
+
+ String STOMP = "STOMP";
String SEND = "SEND";
@@ -75,6 +84,8 @@
String TRANSACTION = "transaction";
String CONTENT_LENGTH = "content-length";
+
+ String CONTENT_TYPE = "content-type";
public interface Response
{
@@ -159,11 +170,17 @@
String CLIENT_ID = "client-id";
String REQUEST_ID = "request-id";
+
+ String ACCEPT_VERSION = "accept-version";
+
+ String HOST = "host";
}
public interface Error
{
String MESSAGE = "message";
+
+ String VERSION = "version";
}
public interface Connected
@@ -171,6 +188,8 @@
String SESSION = "session";
String RESPONSE_ID = "response-id";
+
+ String VERSION = "version";
}
public interface Ack
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -48,6 +48,8 @@
private String passcode;
private String clientID;
+
+ private String version;
private boolean valid;
@@ -309,6 +311,16 @@
this.valid = valid;
}
+ /**
+  * Stores the protocol version string on this connection.
+  *
+  * @param version the version string to remember; stored as given, without validation
+  */
+ public void setVersion(String version)
+ {
+ this.version = version;
+ }
+
+ /**
+  * Returns the protocol version string currently stored on this connection.
+  *
+  * @return the stored version string (presumably {@code null} until one has been set -- confirm against the constructor)
+  */
+ public String getVersion()
+ {
+ return version;
+ }
+
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompDecoder.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -59,6 +59,10 @@
private static final String COMMAND_SEND = "SEND";
private static final int COMMAND_SEND_LENGTH = COMMAND_SEND.length();
+
+ private static final String COMMAND_STOMP = "STOMP";
+
+ private static final int COMMAND_STOMP_LENGTH = COMMAND_STOMP.length();
private static final String COMMAND_SUBSCRIBE = "SUBSCRIBE";
@@ -82,6 +86,8 @@
private static final byte S = (byte)'S';
+ private static final byte T = (byte)'T';
+
private static final byte U = (byte)'U';
private static final byte HEADER_SEPARATOR = (byte)':';
@@ -267,6 +273,16 @@
// SEND
command = COMMAND_SEND;
+ }
+ else if (workingBuffer[offset + 1] == T)
+ {
+ if (!tryIncrement(offset + COMMAND_STOMP_LENGTH + 1))
+ {
+ return null;
+ }
+
+ // STOMP
+ command = COMMAND_STOMP;
}
else
{
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -50,12 +50,12 @@
class StompProtocolManager implements ProtocolManager
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(StompProtocolManager.class);
// TODO use same value than HornetQConnection
private static final String CONNECTION_ID_PROP = "__HQ_CID";
-
+
// Attributes ----------------------------------------------------
private final HornetQServer server;
@@ -178,7 +178,7 @@
StompFrame response = null;
- if (Stomp.Commands.CONNECT.equals(command))
+ if (Stomp.Commands.CONNECT.equals(command) || Stomp.Commands.STOMP.equals(command))
{
response = onConnect(request, conn);
}
@@ -577,6 +577,7 @@
String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+ String acceptVersion = (String)headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
HornetQSecurityManager sm = server.getSecurityManager();
@@ -585,11 +586,18 @@
{
sm.validateUser(login, passcode);
}
+
+ String version = negotiateVersion(acceptVersion);
connection.setLogin(login);
connection.setPasscode(passcode);
connection.setClientID(clientID);
connection.setValid(true);
+ if (version == null){
+ // client and server do not have any version in common. Return Error frame
+ return createNegotiationFailedFrame();
+ }
+ connection.setVersion(version);
HashMap<String, Object> h = new HashMap<String, Object>();
h.put(Stomp.Headers.Connected.SESSION, connection.getID());
@@ -597,9 +605,45 @@
{
h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
}
+ if (acceptVersion != null)
+ {
+ // Only put this in header if we got an accept-version header.
+ h.put(Stomp.Headers.Connected.VERSION, version);
+ }
return new StompFrame(Stomp.Responses.CONNECTED, h);
}
+ /**
+  * Chooses the protocol version to use from the client's accept-version header value.
+  * <p>
+  * The header is treated as a comma-separated list; surrounding whitespace of each entry is ignored.
+  * Version 1.1 wins whenever it is listed, otherwise 1.0 is used if it is listed.
+  *
+  * @param acceptVersion the raw accept-version header value, or {@code null} if the header was absent
+  * @return "1.0" when no header was supplied; otherwise the best listed version, or {@code null}
+  *         when none of the listed versions is 1.0 or 1.1
+  */
+ private String negotiateVersion(String acceptVersion)
+ {
+    if (acceptVersion == null)
+    {
+       // No accept-version header at all: answer with 1.0.
+       return Stomp.Versions.V10;
+    }
+
+    boolean offersV10 = false;
+    for (String entry : acceptVersion.split(","))
+    {
+       String offered = entry.trim();
+       if (Stomp.Versions.V11.equals(offered))
+       {
+          // Nothing can outrank 1.1, so there is no need to look further.
+          return Stomp.Versions.V11;
+       }
+       if (Stomp.Versions.V10.equals(offered))
+       {
+          offersV10 = true;
+       }
+    }
+    return offersV10 ? Stomp.Versions.V10 : null;
+ }
+
+ private StompFrame createNegotiationFailedFrame() throws Exception
+ {
+ HashMap<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Error.VERSION, Stomp.Versions.V10 + "," + Stomp.Versions.V11);
+ h.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
+
+ StringBuffer eMess = new StringBuffer("Supported protocol versions are " + Stomp.Versions.V10 + " " + Stomp.Versions.V11);
+ return new StompFrame(Stomp.Responses.ERROR, h, eMess.toString().getBytes("UTF-8"));
+ }
+
public void cleanup(StompConnection connection)
{
connection.setValid(false);
Modified: branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -130,6 +130,7 @@
headers.put(name.toString(), message.getObjectProperty(name));
}
}
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
Modified: branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-01-25 02:55:02 UTC (rev 10141)
+++ branches/stomp_1_1/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2011-01-25 03:17:50 UTC (rev 10142)
@@ -88,7 +88,56 @@
Assert.assertTrue(f.startsWith("CONNECTED"));
Assert.assertTrue(f.indexOf("response-id:1") >= 0);
}
+
+ /**
+  * Sends a CONNECT frame offering versions 1.0 and 1.1 and expects a CONNECTED reply that
+  * echoes the request id and reports version 1.1.
+  */
+ public void testV11Connect() throws Exception
+ {
+    final String connectFrame = "CONNECT\n" +
+                                "login: brianm\n" +
+                                "passcode: wombats\n" +
+                                "request-id: 1\n" +
+                                "accept-version: 1.0,1.1\n" +
+                                "\n" +
+                                Stomp.NULL;
+    sendFrame(connectFrame);
+
+    final String reply = receiveFrame(10000);
+    Assert.assertTrue(reply.startsWith("CONNECTED"));
+    Assert.assertTrue(reply.contains("response-id:1"));
+    Assert.assertTrue(reply.contains("version:1.1"));
+ }
+
+ /**
+  * Sends a STOMP frame (instead of CONNECT) offering versions 1.0 and 1.1 and expects a
+  * CONNECTED reply that echoes the request id and reports version 1.1.
+  */
+ public void testConnectWithStomp() throws Exception
+ {
+    final String connectFrame = "STOMP\n" +
+                                "login: brianm\n" +
+                                "passcode: wombats\n" +
+                                "request-id: 1\n" +
+                                "accept-version: 1.0,1.1\n" +
+                                "\n" +
+                                Stomp.NULL;
+    sendFrame(connectFrame);
+
+    final String reply = receiveFrame(10000);
+    Assert.assertTrue(reply.startsWith("CONNECTED"));
+    Assert.assertTrue(reply.contains("response-id:1"));
+    Assert.assertTrue(reply.contains("version:1.1"));
+ }
+
+ public void testProtocolNegotiationFail() throws Exception
+ {
+ String connect_frame = "CONNECT\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "request-id: 1\n" +
+ "accept-version: 1.2\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("ERROR"));
+ Assert.assertTrue(f.indexOf("version:1.0,1.1") >= 0);
+ Assert.assertTrue(f.indexOf("content-type:text/plain") >= 0);
+ Assert.assertTrue(f.indexOf("Supported protocol versions are") >= 0);
+ }
+
public void testDisconnectAndError() throws Exception
{
More information about the hornetq-commits
mailing list