Author: gaohoward
Date: 2011-11-21 23:12:00 -0500 (Mon, 21 Nov 2011)
New Revision: 11737
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
code refactor
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-22 02:33:35 UTC (rev 11736)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-22 04:12:00 UTC (rev 11737)
@@ -12,13 +12,14 @@
*/
package org.hornetq.core.protocol.stomp;
-import java.io.UnsupportedEncodingException;
-
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
/**
*
@@ -113,12 +114,7 @@
public abstract StompFrame onConnect(StompFrame frame);
public abstract StompFrame onDisconnect(StompFrame frame);
- public abstract StompFrame onSend(StompFrame frame);
public abstract StompFrame onAck(StompFrame request);
- public abstract StompFrame onBegin(StompFrame frame);
- public abstract StompFrame onCommit(StompFrame request);
- public abstract StompFrame onAbort(StompFrame request);
- public abstract StompFrame onSubscribe(StompFrame request);
public abstract StompFrame onUnsubscribe(StompFrame request);
public abstract StompFrame onStomp(StompFrame request);
public abstract StompFrame onNack(StompFrame request);
@@ -145,5 +141,141 @@
public abstract StompFrame createStompFrame(String command);
public abstract StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer)
throws HornetQStompException;
+
+ public StompFrame onCommit(StompFrame request)
+ {
+ StompFrame response = null;
+
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ response = new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
+ return response;
+ }
+ try
+ {
+ connection.commitTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ return response;
+ }
+
+ public StompFrame onSend(StompFrame frame)
+ {
+ StompFrame response = null;
+ try
+ {
+ connection.validate();
+ String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
+ String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+
+ long timestamp = System.currentTimeMillis();
+
+ ServerMessageImpl message = connection.createServerMessage();
+ message.setTimestamp(timestamp);
+ message.setAddress(SimpleString.toSimpleString(destination));
+ StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+ if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
+ {
+ message.setType(Message.BYTES_TYPE);
+ message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
+ }
+ else
+ {
+ message.setType(Message.TEXT_TYPE);
+ String text = frame.getBody();
+ message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
+ }
+
+ connection.sendServerMessage(message, txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ catch (Exception e)
+ {
+ response = new HornetQStompException("Error handling send", e).getFrame();
+ }
+
+ return response;
+ }
+
+ public StompFrame onBegin(StompFrame frame)
+ {
+ StompFrame response = null;
+ String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+ if (txID == null)
+ {
+ response = new HornetQStompException("Need a transaction id to begin").getFrame();
+ }
+ else
+ {
+ try
+ {
+ connection.beginTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+ }
+ return response;
+ }
+
+ public StompFrame onAbort(StompFrame request)
+ {
+ StompFrame response = null;
+ String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+ if (txID == null)
+ {
+ response = new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
+ return response;
+ }
+
+ try
+ {
+ connection.abortTransaction(txID);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
+ public StompFrame onSubscribe(StompFrame request)
+ {
+ StompFrame response = null;
+ String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+
+ String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+ String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+ String id = request.getHeader(Stomp.Headers.Subscribe.ID);
+ String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+ boolean noLocal = false;
+
+ if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
+ {
+ noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+ }
+
+ try
+ {
+ connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+ }
+ catch (HornetQStompException e)
+ {
+ response = e.getFrame();
+ }
+
+ return response;
+ }
+
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-22 02:33:35 UTC (rev 11736)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-22 04:12:00 UTC (rev 11737)
@@ -31,7 +31,6 @@
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
import org.hornetq.core.protocol.stomp.Stomp.Headers;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
/**
@@ -101,147 +100,6 @@
}
@Override
- public StompFrame onSend(StompFrame frame)
- {
- StompFrame response = null;
- try
- {
- connection.validate();
- String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
-
- long timestamp = System.currentTimeMillis();
-
- ServerMessageImpl message = connection.createServerMessage();
- message.setTimestamp(timestamp);
- message.setAddress(SimpleString.toSimpleString(destination));
- StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
- {
- message.setType(Message.BYTES_TYPE);
- message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
- }
- else
- {
- message.setType(Message.TEXT_TYPE);
- String text = frame.getBody();
- message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
- }
-
- connection.sendServerMessage(message, txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- catch (Exception e)
- {
- response = new HornetQStompException("Error handling send", e).getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onBegin(StompFrame frame)
- {
- StompFrame response = null;
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("Need a transaction id to begin").getFrame();
- }
- else
- {
- try
- {
- connection.beginTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- }
- return response;
- }
-
- @Override
- public StompFrame onCommit(StompFrame request)
- {
- StompFrame response = null;
-
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.commitTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- return response;
- }
-
- @Override
- public StompFrame onAbort(StompFrame request)
- {
- StompFrame response = null;
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
-
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.abortTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onSubscribe(StompFrame request)
- {
- StompFrame response = null;
- String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
-
- String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
- String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
- String id = request.getHeader(Stomp.Headers.Subscribe.ID);
- String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
- boolean noLocal = false;
-
- if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
- {
- noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
- }
-
- try
- {
- connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
public StompFrame onUnsubscribe(StompFrame request)
{
StompFrame response = null;
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-22 02:33:35 UTC (rev 11736)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-22 04:12:00 UTC (rev 11737)
@@ -17,7 +17,6 @@
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
@@ -33,7 +32,6 @@
import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
/**
@@ -193,147 +191,6 @@
}
@Override
- public StompFrame onSend(StompFrame frame)
- {
- StompFrame response = null;
- try
- {
- connection.validate();
- String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
-
- long timestamp = System.currentTimeMillis();
-
- ServerMessageImpl message = connection.createServerMessage();
- message.setTimestamp(timestamp);
- message.setAddress(SimpleString.toSimpleString(destination));
- StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
- {
- message.setType(Message.BYTES_TYPE);
- message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
- }
- else
- {
- message.setType(Message.TEXT_TYPE);
- String text = frame.getBody();
- message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
- }
-
- connection.sendServerMessage(message, txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- catch (Exception e)
- {
- response = new HornetQStompException("Error handling send", e).getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onBegin(StompFrame frame)
- {
- StompFrame response = null;
- String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("Need a transaction id to begin").getFrame();
- }
- else
- {
- try
- {
- connection.beginTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- }
- return response;
- }
-
- @Override
- public StompFrame onCommit(StompFrame request)
- {
- StompFrame response = null;
-
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.commitTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
- return response;
- }
-
- @Override
- public StompFrame onAbort(StompFrame request)
- {
- StompFrame response = null;
- String txID = request.getHeader(Stomp.Headers.TRANSACTION);
-
- if (txID == null)
- {
- response = new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
- return response;
- }
-
- try
- {
- connection.abortTransaction(txID);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
- public StompFrame onSubscribe(StompFrame request)
- {
- StompFrame response = null;
- String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
-
- String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
- String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
- String id = request.getHeader(Stomp.Headers.Subscribe.ID);
- String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
- boolean noLocal = false;
-
- if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
- {
- noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
- }
-
- try
- {
- connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
- }
- catch (HornetQStompException e)
- {
- response = e.getFrame();
- }
-
- return response;
- }
-
- @Override
public StompFrame onUnsubscribe(StompFrame request)
{
StompFrame response = null;