Author: gaohoward
Date: 2011-09-04 20:34:29 -0400 (Sun, 04 Sep 2011)
New Revision: 11293
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
Log:
curr work
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -1,3 +1,15 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
package org.hornetq.core.protocol.stomp;
import java.util.ArrayList;
@@ -35,6 +47,11 @@
this.body = body;
}
+ public StompFrame getFrame()
+ {
+ return null;
+ }
+
private class Header
{
public String key;
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -40,12 +40,6 @@
String UNSUBSCRIBE = "UNSUBSCRIBE";
- String BEGIN_TRANSACTION = "BEGIN";
-
- String COMMIT_TRANSACTION = "COMMIT";
-
- String ABORT_TRANSACTION = "ABORT";
-
String BEGIN = "BEGIN";
String COMMIT = "COMMIT";
@@ -53,6 +47,11 @@
String ABORT = "ABORT";
String ACK = "ACK";
+
+ //1.1
+ String NACK = "NACK";
+
+ String STOMP = "STOMP";
}
public interface Responses
@@ -76,6 +75,10 @@
String CONTENT_LENGTH = "content-length";
+ String ACCEPT_VERSION = "accept-version";
+
+ String CONTENT_TYPE = "content-type";
+
public interface Response
{
String RECEIPT_ID = "receipt-id";
@@ -140,6 +143,8 @@
String AUTO = "auto";
String CLIENT = "client";
+
+ String CLIENT_INDIVIDUAL = "client-individual";
}
}
@@ -167,7 +172,11 @@
public interface Error
{
+ //1.0 only
String MESSAGE = "message";
+
+ //1.1
+ String VERSION = "version";
}
public interface Connected
@@ -175,11 +184,19 @@
String SESSION = "session";
String RESPONSE_ID = "response-id";
+
+ //1.1
+ String VERSION = "version";
+
+ String SERVER = "server";
}
public interface Ack
{
String MESSAGE_ID = "message-id";
+
+ //1.1
+ String SUBSCRIPTION = "subscription";
}
}
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -27,6 +27,8 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
@@ -39,8 +41,9 @@
*/
public class StompConnection implements RemotingConnection
{
-
private static final Logger log = Logger.getLogger(StompConnection.class);
+
+ protected static final String CONNECTION_ID_PROP = "__HQ_CID";
private final StompProtocolManager manager;
@@ -68,7 +71,11 @@
private volatile boolean dataReceived;
- private StompVersions version = StompVersions.V1_0;
+ private StompVersions version;
+
+ private VersionedStompFrameHandler frameHandler;
+
+ private boolean initialized;
public StompDecoder getDecoder()
{
@@ -199,10 +206,6 @@
manager.cleanup(this);
}
- public void disconnect()
- {
- }
-
public void fail(final HornetQException me)
{
synchronized (failLock)
@@ -358,8 +361,10 @@
* accept-version value takes form of "v1,v2,v3..."
* we need to return the highest supported version
*/
- public void negotiateVersion(String acceptVersion) throws HornetQStompException
+ public void negotiateVersion(StompFrame frame) throws HornetQStompException
{
+ String acceptVersion = frame.getHeader(Stomp.Headers.ACCEPT_VERSION);
+
if (acceptVersion == null)
{
this.version = StompVersions.V1_0;
@@ -391,6 +396,9 @@
throw error;
}
}
+
+ this.frameHandler = VersionedStompFrameHandler.getHandler(this, this.version);
+ this.initialized = true;
}
//reject if the host doesn't match
@@ -411,4 +419,251 @@
throw error;
}
}
+
+ public void handleFrame(StompFrame request)
+ {
+ StompFrame reply = null;
+ try
+ {
+ if (!initialized)
+ {
+ if (!Stomp.Commands.CONNECT.equals(request.getCommand()))
+ {
+ throw new HornetQStompException("Connection hasn't been
established.");
+ }
+ //decide version
+ negotiateVersion(request);
+ }
+ reply = frameHandler.handleFrame(request);
+ }
+ catch (HornetQStompException e)
+ {
+ reply = e.getFrame();
+ }
+
+ if (reply != null)
+ {
+ sendFrame(reply);
+ }
+ }
+
+ public void sendFrame(StompFrame frame)
+ {
+ manager.sendReply(this, frame);
+ }
+
+ public boolean validateUser(String login, String passcode)
+ {
+ this.valid = manager.validateUser(login, passcode);
+ if (valid)
+ {
+ this.login = login;
+ this.passcode = passcode;
+ }
+ return valid;
+ }
+
+ public ServerMessageImpl createServerMessage()
+ {
+ return manager.createServerMessage();
+ }
+
+ public StompSession getSession(String txID) throws HornetQStompException
+ {
+ StompSession session = null;
+ try
+ {
+ if (txID == null)
+ {
+ session = manager.getSession(this);
+ }
+ else
+ {
+ session = manager.getTransactedSession(this, txID);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Exception getting session", e);
+ }
+
+ return session;
+ }
+
+ public void validate() throws HornetQStompException
+ {
+ if (!this.valid)
+ {
+ throw new HornetQStompException("Connection is not valid.");
+ }
+ }
+
+ public void sendServerMessage(ServerMessageImpl message, String txID) throws
HornetQStompException
+ {
+ StompSession stompSession = getSession(txID);
+
+ if (stompSession.isNoLocal())
+ {
+ message.putStringProperty(CONNECTION_ID_PROP, getID().toString());
+ }
+ try
+ {
+ stompSession.getSession().send(message, true);
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error sending message " + message,
e);
+ }
+ }
+
+ @Override
+ public void disconnect()
+ {
+ destroy();
+ }
+
+ public void beginTransaction(String txID) throws HornetQStompException
+ {
+ try
+ {
+ manager.beginTransaction(this, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error beginning a transaction: " +
txID, e);
+ }
+ }
+
+ public void commitTransaction(String txID) throws HornetQStompException
+ {
+ try
+ {
+ manager.commitTransaction(this, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error committing " + txID, e);
+ }
+ }
+
+ public void abortTransaction(String txID) throws HornetQStompException
+ {
+ try
+ {
+ manager.abortTransaction(this, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error aborting " + txID, e);
+ }
+ }
+
+ public void subscribe(String destination, String selector, String ack,
+ String id, String durableSubscriptionName, boolean noLocal) throws
HornetQStompException
+ {
+ if (noLocal)
+ {
+ String noLocalFilter = CONNECTION_ID_PROP + " <> '" +
getID().toString() + "'";
+ if (selector == null)
+ {
+ selector = noLocalFilter;
+ }
+ else
+ {
+ selector += " AND " + noLocalFilter;
+ }
+ }
+ if (ack == null)
+ {
+ ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
+ }
+
+ String subscriptionID = null;
+ if (id != null)
+ {
+ subscriptionID = id;
+ }
+ else
+ {
+ if (destination == null)
+ {
+ throw new HornetQStompException("Client must set destination or id
header to a SUBSCRIBE command");
+ }
+ subscriptionID = "subscription/" + destination;
+ }
+
+ try
+ {
+ manager.createSubscription(this, subscriptionID, durableSubscriptionName,
destination, selector, ack, noLocal);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error creating subscription " +
subscriptionID, e);
+ }
+ }
+
+ public void unsubscribe(String subscriptionID) throws HornetQStompException
+ {
+ try
+ {
+ manager.unsubscribe(this, subscriptionID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error unsubscripting " +
subscriptionID, e);
+ }
+ }
+
+ public void acknowledge(String messageID, String subscriptionID) throws
HornetQStompException
+ {
+ try
+ {
+ manager.acknowledge(this, messageID, subscriptionID);
+ }
+ catch (HornetQStompException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new HornetQStompException("Error acknowledging message " +
messageID, e);
+ }
+ }
+
+ public String getVersion()
+ {
+ return String.valueOf(version);
+ }
+
+ public String getHornetQServerName()
+ {
+ //hard coded, review later.
+ return "HornetQ/2.2.5 HornetQ Messaging Engine";
+ }
+
+ public StompFrame createStompMessage(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount)
+ {
+ return frameHandler.createMessageFrame(serverMessage, subscription,
deliveryCount);
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -17,6 +17,10 @@
*/
package org.hornetq.core.protocol.stomp;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -31,7 +35,7 @@
* @author Tim Fox
*
*/
-class StompFrame
+public class StompFrame
{
private static final Logger log = Logger.getLogger(StompFrame.class);
@@ -39,45 +43,30 @@
private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
- private final String command;
+ private String command;
- private final Map<String, Object> headers;
+ private Map<String, String> headers;
+
+ //stomp 1.1 talks about repetitive headers.
+ private List<Header> allHeaders = new ArrayList<Header>();
- private final byte[] content;
+ private String body;
private HornetQBuffer buffer = null;
private int size;
- public StompFrame(String command, Map<String, Object> headers, byte[] data)
+ public StompFrame(String command)
{
this.command = command;
- this.headers = headers;
- this.content = data;
+ this.headers = new LinkedHashMap<String, String>();
}
- public StompFrame(String command, Map<String, Object> headers)
- {
- this.command = command;
- this.headers = headers;
- this.content = NO_DATA;
- }
-
public String getCommand()
{
return command;
}
- public byte[] getContent()
- {
- return content;
- }
-
- public Map<String, Object> getHeaders()
- {
- return headers;
- }
-
public int getEncodedSize() throws Exception
{
if (buffer == null)
@@ -90,18 +79,18 @@
@Override
public String toString()
{
- return "StompFrame[command=" + command + ", headers=" + headers
+ ", content-length=" + content.length + "]";
+ return "StompFrame[command=" + command + ", headers=" + headers
+ ", content-length=";
}
public String asString()
{
String out = command + '\n';
- for (Entry<String, Object> header : headers.entrySet())
+ for (Entry<String, String> header : headers.entrySet())
{
out += header.getKey() + ": " + header.getValue() + '\n';
}
out += '\n';
- out += new String(content);
+ out += body;
return out;
}
@@ -116,7 +105,7 @@
head.append(command);
head.append(Stomp.NEWLINE);
// Output the headers.
- for (Map.Entry<String, Object> header : headers.entrySet())
+ for (Map.Entry<String, String> header : headers.entrySet())
{
head.append(header.getKey());
head.append(Stomp.Headers.SEPARATOR);
@@ -134,4 +123,60 @@
}
return buffer;
}
+
+ public String getHeader(String key)
+ {
+ return headers.get(key);
+ }
+
+ public void addHeader(String key, String val)
+ {
+ if (!headers.containsKey(key))
+ {
+ headers.put(key, val);
+ }
+ allHeaders.add(new Header(key, val));
+ }
+
+ public Map<String, String> getHeadersMap()
+ {
+ return headers;
+ }
+
+ private class Header
+ {
+ public String key;
+ public String val;
+
+ public Header(String key, String val)
+ {
+ this.key = key;
+ this.val = val;
+ }
+ }
+
+ public void setBody(String body)
+ {
+ this.body = body;
+ }
+
+ public boolean hasHeader(String key)
+ {
+ return headers.containsKey(key);
+ }
+
+ public String getBody()
+ {
+ return body;
+ }
+
+ //Since 1.1, there is a content-type header that needs to take care of
+ public byte[] getBodyAsBytes() throws UnsupportedEncodingException
+ {
+ if (body != null)
+ {
+ return body.getBytes("UTF-8");
+ }
+ return new byte[0];
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-05 00:34:29 UTC (rev 11293)
@@ -13,10 +13,6 @@
package org.hornetq.core.protocol.stomp;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -27,8 +23,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
@@ -53,9 +47,6 @@
private static final Logger log = Logger.getLogger(StompProtocolManager.class);
- // TODO use same value than HornetQConnection
- private static final String CONNECTION_ID_PROP = "__HQ_CID";
-
// Attributes ----------------------------------------------------
private final HornetQServer server;
@@ -69,36 +60,6 @@
// Static --------------------------------------------------------
- private static StompFrame createError(Exception e, StompFrame request)
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try
- {
- // Let the stomp client know about any protocol errors.
- PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,
"UTF-8"));
- e.printStackTrace(stream);
- stream.close();
-
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
-
- final String receiptId =
(String)request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
- if (receiptId != null)
- {
- headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
- }
-
- byte[] payload = baos.toByteArray();
- headers.put(Stomp.Headers.CONTENT_LENGTH, payload.length);
- return new StompFrame(Stomp.Responses.ERROR, headers, payload);
- }
- catch (UnsupportedEncodingException ex)
- {
- log.warn("Unable to create ERROR frame from the exception", ex);
- return null;
- }
- }
-
// Constructors --------------------------------------------------
public StompProtocolManager(final HornetQServer server, final List<Interceptor>
interceptors)
@@ -143,19 +104,15 @@
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer
buffer)
{
- long start = System.nanoTime();
StompConnection conn = (StompConnection)connection;
conn.setDataReceived();
StompDecoder decoder = conn.getDecoder();
-
- // log.info("in handle");
do
{
StompFrame request;
-
try
{
request = decoder.decode(buffer);
@@ -163,7 +120,6 @@
catch (Exception e)
{
log.error("Failed to decode", e);
-
return;
}
@@ -174,93 +130,13 @@
try
{
- String command = request.getCommand();
-
- StompFrame response = null;
-
- if (Stomp.Commands.CONNECT.equals(command))
- {
- response = onConnect(request, conn);
- }
- else if (Stomp.Commands.DISCONNECT.equals(command))
- {
- response = onDisconnect(request, conn);
- }
- else if (Stomp.Commands.SEND.equals(command))
- {
- response = onSend(request, conn);
- }
- else if (Stomp.Commands.SUBSCRIBE.equals(command))
- {
- response = onSubscribe(request, conn);
- }
- else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
- {
- response = onUnsubscribe(request, conn);
- }
- else if (Stomp.Commands.ACK.equals(command))
- {
- response = onAck(request, conn);
- }
- else if (Stomp.Commands.BEGIN.equals(command))
- {
- response = onBegin(request, server, conn);
- }
- else if (Stomp.Commands.COMMIT.equals(command))
- {
- response = onCommit(request, conn);
- }
- else if (Stomp.Commands.ABORT.equals(command))
- {
- response = onAbort(request, conn);
- }
- else
- {
- log.error("Unsupported Stomp frame: " + request);
- response = new StompFrame(Stomp.Responses.ERROR,
- new HashMap<String, Object>(),
- ("Unsupported frame: " +
command).getBytes());
- }
-
- if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
- {
- if (response == null)
- {
- Map<String, Object> h = new HashMap<String, Object>();
- response = new StompFrame(Stomp.Responses.RECEIPT, h);
- }
- response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
-
request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
- }
-
- if (response != null)
- {
- sendReply(conn, response);
- }
-
- if (Stomp.Commands.DISCONNECT.equals(command))
- {
- conn.destroy();
- }
+ conn.handleFrame(request);
}
- catch (Exception e)
- {
- e.printStackTrace();
- StompFrame error = createError(e, request);
- if (error != null)
- {
- sendReply(conn, error);
- }
- }
finally
{
server.getStorageManager().clearContext();
}
} while (decoder.hasBytes());
-
- long end = System.nanoTime();
-
- // log.info("handle took " + (end-start));
}
// Public --------------------------------------------------------
@@ -297,182 +173,8 @@
// Private -------------------------------------------------------
- private StompFrame onSubscribe(StompFrame frame, StompConnection connection) throws
Exception
+ public StompSession getSession(StompConnection connection) throws Exception
{
- Map<String, Object> headers = frame.getHeaders();
- String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
- String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
- String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
- String id = (String)headers.get(Stomp.Headers.Subscribe.ID);
- String durableSubscriptionName =
(String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
- boolean noLocal = false;
- if (headers.containsKey(Stomp.Headers.Subscribe.NO_LOCAL))
- {
- noLocal =
Boolean.parseBoolean((String)headers.get(Stomp.Headers.Subscribe.NO_LOCAL));
- }
- if (noLocal)
- {
- String noLocalFilter = CONNECTION_ID_PROP + " <> '" +
connection.getID().toString() + "'";
- if (selector == null)
- {
- selector = noLocalFilter;
- }
- else
- {
- selector += " AND " + noLocalFilter;
- }
- }
- if (ack == null)
- {
- ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
- }
- String subscriptionID = null;
- if (id != null)
- {
- subscriptionID = id;
- }
- else
- {
- if (destination == null)
- {
- throw new StompException("Client must set destination or id header to a
SUBSCRIBE command");
- }
- subscriptionID = "subscription/" + destination;
- }
- StompSession stompSession = getSession(connection);
- stompSession.setNoLocal(noLocal);
- if (stompSession.containsSubscription(subscriptionID))
- {
- throw new StompException("There already is a subscription for: " +
subscriptionID +
- ". Either use unique subscription IDs or do not
create multiple subscriptions for the same destination");
- }
- long consumerID = server.getStorageManager().generateUniqueID();
- String clientID = (connection.getClientID() != null) ? connection.getClientID() :
null;
- stompSession.addSubscription(consumerID,
- subscriptionID,
- clientID,
- durableSubscriptionName,
- destination,
- selector,
- ack);
-
- return null;
- }
-
- private StompFrame onUnsubscribe(StompFrame frame, StompConnection connection) throws
Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
- String id = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
-
- String subscriptionID = null;
- if (id != null)
- {
- subscriptionID = id;
- }
- else
- {
- if (destination == null)
- {
- throw new StompException("Must specify the subscription's id or the
destination you are unsubscribing from");
- }
- subscriptionID = "subscription/" + destination;
- }
-
- StompSession stompSession = getSession(connection);
- boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
- if (!unsubscribed)
- {
- throw new StompException("Cannot unsubscribe as no subscription exists for
id: " + subscriptionID);
- }
- return null;
- }
-
- private StompFrame onAck(StompFrame frame, StompConnection connection) throws
Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
- String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- StompSession stompSession = null;
- if (txID != null)
- {
- log.warn("Transactional acknowledgement is not supported");
- }
- stompSession = getSession(connection);
- stompSession.acknowledge(messageID);
-
- return null;
- }
-
- private StompFrame onBegin(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- throw new StompException("transaction header is mandatory to BEGIN a
transaction");
- }
- if (transactedSessions.containsKey(txID))
- {
- throw new StompException("Transaction already started: " + txID);
- }
- // create the transacted session
- getTransactedSession(connection, txID);
-
- return null;
- }
-
- private StompFrame onCommit(StompFrame frame, StompConnection connection) throws
Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- throw new StompException("transaction header is mandatory to COMMIT a
transaction");
- }
-
- StompSession session = getTransactedSession(connection, txID);
- if (session == null)
- {
- throw new StompException("No transaction started: " + txID);
- }
- transactedSessions.remove(txID);
- session.getSession().commit();
-
- return null;
- }
-
- private StompFrame onAbort(StompFrame frame, StompConnection connection) throws
Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- throw new StompException("transaction header is mandatory to ABORT a
transaction");
- }
-
- StompSession session = getTransactedSession(connection, txID);
-
- if (session == null)
- {
- throw new StompException("No transaction started: " + txID);
- }
- transactedSessions.remove(txID);
- session.getSession().rollback(false);
-
- return null;
- }
-
- private void checkConnected(StompConnection connection) throws StompException
- {
- if (!connection.isValid())
- {
- throw new StompException("Not connected");
- }
- }
-
- private StompSession getSession(StompConnection connection) throws Exception
- {
StompSession stompSession = sessions.get(connection.getID());
if (stompSession == null)
{
@@ -497,7 +199,7 @@
return stompSession;
}
- private StompSession getTransactedSession(StompConnection connection, String txID)
throws Exception
+ public StompSession getTransactedSession(StompConnection connection, String txID)
throws Exception
{
StompSession stompSession = transactedSessions.get(txID);
if (stompSession == null)
@@ -522,89 +224,6 @@
return stompSession;
}
- private StompFrame onDisconnect(StompFrame frame, StompConnection connection) throws
Exception
- {
- cleanup(connection);
- return null;
- }
-
- private StompFrame onSend(StompFrame frame, StompConnection connection) throws
Exception
- {
- checkConnected(connection);
- Map<String, Object> headers = frame.getHeaders();
- String destination = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
- String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
- long timestamp = System.currentTimeMillis();
-
- ServerMessageImpl message = new
ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
- message.setTimestamp(timestamp);
- message.setAddress(SimpleString.toSimpleString(destination));
- StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
- {
- message.setType(Message.BYTES_TYPE);
- message.getBodyBuffer().writeBytes(frame.getContent());
- }
- else
- {
- message.setType(Message.TEXT_TYPE);
- String text = new String(frame.getContent(), "UTF-8");
-
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
- }
-
- StompSession stompSession = null;
- if (txID == null)
- {
- stompSession = getSession(connection);
- }
- else
- {
- stompSession = getTransactedSession(connection, txID);
- }
- if (stompSession.isNoLocal())
- {
- message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
- }
- stompSession.getSession().send(message, true);
-
- return null;
- }
-
- private StompFrame onConnect(StompFrame frame, final StompConnection connection)
throws Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
- String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
- String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
- String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
- //since 1.1
- String acceptVersion = (String)headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
- String host = (String)headers.get(Stomp.Headers.Connect.HOST);
-
- HornetQSecurityManager sm = server.getSecurityManager();
-
- // The sm will be null case security is not enabled...
- if (sm != null)
- {
- sm.validateUser(login, passcode);
- }
-
- connection.negotiateVersion(acceptVersion);
- connection.setHost(host);
- connection.setLogin(login);
- connection.setPasscode(passcode);
- connection.setClientID(clientID);
- connection.setValid(true);
-
- HashMap<String, Object> h = new HashMap<String, Object>();
- h.put(Stomp.Headers.Connected.SESSION, connection.getID());
- if (requestID != null)
- {
- h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
- }
- return new StompFrame(Stomp.Responses.CONNECTED, h);
- }
-
public void cleanup(final StompConnection connection)
{
connection.setValid(false);
@@ -652,15 +271,18 @@
});
}
- private void sendReply(final StompConnection connection, final StompFrame frame)
+ public void sendReply(final StompConnection connection, final StompFrame frame)
{
server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
{
public void onError(final int errorCode, final String errorMessage)
{
log.warn("Error processing IOCallback code = " + errorCode + "
message = " + errorMessage);
+
+ HornetQStompException e = new HornetQStompException("Error sending
reply",
+ new HornetQException(errorCode, errorMessage));
- StompFrame error = createError(new HornetQException(errorCode, errorMessage),
frame);
+ StompFrame error = e.getFrame();
send(connection, error);
}
@@ -681,5 +303,95 @@
return "hornetq";
}
+ public boolean validateUser(String login, String passcode)
+ {
+ boolean validated = true;
+
+ HornetQSecurityManager sm = server.getSecurityManager();
+
+ // The sm will be null case security is not enabled...
+ if (sm != null)
+ {
+ validated = sm.validateUser(login, passcode);
+ }
+
+ return validated;
+ }
+
+ public ServerMessageImpl createServerMessage()
+ {
+ return new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+ }
+
+ public void commitTransaction(StompConnection connection, String txID) throws
Exception
+ {
+ StompSession session = getTransactedSession(connection, txID);
+ if (session == null)
+ {
+ throw new HornetQStompException("No transaction started: " + txID);
+ }
+ transactedSessions.remove(txID);
+ session.getSession().commit();
+ }
+
+ public void abortTransaction(StompConnection connection, String txID) throws
Exception
+ {
+ StompSession session = getTransactedSession(connection, txID);
+ if (session == null)
+ {
+ throw new HornetQStompException("No transaction started: " + txID);
+ }
+ transactedSessions.remove(txID);
+ session.getSession().rollback(false);
+ }
// Inner classes -------------------------------------------------
+
+   /**
+    * Adds a subscription to the session of the given connection.
+    * The session's no-local flag is set first, then the subscription id is checked for duplicates.
+    *
+    * @param connection the connection that is subscribing
+    * @param subscriptionID the id the subscription is registered under
+    * @param durableSubscriptionName name handed to the session for a durable subscription
+    * @param destination the destination subscribed to
+    * @param selector the message selector handed to the session
+    * @param ack the acknowledgement mode handed to the session
+    * @param noLocal value stored as the session's no-local flag
+    * @throws HornetQStompException if the session already contains <code>subscriptionID</code>
+    */
+   public void createSubscription(StompConnection connection,
+                                  String subscriptionID,
+                                  String durableSubscriptionName,
+                                  String destination,
+                                  String selector,
+                                  String ack,
+                                  boolean noLocal) throws Exception
+   {
+      StompSession stompSession = getSession(connection);
+      stompSession.setNoLocal(noLocal);
+      if (stompSession.containsSubscription(subscriptionID))
+      {
+         throw new HornetQStompException("There already is a subscription for: " + subscriptionID +
+                                         ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
+      }
+      long consumerID = server.getStorageManager().generateUniqueID();
+      // a null client id is passed through unchanged, so no null check is needed here
+      String clientID = connection.getClientID();
+      stompSession.addSubscription(consumerID,
+                                   subscriptionID,
+                                   clientID,
+                                   durableSubscriptionName,
+                                   destination,
+                                   selector,
+                                   ack);
+   }
+
+   /**
+    * Removes the subscription with the given id from the connection's session.
+    *
+    * @throws HornetQStompException if the session reports that nothing was unsubscribed
+    */
+   public void unsubscribe(StompConnection connection, String subscriptionID) throws Exception
+   {
+      if (getSession(connection).unsubscribe(subscriptionID))
+      {
+         return;
+      }
+
+      throw new HornetQStompException("Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
+   }
+
+   /**
+    * Forwards an acknowledgement to the session of the given connection.
+    *
+    * @param messageID id of the message being acknowledged
+    * @param subscriptionID subscription id passed along to the session (callers in this change may pass <code>null</code>)
+    */
+   public void acknowledge(StompConnection connection, String messageID, String subscriptionID) throws Exception
+   {
+      getSession(connection).acknowledge(messageID, subscriptionID);
+   }
+
+   /**
+    * Starts a transaction under the given id.
+    *
+    * @throws HornetQStompException if <code>transactedSessions</code> already holds <code>txID</code>
+    */
+   public void beginTransaction(StompConnection connection, String txID) throws Exception
+   {
+      final boolean alreadyStarted = transactedSessions.containsKey(txID);
+
+      if (alreadyStarted)
+      {
+         throw new HornetQStompException("Transaction already started: " + txID);
+      }
+
+      // the lookup is called only for its effect of creating the transacted session
+      getTransactedSession(connection, txID);
+   }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-02
11:52:55 UTC (rev 11292)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-05
00:34:29 UTC (rev 11293)
@@ -12,7 +12,6 @@
*/
package org.hornetq.core.protocol.stomp;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -39,7 +38,7 @@
*
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
-class StompSession implements SessionCallback
+public class StompSession implements SessionCallback
{
private static final Logger log = Logger.getLogger(StompSession.class);
@@ -84,41 +83,9 @@
try
{
StompSubscription subscription = subscriptions.get(consumerID);
-
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Message.DESTINATION,
serverMessage.getAddress().toString());
- if (subscription.getID() != null)
- {
- headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
- }
- HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- :
serverMessage.getEndOfBodyPosition();
- int size = bodyPos - buffer.readerIndex();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- byte[] data = new byte[size];
- if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE)
- {
- headers.put(Headers.CONTENT_LENGTH, data.length);
- buffer.readBytes(data);
- }
- else
- {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes("UTF-8");
- }
- else
- {
- data = new byte[0];
- }
- }
- serverMessage.getBodyBuffer().resetReaderIndex();
- StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
-
+
+ StompFrame frame = connection.createStompMessage(serverMessage, subscription,
deliveryCount);
+
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -168,10 +135,19 @@
connection.getTransportConnection().removeReadyListener(listener);
}
- public void acknowledge(String messageID) throws Exception
+ public void acknowledge(String messageID, String subscriptionID) throws Exception
{
long id = Long.parseLong(messageID);
long consumerID = messagesToAck.remove(id);
+ StompSubscription sub = subscriptions.get(consumerID);
+
+ if (subscriptionID != null)
+ {
+ if (!sub.getID().equals(subscriptionID))
+ {
+ throw new HornetQStompException("subscription id " + subscriptionID
+ " does not match " + sub.getID());
+ }
+ }
session.acknowledge(consumerID, id);
session.commit();
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-02
11:52:55 UTC (rev 11292)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-05
00:34:29 UTC (rev 11293)
@@ -32,7 +32,7 @@
*
*
*/
-class StompUtils
+public class StompUtils
{
// Constants -----------------------------------------------------
private static final String DEFAULT_MESSAGE_PRIORITY= "4";
@@ -46,8 +46,6 @@
public static void copyStandardHeadersFromFrameToMessage(StompFrame frame,
ServerMessageImpl msg) throws Exception
{
- Map<String, Object> headers = new HashMap<String,
Object>(frame.getHeaders());
-
String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
if (priority != null)
{
@@ -93,27 +91,26 @@
public static void copyStandardHeadersFromMessageToFrame(MessageInternal message,
StompFrame command, int deliveryCount) throws Exception
{
- final Map<String, Object> headers = command.getHeaders();
- headers.put(Stomp.Headers.Message.DESTINATION, message.getAddress().toString());
- headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
+ command.addHeader(Stomp.Headers.Message.MESSAGE_ID,
String.valueOf(message.getMessageID()));
+ command.addHeader(Stomp.Headers.Message.DESTINATION,
message.getAddress().toString());
if (message.getObjectProperty("JMSCorrelationID") != null)
{
- headers.put(Stomp.Headers.Message.CORRELATION_ID,
message.getObjectProperty("JMSCorrelationID"));
+ command.addHeader(Stomp.Headers.Message.CORRELATION_ID,
message.getObjectProperty("JMSCorrelationID").toString());
}
- headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" +
message.getExpiration());
- headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
- headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
+ command.addHeader(Stomp.Headers.Message.EXPIRATION_TIME, "" +
message.getExpiration());
+ command.addHeader(Stomp.Headers.Message.REDELIVERED, String.valueOf(deliveryCount
> 1));
+ command.addHeader(Stomp.Headers.Message.PRORITY, "" +
message.getPriority());
if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null)
{
- headers.put(Stomp.Headers.Message.REPLY_TO,
+ command.addHeader(Stomp.Headers.Message.REPLY_TO,
message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME));
}
- headers.put(Stomp.Headers.Message.TIMESTAMP, "" +
message.getTimestamp());
+ command.addHeader(Stomp.Headers.Message.TIMESTAMP, "" +
message.getTimestamp());
if (message.getObjectProperty("JMSType") != null)
{
- headers.put(Stomp.Headers.Message.TYPE,
message.getObjectProperty("JMSType"));
+ command.addHeader(Stomp.Headers.Message.TYPE,
message.getObjectProperty("JMSType").toString());
}
// now lets add all the message headers
@@ -127,7 +124,7 @@
continue;
}
- headers.put(name.toString(), message.getObjectProperty(name));
+ command.addHeader(name.toString(), message.getObjectProperty(name).toString());
}
}
// Constructors --------------------------------------------------
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
(rev 0)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-05
00:34:29 UTC (rev 11293)
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.UnsupportedEncodingException;
+
+import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
+import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ * Base class of the version specific STOMP frame handlers. It maps the command of an
+ * incoming frame to the matching <code>on...</code> callback implemented by the subclass.
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public abstract class VersionedStompFrameHandler
+{
+   protected StompConnection connection;
+
+   /**
+    * Creates the handler for the given protocol version.
+    *
+    * @return the handler, or <code>null</code> if the version is neither V1_0 nor V1_1
+    */
+   public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version)
+   {
+      VersionedStompFrameHandler handler = null;
+
+      if (version == StompVersions.V1_0)
+      {
+         handler = new StompFrameHandlerV10(connection);
+      }
+      else if (version == StompVersions.V1_1)
+      {
+         handler = new StompFrameHandlerV11(connection);
+      }
+
+      return handler;
+   }
+
+   /**
+    * Dispatches the frame to the callback matching its command.
+    *
+    * @return the frame produced by the callback; if the callback produced none and the
+    *         request carries a receipt header, a RECEIPT frame; otherwise <code>null</code>
+    */
+   public StompFrame handleFrame(StompFrame request)
+   {
+      final String command = request.getCommand();
+      StompFrame response;
+
+      if (Stomp.Commands.SEND.equals(command))
+      {
+         response = onSend(request);
+      }
+      else if (Stomp.Commands.ACK.equals(command))
+      {
+         response = onAck(request);
+      }
+      else if (Stomp.Commands.NACK.equals(command))
+      {
+         response = onNack(request);
+      }
+      else if (Stomp.Commands.BEGIN.equals(command))
+      {
+         response = onBegin(request);
+      }
+      else if (Stomp.Commands.COMMIT.equals(command))
+      {
+         response = onCommit(request);
+      }
+      else if (Stomp.Commands.ABORT.equals(command))
+      {
+         response = onAbort(request);
+      }
+      else if (Stomp.Commands.SUBSCRIBE.equals(command))
+      {
+         response = onSubscribe(request);
+      }
+      else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
+      {
+         response = onUnsubscribe(request);
+      }
+      else if (Stomp.Commands.CONNECT.equals(command))
+      {
+         response = onConnect(request);
+      }
+      else if (Stomp.Commands.STOMP.equals(command))
+      {
+         response = onStomp(request);
+      }
+      else if (Stomp.Commands.DISCONNECT.equals(command))
+      {
+         response = onDisconnect(request);
+      }
+      else
+      {
+         response = onUnknown(command);
+      }
+
+      // a receipt is only generated when the callback itself had nothing to send back
+      if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED) && response == null)
+      {
+         return handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+      }
+
+      return response;
+   }
+
+   public abstract StompFrame onConnect(StompFrame frame);
+   public abstract StompFrame onDisconnect(StompFrame frame);
+   public abstract StompFrame onSend(StompFrame frame);
+   public abstract StompFrame onAck(StompFrame request);
+   public abstract StompFrame onBegin(StompFrame frame);
+   public abstract StompFrame onCommit(StompFrame request);
+   public abstract StompFrame onAbort(StompFrame request);
+   public abstract StompFrame onSubscribe(StompFrame request);
+   public abstract StompFrame onUnsubscribe(StompFrame request);
+   public abstract StompFrame onStomp(StompFrame request);
+   public abstract StompFrame onNack(StompFrame request);
+
+   /**
+    * Builds the frame of a {@link HornetQStompException} reporting an unsupported command.
+    */
+   public StompFrame onUnknown(String command)
+   {
+      return new HornetQStompException("Unsupported command " + command).getFrame();
+   }
+
+   /**
+    * Builds a RECEIPT frame carrying the given receipt id.
+    */
+   public StompFrame handleReceipt(String receiptID)
+   {
+      StompFrame receiptFrame = new StompFrame(Stomp.Responses.RECEIPT);
+      receiptFrame.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
+
+      return receiptFrame;
+   }
+
+   /**
+    * Builds the MESSAGE frame used to deliver a server message to a subscription.
+    */
+   public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
+                                                 StompSubscription subscription,
+                                                 int deliveryCount) throws Exception;
+}
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
(rev 0)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-09-05
00:34:29 UTC (rev 11293)
@@ -0,0 +1,348 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp.v10;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompSubscription;
+import org.hornetq.core.protocol.stomp.StompUtils;
+import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.DataConstants;
+
+/**
+*
+* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+*/
+public class StompFrameHandlerV10 extends VersionedStompFrameHandler
+{
+ private static final Logger log = Logger.getLogger(StompFrameHandlerV10.class);
+
+ /**
+ * Creates a handler bound to the given connection.
+ *
+ * @param connection the connection stored in the inherited <code>connection</code> field
+ */
+ public StompFrameHandlerV10(StompConnection connection)
+ {
+ this.connection = connection;
+ }
+
+   /**
+    * Handles CONNECT: validates the login and passcode headers and answers with CONNECTED.
+    * On invalid credentials an ERROR frame is sent directly, the connection is destroyed
+    * and <code>null</code> is returned.
+    */
+   @Override
+   public StompFrame onConnect(StompFrame frame)
+   {
+      Map<String, String> headers = frame.getHeadersMap();
+      String login = headers.get(Stomp.Headers.Connect.LOGIN);
+      String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
+      String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID);
+      String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+      if (!connection.validateUser(login, passcode))
+      {
+         //not valid
+         StompFrame error = new StompFrame(Stomp.Responses.ERROR);
+         error.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
+         error.setBody("The login account is not valid.");
+
+         connection.sendFrame(error);
+         connection.destroy();
+
+         return null;
+      }
+
+      connection.setClientID(clientID);
+      connection.setValid(true);
+
+      StompFrame connected = new StompFrame(Stomp.Responses.CONNECTED);
+      connected.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
+
+      if (requestID != null)
+      {
+         connected.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+      }
+
+      return connected;
+   }
+
+ /**
+ * Handles DISCONNECT by destroying the connection; no response frame is produced.
+ */
+ @Override
+ public StompFrame onDisconnect(StompFrame frame)
+ {
+ connection.destroy();
+ return null;
+ }
+
+   /**
+    * Handles SEND: converts the frame into a server message and hands it to the connection.
+    * A frame with a content-length header becomes a bytes message, any other a text message.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onSend(StompFrame frame)
+   {
+      try
+      {
+         connection.validate();
+
+         String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
+         String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+         long now = System.currentTimeMillis();
+
+         ServerMessageImpl message = connection.createServerMessage();
+         message.setTimestamp(now);
+         message.setAddress(SimpleString.toSimpleString(destination));
+         StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+
+         if (!frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
+         {
+            message.setType(Message.TEXT_TYPE);
+            SimpleString text = SimpleString.toSimpleString(frame.getBody());
+            message.getBodyBuffer().writeNullableSimpleString(text);
+         }
+         else
+         {
+            message.setType(Message.BYTES_TYPE);
+            message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
+         }
+
+         connection.sendServerMessage(message, txID);
+         return null;
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+      catch (Exception e)
+      {
+         return new HornetQStompException("Error handling send", e).getFrame();
+      }
+   }
+
+   /**
+    * Handles BEGIN: starts the transaction named by the transaction header.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onBegin(StompFrame frame)
+   {
+      String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID == null)
+      {
+         return new HornetQStompException("Need a transaction id to begin").getFrame();
+      }
+
+      try
+      {
+         connection.beginTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles COMMIT: commits the transaction named by the transaction header.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onCommit(StompFrame request)
+   {
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID == null)
+      {
+         return new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
+      }
+
+      try
+      {
+         connection.commitTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles ABORT: aborts the transaction named by the transaction header.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onAbort(StompFrame request)
+   {
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID == null)
+      {
+         return new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
+      }
+
+      try
+      {
+         connection.abortTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles SUBSCRIBE: reads the subscription headers and registers the subscription
+    * on the connection. The no-local flag is <code>false</code> unless its header is present.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onSubscribe(StompFrame request)
+   {
+      String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+      String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+      String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+      String id = request.getHeader(Stomp.Headers.Subscribe.ID);
+      String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+
+      boolean noLocal = request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL) &&
+                        Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+
+      try
+      {
+         connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles UNSUBSCRIBE. The subscription is identified by the id header when present,
+    * otherwise by "subscription/" followed by the destination header.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onUnsubscribe(StompFrame request)
+   {
+      String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
+      String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
+
+      if (id == null && destination == null)
+      {
+         return new HornetQStompException("Must specify the subscription's id or " +
+                                          "the destination you are unsubscribing from").getFrame();
+      }
+
+      String subscriptionID = (id != null) ? id : "subscription/" + destination;
+
+      try
+      {
+         connection.unsubscribe(subscriptionID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles ACK: acknowledges the message named by the message-id header without
+    * a subscription id. A transaction header only triggers a warning.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onAck(StompFrame request)
+   {
+      String messageID = request.getHeader(Stomp.Headers.Ack.MESSAGE_ID);
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID != null)
+      {
+         log.warn("Transactional acknowledgement is not supported");
+      }
+
+      try
+      {
+         connection.acknowledge(messageID, null);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+ /**
+ * Handles the STOMP command, which this handler answers like any unknown command.
+ */
+ @Override
+ public StompFrame onStomp(StompFrame request)
+ {
+ return onUnknown(request.getCommand());
+ }
+
+ /**
+ * Handles the NACK command, which this handler answers like any unknown command.
+ */
+ @Override
+ public StompFrame onNack(StompFrame request)
+ {
+ return onUnknown(request.getCommand());
+ }
+
+ /**
+ * Builds the MESSAGE frame for delivering a server message to a subscription.
+ * Adds the subscription header (when the subscription has an id), a content-length
+ * header for bytes-like messages, and the standard headers copied by StompUtils.
+ *
+ * NOTE(review): the body bytes read into <code>data</code> are never attached to the
+ * returned frame in this method, whereas the inline code this replaces in StompSession
+ * passed them to the StompFrame constructor - confirm the body is set somewhere.
+ */
+ @Override
+ public StompFrame createMessageFrame(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount) throws Exception
+ {
+ StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
+
+ if (subscription.getID() != null)
+ {
+ frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
+ }
+ HornetQBuffer buffer = serverMessage.getBodyBuffer();
+
+ // the body ends at the recorded end-of-body position, or at the writer index when none is recorded (-1)
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
+ :
serverMessage.getEndOfBodyPosition();
+ // size is computed from the current reader index, before the reader index is repositioned below
+ int size = bodyPos - buffer.readerIndex();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+ byte[] data = new byte[size];
+ // messages sent with a content-length property, or of bytes type, are read as raw bytes
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE)
+ {
+ frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
+ buffer.readBytes(data);
+ }
+ else
+ {
+ // any other message is read as a nullable string and encoded as UTF-8
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes("UTF-8");
+ }
+ else
+ {
+ data = new byte[0];
+ }
+ }
+ // reset the reader index that was moved while reading the body
+ serverMessage.getBodyBuffer().resetReaderIndex();
+
+ StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
+
+ return frame;
+
+ }
+
+}
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
(rev 0)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-05
00:34:29 UTC (rev 11293)
@@ -0,0 +1,362 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp.v11;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompSubscription;
+import org.hornetq.core.protocol.stomp.StompUtils;
+import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.utils.DataConstants;
+
+public class StompFrameHandlerV11 extends VersionedStompFrameHandler
+{
+ private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
+
+ /**
+ * Creates a handler bound to the given connection.
+ *
+ * @param connection the connection stored in the inherited <code>connection</code> field
+ */
+ public StompFrameHandlerV11(StompConnection connection)
+ {
+ this.connection = connection;
+ }
+
+   /**
+    * Handles CONNECT: validates the login and passcode headers and answers with a
+    * CONNECTED frame carrying the version, session and server headers.
+    * On invalid credentials an ERROR frame is sent directly, the connection is destroyed
+    * and <code>null</code> is returned.
+    *
+    * @param frame the CONNECT (or STOMP) frame
+    * @return the CONNECTED frame, or <code>null</code> if the login was rejected
+    */
+   @Override
+   public StompFrame onConnect(StompFrame frame)
+   {
+      Map<String, String> headers = frame.getHeadersMap();
+      String login = headers.get(Stomp.Headers.Connect.LOGIN);
+      String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
+      String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID);
+      String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+      if (!connection.validateUser(login, passcode))
+      {
+         // This branch is an authentication failure, not a version negotiation failure,
+         // so report it as such (same wording as the 1.0 handler) instead of listing
+         // the supported protocol versions.
+         StompFrame error = new StompFrame(Stomp.Responses.ERROR);
+         error.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
+         error.setBody("The login account is not valid.");
+
+         connection.sendFrame(error);
+         connection.destroy();
+
+         return null;
+      }
+
+      connection.setClientID(clientID);
+      connection.setValid(true);
+
+      StompFrame response = new StompFrame(Stomp.Responses.CONNECTED);
+
+      //version
+      response.addHeader(Stomp.Headers.Connected.VERSION, connection.getVersion());
+
+      //session
+      response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
+
+      //server
+      response.addHeader(Stomp.Headers.Connected.SERVER, connection.getHornetQServerName());
+
+      if (requestID != null)
+      {
+         response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+      }
+
+      return response;
+   }
+
+ /**
+ * Handles DISCONNECT by destroying the connection; no response frame is produced.
+ */
+ @Override
+ public StompFrame onDisconnect(StompFrame frame)
+ {
+ connection.destroy();
+ return null;
+ }
+
+   /**
+    * Handles SEND: converts the frame into a server message and hands it to the connection.
+    * A frame with a content-length header becomes a bytes message, any other a text message.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onSend(StompFrame frame)
+   {
+      try
+      {
+         connection.validate();
+
+         String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
+         String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+         long now = System.currentTimeMillis();
+
+         ServerMessageImpl message = connection.createServerMessage();
+         message.setTimestamp(now);
+         message.setAddress(SimpleString.toSimpleString(destination));
+         StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+
+         if (!frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
+         {
+            message.setType(Message.TEXT_TYPE);
+            SimpleString text = SimpleString.toSimpleString(frame.getBody());
+            message.getBodyBuffer().writeNullableSimpleString(text);
+         }
+         else
+         {
+            message.setType(Message.BYTES_TYPE);
+            message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
+         }
+
+         connection.sendServerMessage(message, txID);
+         return null;
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+      catch (Exception e)
+      {
+         return new HornetQStompException("Error handling send", e).getFrame();
+      }
+   }
+
+   /**
+    * Handles BEGIN: starts the transaction named by the transaction header.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onBegin(StompFrame frame)
+   {
+      String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID == null)
+      {
+         return new HornetQStompException("Need a transaction id to begin").getFrame();
+      }
+
+      try
+      {
+         connection.beginTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles COMMIT: commits the transaction named by the transaction header.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onCommit(StompFrame request)
+   {
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID == null)
+      {
+         return new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
+      }
+
+      try
+      {
+         connection.commitTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles ABORT: aborts the transaction named by the transaction header.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onAbort(StompFrame request)
+   {
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID == null)
+      {
+         return new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
+      }
+
+      try
+      {
+         connection.abortTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles SUBSCRIBE: reads the subscription headers and registers the subscription
+    * on the connection. The no-local flag is <code>false</code> unless its header is present.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onSubscribe(StompFrame request)
+   {
+      String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+      String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+      String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+      String id = request.getHeader(Stomp.Headers.Subscribe.ID);
+      String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+
+      boolean noLocal = request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL) &&
+                        Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+
+      try
+      {
+         connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles UNSUBSCRIBE, which in this handler requires the id header.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onUnsubscribe(StompFrame request)
+   {
+      //unsubscribe in 1.1 only needs id header
+      String subscriptionID = request.getHeader(Stomp.Headers.Unsubscribe.ID);
+
+      if (subscriptionID == null)
+      {
+         return new HornetQStompException("Must specify the subscription's id").getFrame();
+      }
+
+      try
+      {
+         connection.unsubscribe(subscriptionID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+   /**
+    * Handles ACK: acknowledges the message named by the message-id header for the
+    * subscription named by the subscription header, which is required.
+    * A transaction header only triggers a warning.
+    *
+    * @return <code>null</code> on success, otherwise the frame describing the failure
+    */
+   @Override
+   public StompFrame onAck(StompFrame request)
+   {
+      String messageID = request.getHeader(Stomp.Headers.Ack.MESSAGE_ID);
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+      String subscriptionID = request.getHeader(Stomp.Headers.Ack.SUBSCRIPTION);
+
+      if (txID != null)
+      {
+         log.warn("Transactional acknowledgement is not supported");
+      }
+
+      if (subscriptionID == null)
+      {
+         return new HornetQStompException("subscription header is required").getFrame();
+      }
+
+      try
+      {
+         connection.acknowledge(messageID, subscriptionID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+
+      return null;
+   }
+
+ /**
+ * Handles the STOMP command, which this handler treats exactly like CONNECT.
+ */
+ @Override
+ public StompFrame onStomp(StompFrame request)
+ {
+ return onConnect(request);
+ }
+
+ /**
+ * Handles NACK by delegating to {@link #onAck(StompFrame)}.
+ */
+ @Override
+ public StompFrame onNack(StompFrame request)
+ {
+ // Treating NACK as ACK effectively discards the message (it will never be redelivered).
+ // We could consider supporting redelivery to a different subscription.
+ return onAck(request);
+ }
+
+ /**
+ * Builds the MESSAGE frame for delivering a server message to a subscription.
+ * Adds the subscription header (when the subscription has an id), a content-length
+ * header for bytes-like messages, a content-type header, and the standard headers
+ * copied by StompUtils.
+ *
+ * NOTE(review): the body bytes read into <code>data</code> are never attached to the
+ * returned frame in this method, whereas the inline code this replaces in StompSession
+ * passed them to the StompFrame constructor - confirm the body is set somewhere.
+ */
+ @Override
+ public StompFrame createMessageFrame(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount)
+ throws Exception
+ {
+ StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
+
+ if (subscription.getID() != null)
+ {
+ frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
+ }
+
+ HornetQBuffer buffer = serverMessage.getBodyBuffer();
+
+ // the body ends at the recorded end-of-body position, or at the writer index when none is recorded (-1)
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
+ :
serverMessage.getEndOfBodyPosition();
+ // size is computed from the current reader index, before the reader index is repositioned below
+ int size = bodyPos - buffer.readerIndex();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+ byte[] data = new byte[size];
+ // messages sent with a content-length property, or of bytes type, are read as raw bytes
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE)
+ {
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
+ buffer.readBytes(data);
+ }
+ else
+ {
+ // any other message is read as a nullable string and encoded as UTF-8
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes("UTF-8");
+ }
+ else
+ {
+ data = new byte[0];
+ }
+ }
+ // NOTE(review): the content type is set to text/plain for every message, including the raw-bytes branch above - confirm this is intended
+ frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
+
+ // reset the reader index that was moved while reading the body
+ serverMessage.getBodyBuffer().resetReaderIndex();
+
+ StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
+
+ return frame;
+
+ }
+
+}