Author: gaohoward
Date: 2011-11-23 00:02:11 -0500 (Wed, 23 Nov 2011)
New Revision: 11744
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
refactoring
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-22
21:37:07 UTC (rev 11743)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-11-23
05:02:11 UTC (rev 11744)
@@ -16,10 +16,13 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.DataConstants;
/**
*
@@ -133,9 +136,6 @@
return receipt;
}
- public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount) throws Exception;
-
public abstract StompFrame createStompFrame(String command);
public abstract StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer)
throws HornetQStompException;
@@ -298,4 +298,56 @@
return response;
}
+ public StompFrame createMessageFrame(ServerMessage serverMessage,
+ StompSubscription subscription, int deliveryCount) throws Exception
+ {
+ StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
+
+ if (subscription.getID() != null)
+ {
+ frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION,
+ subscription.getID());
+ }
+
+ synchronized (serverMessage)
+ {
+
+ HornetQBuffer buffer = serverMessage.getBodyBuffer();
+
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer
+ .writerIndex() : serverMessage.getEndOfBodyPosition();
+ int size = bodyPos - buffer.readerIndex();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE
+ + DataConstants.SIZE_INT);
+ byte[] data = new byte[size];
+
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH)
+ || serverMessage.getType() == Message.BYTES_TYPE)
+ {
+ frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
+ buffer.readBytes(data);
+ }
+ else
+ {
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes("UTF-8");
+ }
+ else
+ {
+ data = new byte[0];
+ }
+ }
+ frame.setByteBody(data);
+
+ serverMessage.getBodyBuffer().resetReaderIndex();
+
+ StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
+ deliveryCount);
+ }
+
+ return frame;
+ }
+
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-22
21:37:07 UTC (rev 11743)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java 2011-11-23
05:02:11 UTC (rev 11744)
@@ -171,56 +171,6 @@
}
@Override
- public StompFrame createMessageFrame(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount) throws Exception
- {
- StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
-
- if (subscription.getID() != null)
- {
- frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
- }
-
- synchronized(serverMessage)
- {
-
- HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- :
serverMessage.getEndOfBodyPosition();
- int size = bodyPos - buffer.readerIndex();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- byte[] data = new byte[size];
-
- if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE)
- {
- frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
- buffer.readBytes(data);
- }
- else
- {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes("UTF-8");
- }
- else
- {
- data = new byte[0];
- }
- }
- frame.setByteBody(data);
-
- serverMessage.getBodyBuffer().resetReaderIndex();
-
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
- }
-
- return frame;
-
- }
-
- @Override
public StompFrame createStompFrame(String command)
{
return new StompFrameV10(command);
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-22
21:37:07 UTC (rev 11743)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-11-23
05:02:11 UTC (rev 11744)
@@ -247,53 +247,6 @@
}
@Override
- public StompFrame createMessageFrame(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount)
- throws Exception
- {
- StompFrame frame = new StompFrameV11(Stomp.Responses.MESSAGE);
-
- if (subscription.getID() != null)
- {
- frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
- }
-
- HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- :
serverMessage.getEndOfBodyPosition();
- int size = bodyPos - buffer.readerIndex();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- byte[] data = new byte[size];
- if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE)
- {
- frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
- buffer.readBytes(data);
- }
- else
- {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes("UTF-8");
- }
- else
- {
- data = new byte[0];
- }
- }
-
- frame.setByteBody(data);
-
- serverMessage.getBodyBuffer().resetReaderIndex();
-
- StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
-
- return frame;
-
- }
-
- @Override
public void replySent(StompFrame reply)
{
if (reply.getCommand().equals(Stomp.Responses.CONNECTED))