Author: jmesnil
Date: 2010-01-28 08:27:55 -0500 (Thu, 28 Jan 2010)
New Revision: 8855
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
Log:
fix code format
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2010-01-28 13:26:39 UTC (rev
8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2010-01-28 13:27:55 UTC (rev
8855)
@@ -17,108 +17,165 @@
*/
package org.hornetq.core.protocol.stomp;
-
/**
* The standard verbs and headers used for the <a
href="http://stomp.codehaus.org/">STOMP</a> protocol.
*
* @version $Revision: 57 $
*/
-public interface Stomp {
- String NULL = "\u0000";
- String NEWLINE = "\n";
+public interface Stomp
+{
+ String NULL = "\u0000";
- public static interface Commands {
- String CONNECT = "CONNECT";
- String SEND = "SEND";
- String DISCONNECT = "DISCONNECT";
- String SUBSCRIBE = "SUBSCRIBE";
- String UNSUBSCRIBE = "UNSUBSCRIBE";
- String BEGIN_TRANSACTION = "BEGIN";
- String COMMIT_TRANSACTION = "COMMIT";
- String ABORT_TRANSACTION = "ABORT";
- String BEGIN = "BEGIN";
- String COMMIT = "COMMIT";
- String ABORT = "ABORT";
- String ACK = "ACK";
- }
+ String NEWLINE = "\n";
- public interface Responses {
- String CONNECTED = "CONNECTED";
- String ERROR = "ERROR";
- String MESSAGE = "MESSAGE";
- String RECEIPT = "RECEIPT";
- }
+ public static interface Commands
+ {
+ String CONNECT = "CONNECT";
- public interface Headers {
- String SEPERATOR = ":";
- String RECEIPT_REQUESTED = "receipt";
- String TRANSACTION = "transaction";
- String CONTENT_LENGTH = "content-length";
+ String SEND = "SEND";
- public interface Response {
- String RECEIPT_ID = "receipt-id";
- }
+ String DISCONNECT = "DISCONNECT";
- public interface Send {
- String DESTINATION = "destination";
- String CORRELATION_ID = "correlation-id";
- String REPLY_TO = "reply-to";
- String EXPIRATION_TIME = "expires";
- String PRIORITY = "priority";
- String TYPE = "type";
- Object PERSISTENT = "persistent";
- }
+ String SUBSCRIBE = "SUBSCRIBE";
- public interface Message {
- String MESSAGE_ID = "message-id";
- String DESTINATION = "destination";
- String CORRELATION_ID = "correlation-id";
- String EXPIRATION_TIME = "expires";
- String REPLY_TO = "reply-to";
- String PRORITY = "priority";
- String REDELIVERED = "redelivered";
- String TIMESTAMP = "timestamp";
- String TYPE = "type";
- String SUBSCRIPTION = "subscription";
- }
+ String UNSUBSCRIBE = "UNSUBSCRIBE";
- public interface Subscribe {
- String DESTINATION = "destination";
- String ACK_MODE = "ack";
- String ID = "id";
- String SELECTOR = "selector";
- String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
- String NO_LOCAL = "no-local";
+ String BEGIN_TRANSACTION = "BEGIN";
- public interface AckModeValues {
- String AUTO = "auto";
- String CLIENT = "client";
- }
- }
+ String COMMIT_TRANSACTION = "COMMIT";
- public interface Unsubscribe {
- String DESTINATION = "destination";
- String ID = "id";
- }
+ String ABORT_TRANSACTION = "ABORT";
- public interface Connect {
- String LOGIN = "login";
- String PASSCODE = "passcode";
- String CLIENT_ID = "client-id";
- String REQUEST_ID = "request-id";
- }
+ String BEGIN = "BEGIN";
- public interface Error {
- String MESSAGE = "message";
- }
+ String COMMIT = "COMMIT";
- public interface Connected {
- String SESSION = "session";
- String RESPONSE_ID = "response-id";
- }
+ String ABORT = "ABORT";
- public interface Ack {
- String MESSAGE_ID = "message-id";
- }
- }
+ String ACK = "ACK";
+ }
+
+ public interface Responses
+ {
+ String CONNECTED = "CONNECTED";
+
+ String ERROR = "ERROR";
+
+ String MESSAGE = "MESSAGE";
+
+ String RECEIPT = "RECEIPT";
+ }
+
+ public interface Headers
+ {
+ String SEPERATOR = ":";
+
+ String RECEIPT_REQUESTED = "receipt";
+
+ String TRANSACTION = "transaction";
+
+ String CONTENT_LENGTH = "content-length";
+
+ public interface Response
+ {
+ String RECEIPT_ID = "receipt-id";
+ }
+
+ public interface Send
+ {
+ String DESTINATION = "destination";
+
+ String CORRELATION_ID = "correlation-id";
+
+ String REPLY_TO = "reply-to";
+
+ String EXPIRATION_TIME = "expires";
+
+ String PRIORITY = "priority";
+
+ String TYPE = "type";
+
+ Object PERSISTENT = "persistent";
+ }
+
+ public interface Message
+ {
+ String MESSAGE_ID = "message-id";
+
+ String DESTINATION = "destination";
+
+ String CORRELATION_ID = "correlation-id";
+
+ String EXPIRATION_TIME = "expires";
+
+ String REPLY_TO = "reply-to";
+
+ String PRORITY = "priority";
+
+ String REDELIVERED = "redelivered";
+
+ String TIMESTAMP = "timestamp";
+
+ String TYPE = "type";
+
+ String SUBSCRIPTION = "subscription";
+ }
+
+ public interface Subscribe
+ {
+ String DESTINATION = "destination";
+
+ String ACK_MODE = "ack";
+
+ String ID = "id";
+
+ String SELECTOR = "selector";
+
+ String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+
+ String NO_LOCAL = "no-local";
+
+ public interface AckModeValues
+ {
+ String AUTO = "auto";
+
+ String CLIENT = "client";
+ }
+ }
+
+ public interface Unsubscribe
+ {
+ String DESTINATION = "destination";
+
+ String ID = "id";
+ }
+
+ public interface Connect
+ {
+ String LOGIN = "login";
+
+ String PASSCODE = "passcode";
+
+ String CLIENT_ID = "client-id";
+
+ String REQUEST_ID = "request-id";
+ }
+
+ public interface Error
+ {
+ String MESSAGE = "message";
+ }
+
+ public interface Connected
+ {
+ String SESSION = "session";
+
+ String RESPONSE_ID = "response-id";
+ }
+
+ public interface Ack
+ {
+ String MESSAGE_ID = "message-id";
+ }
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-28
13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-28
13:27:55 UTC (rev 8855)
@@ -39,11 +39,11 @@
private static final Logger log = Logger.getLogger(StompConnection.class);
private final StompProtocolManager manager;
-
+
private final Connection transportConnection;
-
+
private String login;
-
+
private String passcode;
private String clientID;
@@ -52,12 +52,10 @@
private boolean destroyed = false;
- private final List<FailureListener> failureListeners = new
CopyOnWriteArrayList<FailureListener>();
-
StompConnection(final Connection transportConnection, final StompProtocolManager
manager)
{
this.transportConnection = transportConnection;
-
+
this.manager = manager;
}
@@ -67,12 +65,6 @@
public void addFailureListener(FailureListener listener)
{
- if (listener == null)
- {
- throw new IllegalStateException("FailureListener cannot be null");
- }
-
- failureListeners.add(listener);
}
public boolean checkDataReceived()
@@ -95,8 +87,8 @@
destroyed = true;
transportConnection.close();
-
- callFailureListeners(new HornetQException(HornetQException.INTERNAL_ERROR,
"Stomp connection destroyed"));
+
+ manager.cleanup(this);
}
public void disconnect()
@@ -108,7 +100,7 @@
}
public void flush()
- {
+ {
}
public List<FailureListener> getFailureListeners()
@@ -124,7 +116,7 @@
}
public String getRemoteAddress()
- {
+ {
return transportConnection.getRemoteAddress();
}
@@ -150,22 +142,13 @@
public boolean removeFailureListener(FailureListener listener)
{
- if (listener == null)
- {
- throw new IllegalStateException("FailureListener cannot be null");
- }
-
- return failureListeners.remove(listener);
+ return false;
}
public void setFailureListeners(List<FailureListener> listeners)
{
- failureListeners.clear();
-
- failureListeners.addAll(listeners);
}
-
public void bufferReceived(Object connectionID, HornetQBuffer buffer)
{
manager.handleBuffer(this, buffer);
@@ -200,30 +183,9 @@
{
return valid;
}
-
+
public void setValid(boolean valid)
{
this.valid = valid;
}
-
- private void callFailureListeners(final HornetQException me)
- {
- final List<FailureListener> listenersClone = new
ArrayList<FailureListener>(failureListeners);
-
- for (final FailureListener listener : listenersClone)
- {
- try
- {
- listener.connectionFailed(me);
- }
- catch (final Throwable t)
- {
- // Failure of one listener to execute shouldn't prevent others
- // from
- // executing
- log.error("Failed to execute failure listener", t);
- }
- }
- }
-
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java 2010-01-28 13:26:39
UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java 2010-01-28 13:27:55
UTC (rev 8855)
@@ -22,29 +22,36 @@
/**
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-class StompException extends IOException {
- private static final long serialVersionUID = -2869735532997332242L;
- private final boolean fatal;
+class StompException extends IOException
+{
+ private static final long serialVersionUID = -2869735532997332242L;
- public StompException() {
- this(null);
- }
+ private final boolean fatal;
- public StompException(String s) {
- this(s, false);
- }
+ public StompException()
+ {
+ this(null);
+ }
- public StompException(String s, boolean fatal) {
- this(s, fatal, null);
- }
+ public StompException(String s)
+ {
+ this(s, false);
+ }
- public StompException(String s, boolean fatal, Throwable cause) {
- super(s);
- this.fatal = fatal;
- initCause(cause);
- }
+ public StompException(String s, boolean fatal)
+ {
+ this(s, fatal, null);
+ }
- public boolean isFatal() {
- return fatal;
- }
+ public StompException(String s, boolean fatal, Throwable cause)
+ {
+ super(s);
+ this.fatal = fatal;
+ initCause(cause);
+ }
+
+ public boolean isFatal()
+ {
+ return fatal;
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-01-28
13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-01-28
13:27:55 UTC (rev 8855)
@@ -22,14 +22,17 @@
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-class StompFrameError extends StompFrame {
- private final StompException exception;
+class StompFrameError extends StompFrame
+{
+ private final StompException exception;
- public StompFrameError(StompException exception) {
- this.exception = exception;
- }
+ public StompFrameError(StompException exception)
+ {
+ this.exception = exception;
+ }
- public StompException getException() {
- return exception;
- }
+ public StompException getException()
+ {
+ return exception;
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-28
13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-28
13:27:55 UTC (rev 8855)
@@ -30,169 +30,206 @@
/**
* Implements marshalling and unmarsalling the <a
href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
-class StompMarshaller {
- public static final byte[] NO_DATA = new byte[]{};
- private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
- private static final int MAX_COMMAND_LENGTH = 1024;
- private static final int MAX_HEADER_LENGTH = 1024 * 10;
- private static final int MAX_HEADERS = 1000;
- private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
- private int version = 1;
+class StompMarshaller
+{
+ public static final byte[] NO_DATA = new byte[] {};
- public int getVersion() {
- return version;
- }
+ private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
- public void setVersion(int version) {
- this.version = version;
- }
+ private static final int MAX_COMMAND_LENGTH = 1024;
- public byte[] marshal(StompFrame command) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- marshal(command, dos);
- dos.close();
- return baos.toByteArray();
- }
+ private static final int MAX_HEADER_LENGTH = 1024 * 10;
- public void marshal(StompFrame stomp, DataOutput os) throws IOException {
- StringBuffer buffer = new StringBuffer();
- buffer.append(stomp.getCommand());
- buffer.append(Stomp.NEWLINE);
+ private static final int MAX_HEADERS = 1000;
- // Output the headers.
- for (Iterator<Map.Entry<String, Object>> iter =
stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
- Map.Entry<String, Object> entry = iter.next();
- buffer.append(entry.getKey());
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(entry.getValue());
- buffer.append(Stomp.NEWLINE);
- }
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
- // Add a newline to seperate the headers from the content.
- buffer.append(Stomp.NEWLINE);
+ private int version = 1;
- os.write(buffer.toString().getBytes("UTF-8"));
- os.write(stomp.getContent());
- os.write(END_OF_FRAME);
- }
+ public int getVersion()
+ {
+ return version;
+ }
- public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+ public void setVersion(int version)
+ {
+ this.version = version;
+ }
- try {
- String action = null;
+ public byte[] marshal(StompFrame command) throws IOException
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ marshal(command, dos);
+ dos.close();
+ return baos.toByteArray();
+ }
- // skip white space to next real action line
- while (true) {
- action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command
length was exceeded");
- if (action == null) {
- throw new IOException("connection was closed");
- }
- else {
- action = action.trim();
- if (action.length() > 0) {
- break;
- }
- }
+ public void marshal(StompFrame stomp, DataOutput os) throws IOException
+ {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(stomp.getCommand());
+ buffer.append(Stomp.NEWLINE);
+
+ // Output the headers.
+ for (Iterator<Map.Entry<String, Object>> iter =
stomp.getHeaders().entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry<String, Object> entry = iter.next();
+ buffer.append(entry.getKey());
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(entry.getValue());
+ buffer.append(Stomp.NEWLINE);
+ }
+
+ // Add a newline to seperate the headers from the content.
+ buffer.append(Stomp.NEWLINE);
+
+ os.write(buffer.toString().getBytes("UTF-8"));
+ os.write(stomp.getContent());
+ os.write(END_OF_FRAME);
+ }
+
+ public StompFrame unmarshal(HornetQBuffer in) throws IOException
+ {
+
+ try
+ {
+ String action = null;
+
+ // skip white space to next real action line
+ while (true)
+ {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length
was exceeded");
+ if (action == null)
+ {
+ throw new IOException("connection was closed");
}
+ else
+ {
+ action = action.trim();
+ if (action.length() > 0)
+ {
+ break;
+ }
+ }
+ }
- // Parse the headers
- HashMap<String, Object> headers = new HashMap<String,
Object>(25);
- while (true) {
- String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header
length was exceeded");
- if (line != null && line.trim().length() > 0) {
+ // Parse the headers
+ HashMap<String, Object> headers = new HashMap<String, Object>(25);
+ while (true)
+ {
+ String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length
was exceeded");
+ if (line != null && line.trim().length() > 0)
+ {
- if (headers.size() > MAX_HEADERS) {
- throw new StompException("The maximum number of headers was
exceeded", true);
- }
+ if (headers.size() > MAX_HEADERS)
+ {
+ throw new StompException("The maximum number of headers was
exceeded", true);
+ }
- try {
- int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1,
line.length()).trim();
- headers.put(name, value);
- }
- catch (Exception e) {
- throw new StompException("Unable to parser header line
[" + line + "]", true);
- }
- }
- else {
- break;
- }
+ try
+ {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1,
line.length()).trim();
+ headers.put(name, value);
+ }
+ catch (Exception e)
+ {
+ throw new StompException("Unable to parser header line [" +
line + "]", true);
+ }
}
+ else
+ {
+ break;
+ }
+ }
- // Read in the data part.
- byte[] data = NO_DATA;
- String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLength != null) {
+ // Read in the data part.
+ byte[] data = NO_DATA;
+ String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength != null)
+ {
- // Bless the client, he's telling us how much data to read in.
- int length;
- try {
- length = Integer.parseInt(contentLength.trim());
- }
- catch (NumberFormatException e) {
- throw new StompException("Specified content-length is not a
valid integer", true);
- }
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try
+ {
+ length = Integer.parseInt(contentLength.trim());
+ }
+ catch (NumberFormatException e)
+ {
+ throw new StompException("Specified content-length is not a valid
integer", true);
+ }
- if (length > MAX_DATA_LENGTH) {
- throw new StompException("The maximum data length was
exceeded", true);
- }
+ if (length > MAX_DATA_LENGTH)
+ {
+ throw new StompException("The maximum data length was exceeded",
true);
+ }
- data = new byte[length];
- in.readBytes(data);
+ data = new byte[length];
+ in.readBytes(data);
- if (in.readByte() != 0) {
- throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes
were read and " + "there was no trailing null byte", true);
- }
+ if (in.readByte() != 0)
+ {
+ throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were
read and " +
+ "there was no trailing null byte",
true);
}
- else {
+ }
+ else
+ {
- // We don't know how much to read.. data ends when we hit a 0
- byte b;
- ByteArrayOutputStream baos = null;
- while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+ // We don't know how much to read.. data ends when we hit a 0
+ byte b;
+ ByteArrayOutputStream baos = null;
+ while (in.readableBytes() > 0 && (b = in.readByte()) != 0)
+ {
- if (baos == null) {
- baos = new ByteArrayOutputStream();
- }
- else if (baos.size() > MAX_DATA_LENGTH) {
- throw new StompException("The maximum data length was
exceeded", true);
- }
+ if (baos == null)
+ {
+ baos = new ByteArrayOutputStream();
+ }
+ else if (baos.size() > MAX_DATA_LENGTH)
+ {
+ throw new StompException("The maximum data length was
exceeded", true);
+ }
- baos.write(b);
- }
+ baos.write(b);
+ }
- if (baos != null) {
- baos.close();
- data = baos.toByteArray();
- }
+ if (baos != null)
+ {
+ baos.close();
+ data = baos.toByteArray();
}
+ }
- return new StompFrame(action, headers, data);
- }
- catch (StompException e) {
- return new StompFrameError(e);
- }
- }
+ return new StompFrame(action, headers, data);
+ }
+ catch (StompException e)
+ {
+ return new StompFrameError(e);
+ }
+ }
- protected String readLine(HornetQBuffer in, int maxLength, String errorMessage)
throws IOException {
- char[] chars = new char[MAX_HEADER_LENGTH];
-
- int count = 0;
- while (in.readable())
- {
- byte b = in.readByte();
-
- if (b == (byte)'\n')
- {
- break;
- }
- else
- {
- chars[count++] = (char)b;
- }
- }
- return new String(chars, 0, count);
- }
+ protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws
IOException
+ {
+ char[] chars = new char[MAX_HEADER_LENGTH];
+
+ int count = 0;
+ while (in.readable())
+ {
+ byte b = in.readByte();
+
+ if (b == (byte)'\n')
+ {
+ break;
+ }
+ else
+ {
+ chars[count++] = (char)b;
+ }
+ }
+ return new String(chars, 0, count);
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-28
13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-28
13:27:55 UTC (rev 8855)
@@ -191,7 +191,7 @@
{
send(conn, response);
}
-
+
if (Stomp.Commands.DISCONNECT.equals(command))
{
conn.destroy();
@@ -285,7 +285,7 @@
private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
- String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+ String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
StompSession stompSession = null;
if (txID != null)
@@ -294,7 +294,7 @@
}
stompSession = getSession(connection);
stompSession.acknowledge(messageID);
-
+
return null;
}
@@ -537,5 +537,47 @@
}
}
+ public void cleanup(StompConnection connection)
+ {
+ connection.setValid(false);
+
+ StompSession session = sessions.remove(connection);
+ if (session != null)
+ {
+ try
+ {
+ session.getSession().rollback(true);
+ session.getSession().close();
+ session.getSession().runConnectionFailureRunners();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+
+ // removed the transacted session belonging to the connection
+ Iterator<Entry<String, StompSession>> iterator =
transactedSessions.entrySet().iterator();
+ while (iterator.hasNext())
+ {
+ Map.Entry<String, StompSession> entry = (Map.Entry<String,
StompSession>)iterator.next();
+ if (entry.getValue().getConnection() == connection)
+ {
+ ServerSession serverSession = entry.getValue().getSession();
+ try
+ {
+ serverSession.rollback(true);
+ serverSession.close();
+ serverSession.runConnectionFailureRunners();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ iterator.remove();
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-28 13:26:39
UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-28 13:27:55
UTC (rev 8855)
@@ -97,10 +97,10 @@
buffer.readBytes(data);
headers.put(Headers.CONTENT_LENGTH, data.length);
}
-
+
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
-
+
int length = manager.send(connection, frame);
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
@@ -150,14 +150,11 @@
SimpleString queue = SimpleString.toSimpleString(destination);
if (destination.startsWith(StompUtils.HQ_TOPIC_PREFIX))
{
- //subscribes to a topic
+ // subscribes to a topic
queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(SimpleString.toSimpleString(destination), queue, null, true,
false);
}
- session.createConsumer(consumerID,
- queue,
- SimpleString.toSimpleString(selector),
- false);
+ session.createConsumer(consumerID, queue, SimpleString.toSimpleString(selector),
false);
session.receiveConsumerCredits(consumerID, -1);
StompSubscription subscription = new StompSubscription(subscriptionID, destination,
ack);
subscriptions.put(consumerID, subscription);
@@ -185,7 +182,7 @@
}
boolean containsSubscription(String subscriptionID)
- {
+ {
Iterator<Entry<Long, StompSubscription>> iterator =
subscriptions.entrySet().iterator();
while (iterator.hasNext())
{
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-28
13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-28
13:27:55 UTC (rev 8855)
@@ -54,7 +54,7 @@
{
return destination;
}
-
+
public String getID()
{
return subID;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-28 13:26:39 UTC
(rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-28 13:27:55 UTC
(rev 8855)
@@ -124,11 +124,11 @@
public static void copyStandardHeadersFromFrameToMessage(StompFrame frame,
ServerMessageImpl msg) throws Exception
{
Map<String, Object> headers = new HashMap<String,
Object>(frame.getHeaders());
-
+
String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
if (priority != null)
{
- msg.setPriority(Byte.parseByte(priority));
+ msg.setPriority(Byte.parseByte(priority));
}
String persistent = (String)headers.remove(Stomp.Headers.Send.PERSISTENT);
if (persistent != null)
@@ -160,34 +160,39 @@
}
}
- public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame
command, int deliveryCount) throws Exception {
+ public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame
command, int deliveryCount) throws Exception
+ {
final Map<String, Object> headers = command.getHeaders();
headers.put(Stomp.Headers.Message.DESTINATION,
toStompDestination(message.getAddress().toString()));
headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
- if (message.getObjectProperty("JMSCorrelationID") != null) {
- headers.put(Stomp.Headers.Message.CORRELATION_ID,
message.getObjectProperty("JMSCorrelationID"));
+ if (message.getObjectProperty("JMSCorrelationID") != null)
+ {
+ headers.put(Stomp.Headers.Message.CORRELATION_ID,
message.getObjectProperty("JMSCorrelationID"));
}
headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" +
message.getExpiration());
headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
- if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null) {
- headers.put(Stomp.Headers.Message.REPLY_TO,
toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
+ if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null)
+ {
+ headers.put(Stomp.Headers.Message.REPLY_TO,
+
toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
}
headers.put(Stomp.Headers.Message.TIMESTAMP, "" +
message.getTimestamp());
- if (message.getObjectProperty("JMSType") != null) {
- headers.put(Stomp.Headers.Message.TYPE,
message.getObjectProperty("JMSType"));
+ if (message.getObjectProperty("JMSType") != null)
+ {
+ headers.put(Stomp.Headers.Message.TYPE,
message.getObjectProperty("JMSType"));
}
// now lets add all the message headers
Set<SimpleString> names = message.getPropertyNames();
for (SimpleString name : names)
{
- headers.put(name.toString(), message.getObjectProperty(name));
+ headers.put(name.toString(), message.getObjectProperty(name));
}
- }
+ }
// Constructors --------------------------------------------------
// Public --------------------------------------------------------