Author: jmesnil
Date: 2010-01-26 08:31:50 -0500 (Tue, 26 Jan 2010)
New Revision: 8844
Added:
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
Removed:
trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added missing Stomp commands and responses
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-22
15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-26
13:31:50 UTC (rev 8844)
@@ -33,15 +33,23 @@
*
*
*/
-public class StompConnection implements RemotingConnection
+class StompConnection implements RemotingConnection
{
private static final Logger log = Logger.getLogger(StompConnection.class);
- private final ProtocolManager manager;
+ private final StompProtocolManager manager;
private final Connection transportConnection;
- StompConnection(final Connection transportConnection, final ProtocolManager manager)
+ private String login;
+
+ private String passcode;
+
+ private String clientID;
+
+ private boolean valid;
+
+ StompConnection(final Connection transportConnection, final StompProtocolManager
manager)
{
this.transportConnection = transportConnection;
@@ -68,6 +76,8 @@
public void destroy()
{
+ manager.cleanup(this);
+ transportConnection.close();
}
public void disconnect()
@@ -84,7 +94,7 @@
public List<FailureListener> getFailureListeners()
{
- return Collections.EMPTY_LIST;
+ return Collections.emptyList();
}
public Object getID()
@@ -132,9 +142,38 @@
manager.handleBuffer(this, buffer);
}
- public int isReadyToHandle(HornetQBuffer buffer)
+ public void setLogin(String login)
{
- return -1;
+ this.login = login;
}
+ public String getLogin()
+ {
+ return login;
+ }
+
+ public void setPasscode(String passcode)
+ {
+ this.passcode = passcode;
+ }
+
+ public String getPasscode()
+ {
+ return passcode;
+ }
+
+ public void setClientID(String clientID)
+ {
+ this.clientID = clientID;
+ }
+
+ public boolean isValid()
+ {
+ return valid;
+ }
+
+ public void setValid(boolean valid)
+ {
+ this.valid = valid;
+ }
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-01-22 15:32:13 UTC
(rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-01-26 13:31:50 UTC
(rev 8844)
@@ -65,6 +65,6 @@
@Override
public String toString()
{
- return "StompFrame[command=" + command + ", headers=" + headers
+ ",content-length=" + content.length + "]";
+ return "StompFrame[command=" + command + ", headers=" + headers
+ ", content-length=" + content.length + "]";
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-22
15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-26
13:31:50 UTC (rev 8844)
@@ -31,7 +31,7 @@
* Implements marshalling and unmarsalling the <a
href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
class StompMarshaller {
- private static final byte[] NO_DATA = new byte[]{};
+ public static final byte[] NO_DATA = new byte[]{};
private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
private static final int MAX_COMMAND_LENGTH = 1024;
private static final int MAX_HEADER_LENGTH = 1024 * 10;
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-22
15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-26
13:31:50 UTC (rev 8844)
@@ -31,7 +31,6 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -43,28 +42,64 @@
/**
* StompProtocolManager
*
- * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
- *
- * @author Tim Fox
- *
- *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
-public class StompProtocolManager implements ProtocolManager
+class StompProtocolManager implements ProtocolManager
{
+ // Constants -----------------------------------------------------
+
private static final Logger log = Logger.getLogger(StompProtocolManager.class);
+ // Attributes ----------------------------------------------------
+
private final HornetQServer server;
private final StompMarshaller marshaller;
- private final Map<RemotingConnection, ServerSession> sessions = new
HashMap<RemotingConnection, ServerSession>();
+ private final Map<String, StompSession> transactedSessions = new
HashMap<String, StompSession>();
+ private final Map<RemotingConnection, StompSession> sessions = new
HashMap<RemotingConnection, StompSession>();
+
+ // Static --------------------------------------------------------
+
+   /**
+    * Builds the ERROR frame sent back to the Stomp client when handling a request failed.
+    * <p>
+    * The frame body is the stack trace of the exception (encoded in UTF-8) and its
+    * message header is the exception message. If the failed request asked for a
+    * receipt, the receipt ID is copied to the ERROR frame.
+    *
+    * @param e the exception raised while handling the request
+    * @param request the request being handled, or <code>null</code> if the failure
+    *        occurred before the request frame could be unmarshalled
+    * @return the ERROR frame, or <code>null</code> if it could not be created
+    */
+   private static StompFrame createError(Exception e, StompFrame request)
+   {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      try
+      {
+         // Let the stomp client know about any protocol errors.
+         PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+         e.printStackTrace(stream);
+         stream.close();
+
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+         // handleBuffer() passes a null request when unmarshalling the buffer failed:
+         // there is no receipt header to look up in that case.
+         if (request != null)
+         {
+            final String receiptId = (String)request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+            if (receiptId != null)
+            {
+               headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+            }
+         }
+
+         return new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+      }
+      catch (UnsupportedEncodingException ex)
+      {
+         log.warn("Unable to create ERROR frame from the exception", ex);
+         return null;
+      }
+   }
+
+ // Constructors --------------------------------------------------
+
public StompProtocolManager(final HornetQServer server, final List<Interceptor>
interceptors)
{
this.server = server;
this.marshaller = new StompMarshaller();
}
+ // ProtocolManager implementation --------------------------------
+
public ConnectionEntry createConnectionEntry(final Connection connection)
{
StompConnection conn = new StompConnection(connection, this);
@@ -76,160 +111,293 @@
{
}
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
{
- StompFrame frame = null;
+ StompConnection conn = (StompConnection)connection;
+ StompFrame request = null;
try
{
- frame = marshaller.unmarshal(buffer);
- System.out.println("RECEIVED " + frame);
+ request = marshaller.unmarshal(buffer);
+ System.out.println("<<< " + request);
- String command = frame.getCommand();
+ String command = request.getCommand();
StompFrame response = null;
if (Stomp.Commands.CONNECT.equals(command))
{
- response = onConnect(frame, server, connection);
+ response = onConnect(request, server, conn);
}
else if (Stomp.Commands.DISCONNECT.equals(command))
{
- response = onDisconnect(frame, server, connection);
+ response = onDisconnect(request, server, conn);
}
else if (Stomp.Commands.SEND.equals(command))
{
- response = onSend(frame, server, connection);
+ response = onSend(request, server, conn);
}
else if (Stomp.Commands.SUBSCRIBE.equals(command))
{
- response = onSubscribe(frame, server, connection);
+ response = onSubscribe(request, server, conn);
}
+ else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
+ {
+ response = onUnsubscribe(request, server, conn);
+ }
+ else if (Stomp.Commands.ACK.equals(command))
+ {
+ response = onAck(request, server, conn);
+ }
+ else if (Stomp.Commands.BEGIN.equals(command))
+ {
+ response = onBegin(request, server, conn);
+ }
+ else if (Stomp.Commands.COMMIT.equals(command))
+ {
+ response = onCommit(request, server, conn);
+ }
+ else if (Stomp.Commands.ABORT.equals(command))
+ {
+ response = onAbort(request, server, conn);
+ }
else
{
- log.error("Unsupported Stomp frame: " + frame);
+
+ log.error("Unsupported Stomp frame: " + request);
response = new StompFrame(Stomp.Responses.ERROR,
new HashMap<String, Object>(),
("Unsupported frame: " +
command).getBytes());
}
+ if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ if (response == null)
+ {
+ Map<String, Object> h = new HashMap<String, Object>();
+ response = new StompFrame(Stomp.Responses.RECEIPT, h,
StompMarshaller.NO_DATA);
+ }
+ response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+ }
+
if (response != null)
{
send(connection, response);
}
}
- catch (StompException ex)
+ catch (Exception e)
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try
+ StompFrame error = createError(e, request);
+ if (error != null)
{
- // Let the stomp client know about any protocol errors.
- PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,
"UTF-8"));
- ex.printStackTrace(stream);
- stream.close();
+ send(connection, error);
}
- catch (UnsupportedEncodingException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ }
+ }
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Error.MESSAGE, ex.getMessage());
+ // Public --------------------------------------------------------
- final String receiptId =
(String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
- if (receiptId != null)
- {
- headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
- }
+ // Package protected ---------------------------------------------
- StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers,
baos.toByteArray());
- try
- {
- send(connection, errorMessage);
- }
- catch (IOException e)
- {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception,
+
StompException,
+
HornetQException
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
+ String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
+ String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
+ String subID =
(String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
+
+ if (ack == null)
+ {
+ ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
}
- catch (Exception ex)
+ StompSession stompSession = getSession(connection);
+ long consumerID = server.getStorageManager().generateUniqueID();
+ stompSession.addSubscription(consumerID, subID, destination, selector, ack);
+
+ return null;
+ }
+
+   /**
+    * Handles an UNSUBSCRIBE frame: the subscriptions of the connection's session
+    * whose destination equals the frame's destination header are closed.
+    * <p>
+    * NOTE(review): the frame's id header is not used. A frame identifying the
+    * subscription only by its id (without a destination header) does not appear to
+    * unsubscribe anything - confirm whether unsubscribing by id must be supported.
+    *
+    * @param frame the UNSUBSCRIBE frame
+    * @param server the HornetQ server (not used by this handler)
+    * @param connection the connection which sent the frame
+    * @return always <code>null</code>: this handler creates no response frame
+    */
+   private StompFrame onUnsubscribe(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+                                                                                                        StompException,
+                                                                                                        HornetQException
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
+
+      StompSession stompSession = getSession(connection);
+      stompSession.unsubscribe(destination);
+
+      return null;
+   }
+
+   /**
+    * Handles an ACK frame: the message identified by the frame's message ID header
+    * is acknowledged on the connection's session.
+    * <p>
+    * NOTE(review): the frame's transaction header is not used. The acknowledgement
+    * goes through {@link StompSession#acknowledge(String)}, which commits it on the
+    * connection's session straight away - confirm how ACK frames sent inside a Stomp
+    * transaction are expected to behave.
+    *
+    * @param frame the ACK frame
+    * @param server the HornetQ server (not used by this handler)
+    * @param connection the connection which sent the frame
+    * @return always <code>null</code>: this handler creates no response frame
+    */
+   private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception,
+                                                                                                StompException,
+                                                                                                HornetQException
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+
+      StompSession stompSession = getSession(connection);
+      stompSession.acknowledge(messageID);
+
+      return null;
+   }
+
+ private StompFrame onBegin(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+
+ if (transactedSessions.containsKey(txID))
{
- ex.printStackTrace();
+ throw new StompException("Transaction already started: " + txID);
}
+ StompSession stompSession = getTransactedSession(connection, txID);
+ return null;
}
- private StompFrame onSubscribe(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws Exception,
-
StompException,
-
HornetQException
+ private StompFrame onCommit(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception,
+
StompException,
+
HornetQException
{
Map<String, Object> headers = frame.getHeaders();
- String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
- SimpleString queueName =
SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+ String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
- ServerSession session = checkAndGetSession(connection);
- long consumerID = server.getStorageManager().generateUniqueID();
- session.createConsumer(consumerID, queueName, null, false);
- session.receiveConsumerCredits(consumerID, -1);
- session.start();
+ StompSession session = transactedSessions.remove(txID);
+ if (session == null)
+ {
+ throw new StompException("No transaction started: " + txID);
+ }
+
+ session.getSession().commit();
+
return null;
}
- private ServerSession checkAndGetSession(RemotingConnection connection) throws
StompException
+ private StompFrame onAbort(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception,
+
StompException,
+
HornetQException
{
- ServerSession session = sessions.get(connection);
+ Map<String, Object> headers = frame.getHeaders();
+ String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
+
+ StompSession session = transactedSessions.remove(txID);
+
if (session == null)
{
+ throw new StompException("No transaction started: " + txID);
+ }
+ session.getSession().rollback(false);
+
+ return null;
+ }
+
+ private void checkConnected(StompConnection connection) throws StompException
+ {
+ if (!connection.isValid())
+ {
throw new StompException("Not connected");
}
- return session;
}
- private StompFrame onDisconnect(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws StompException
+ private StompSession getSession(StompConnection connection) throws Exception
{
- ServerSession session = checkAndGetSession(connection);
+ StompSession stompSession = sessions.get(connection);
+ if (stompSession == null)
+ {
+ stompSession = new StompSession(marshaller, connection);
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ ServerSession session = server.createSession(name,
+ connection.getLogin(),
+ connection.getPasscode(),
+
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ connection,
+ true,
+ false,
+ false,
+ false,
+ stompSession);
+ stompSession.setServerSession(session);
+ sessions.put(connection, stompSession);
+ }
+ return stompSession;
+ }
+
+ private StompSession getTransactedSession(StompConnection connection, String txID)
throws Exception
+ {
+ StompSession stompSession = transactedSessions.get(txID);
+ if (stompSession == null)
+ {
+ stompSession = new StompSession(marshaller, connection);
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ ServerSession session = server.createSession(name,
+ connection.getLogin(),
+ connection.getPasscode(),
+
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ connection,
+ false,
+ false,
+ false,
+ false,
+ stompSession);
+ stompSession.setServerSession(session);
+ transactedSessions.put(txID, stompSession);
+ }
+ return stompSession;
+ }
+
+ private StompFrame onDisconnect(StompFrame frame, HornetQServer server,
StompConnection connection) throws Exception
+ {
+ StompSession session = getSession(connection);
if (session != null)
{
try
{
- session.close();
+ session.getSession().rollback(true);
+ session.getSession().close();
}
catch (Exception e)
{
throw new StompException(e.getMessage());
}
sessions.remove(connection);
+ connection.setValid(false);
}
return null;
}
- private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection
connection) throws Exception
+ private StompFrame onSend(StompFrame frame, HornetQServer server, StompConnection
connection) throws Exception
{
- ServerSession session = checkAndGetSession(connection);
-
+ checkConnected(connection);
Map<String, Object> headers = frame.getHeaders();
- String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
- /*
- String type = (String)headers.get(Stomp.Headers.Send.TYPE);
- long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
- byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
- boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
- */
+ String queue = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
+ String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
byte type = Message.TEXT_TYPE;
if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
{
type = Message.BYTES_TYPE;
}
long timestamp = System.currentTimeMillis();
- boolean durable = false;
- long expiration = -1;
- byte priority = 9;
SimpleString address =
SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
ServerMessageImpl message = new
ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
message.setType(type);
message.setTimestamp(timestamp);
message.setAddress(address);
+ StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
byte[] content = frame.getContent();
if (type == Message.TEXT_TYPE)
{
@@ -240,56 +408,75 @@
message.getBodyBuffer().writeBytes(content);
}
- session.send(message);
- if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+ ServerSession session = null;
+ if (txID == null)
{
- Map<String, Object> h = new HashMap<String, Object>();
- h.put(Stomp.Headers.Response.RECEIPT_ID,
headers.get(Stomp.Headers.RECEIPT_REQUESTED));
- return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
- }
- else
+ session = getSession(connection).getSession();
+ } else
{
- return null;
+ session = transactedSessions.get(txID).getSession();
}
+
+ session.send(message);
+ return null;
}
- private StompFrame onConnect(StompFrame frame, HornetQServer server, final
RemotingConnection connection) throws Exception
+ private StompFrame onConnect(StompFrame frame, HornetQServer server, final
StompConnection connection) throws Exception
{
Map<String, Object> headers = frame.getHeaders();
String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+ String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
- String name = UUIDGenerator.getInstance().generateStringUUID();
- ServerSession session = server.createSession(name,
- login,
- passcode,
-
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- connection,
- true,
- true,
- false,
- false,
- new StompSessionCallback(marshaller,
connection));
- sessions.put(connection, session);
- System.out.println(">>> created session " + session);
+ server.getSecurityManager().validateUser(login, passcode);
+
+ connection.setLogin(login);
+ connection.setPasscode(passcode);
+ connection.setClientID(clientID);
+ connection.setValid(true);
+
HashMap<String, Object> h = new HashMap<String, Object>();
- h.put(Stomp.Headers.Connected.SESSION, name);
- h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
- return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+ h.put(Stomp.Headers.Connected.SESSION, connection.getID());
+ if (requestID != null)
+ {
+ h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ }
+ return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
}
- private void send(RemotingConnection connection, StompFrame frame) throws IOException
+ private void send(RemotingConnection connection, StompFrame frame)
{
- System.out.println("SENDING >>> " + frame);
- byte[] bytes = marshaller.marshal(frame);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- System.out.println("ready to send reply: " + buffer);
- connection.getTransportConnection().write(buffer, true);
+ System.out.println(">>> " + frame);
+
+ try
+ {
+ byte[] bytes = marshaller.marshal(frame);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ connection.getTransportConnection().write(buffer, true);
+ }
+ catch (IOException e)
+ {
+ log.error("Unable to send frame " + frame, e);
+ }
}
- public int isReadyToHandle(HornetQBuffer buffer)
+ public synchronized void cleanup(StompConnection conn)
{
- return -1;
+ StompSession session = sessions.remove(conn);
+ if (session != null)
+ {
+ try
+ {
+ session.getSession().rollback(true);
+ session.getSession().close();
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ }
+ }
}
+
+ // Inner classes -------------------------------------------------
}
Copied: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java (from rev 8843,
trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java)
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-26 13:31:50
UTC (rev 8844)
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * A StompSession associates a HornetQ {@link ServerSession} with a Stomp connection.
+ * <p>
+ * It is passed to the server as the {@link SessionCallback} of the session it wraps:
+ * messages delivered to the session's consumers are marshalled into Stomp MESSAGE
+ * frames and written to the connection. It also keeps track of the subscriptions
+ * created through it and of the delivered messages which still have to be
+ * acknowledged by the client.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+class StompSession implements SessionCallback
+{
+   private final RemotingConnection connection;
+
+   private final StompMarshaller marshaller;
+
+   // set after construction with setServerSession(): the server session is created
+   // with this object as its callback, so it can not be passed to the constructor
+   private ServerSession session;
+
+   // key = consumer ID, value = subscription
+   private final Map<Long, StompSubscription> subscriptions = new HashMap<Long, StompSubscription>();
+
+   // key = message ID, value = consumer ID
+   private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
+
+   StompSession(final StompMarshaller marshaller, final RemotingConnection connection)
+   {
+      this.marshaller = marshaller;
+      this.connection = connection;
+   }
+
+   void setServerSession(ServerSession session)
+   {
+      this.session = session;
+   }
+
+   public ServerSession getSession()
+   {
+      return session;
+   }
+
+   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+   {
+   }
+
+   /**
+    * Sends a message delivered to one of this session's consumers to the Stomp client
+    * as a MESSAGE frame.
+    * <p>
+    * If the consumer's subscription uses the auto ack mode, the message is
+    * acknowledged and committed as soon as the frame is written. Otherwise, the
+    * message is remembered until the client acknowledges it.
+    *
+    * @param serverMessage the message to send
+    * @param consumerID the ID of the consumer the message is delivered to
+    * @param deliveryCount the delivery count, passed on when copying the message headers to the frame
+    * @return the size in bytes of the marshalled frame, or 0 if sending failed
+    */
+   public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+   {
+      try
+      {
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Message.DESTINATION,
+                     StompUtils.toStompDestination(serverMessage.getAddress().toString()));
+         byte[] data = new byte[] {};
+         if (serverMessage.getType() == Message.TEXT_TYPE)
+         {
+            SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+            if (text != null)
+            {
+               // NOTE(review): uses the platform default charset - confirm it matches
+               // the charset used when the frame content is read on the send side
+               data = text.toString().getBytes();
+            }
+         }
+         else
+         {
+            // any other message type: the body is sent as raw bytes with a content-length header
+            HornetQBuffer buffer = serverMessage.getBodyBuffer();
+            buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE);
+            int size = serverMessage.getEndOfBodyPosition() - buffer.readerIndex();
+            data = new byte[size];
+            buffer.readBytes(data);
+            headers.put(Headers.CONTENT_LENGTH, data.length);
+         }
+         StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+         StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+         System.out.println(">>> " + frame);
+         byte[] bytes = marshaller.marshal(frame);
+         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+         connection.getTransportConnection().write(buffer, true);
+
+         // NOTE(review): if no subscription is registered for the consumer (e.g. it was
+         // removed by unsubscribe() in the meantime), the line below fails with a
+         // NullPointerException after the frame has already been written - confirm the
+         // expected behavior
+         StompSubscription subscription = subscriptions.get(consumerID);
+
+         if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
+         {
+            session.acknowledge(consumerID, serverMessage.getMessageID());
+            session.commit();
+         }
+         else
+         {
+            messagesToAck.put(serverMessage.getMessageID(), consumerID);
+         }
+         return bytes.length;
+
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return 0;
+      }
+
+   }
+
+   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      return 0;
+   }
+
+   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+   {
+      return 0;
+   }
+
+   public void closed()
+   {
+   }
+
+   /**
+    * Acknowledges a message previously sent to the client and commits the session.
+    *
+    * @param messageID the ID of the message to acknowledge, as a decimal string
+    * @throws NumberFormatException if <code>messageID</code> is not a valid number
+    * @throws StompException if no message with this ID is waiting to be acknowledged
+    * @throws Exception if the server session fails to acknowledge or commit
+    */
+   public void acknowledge(String messageID) throws Exception
+   {
+      long id = Long.parseLong(messageID);
+      // remove() returns null when the ID is unknown (or was already acknowledged):
+      // check the result before unboxing it
+      Long consumerID = messagesToAck.remove(id);
+      if (consumerID == null)
+      {
+         throw new StompException("No pending message to acknowledge with ID: " + messageID);
+      }
+      session.acknowledge(consumerID, id);
+      session.commit();
+   }
+
+   /**
+    * Creates a consumer on the server session and registers the corresponding
+    * subscription.
+    *
+    * @param consumerID the ID of the consumer to create
+    * @param clientID the value stored as the subscription's ID
+    * @param destination the Stomp destination to subscribe to
+    * @param selector the filter of the consumer, may be <code>null</code>
+    * @param ack the ack mode of the subscription
+    * @throws Exception if the consumer can not be created or started
+    */
+   public void addSubscription(long consumerID, String clientID, String destination, String selector, String ack) throws Exception
+   {
+      String queue = StompUtils.toHornetQAddress(destination);
+      synchronized (session)
+      {
+         session.createConsumer(consumerID,
+                                SimpleString.toSimpleString(queue),
+                                SimpleString.toSimpleString(selector),
+                                false);
+         session.receiveConsumerCredits(consumerID, -1);
+         StompSubscription subscription = new StompSubscription(consumerID, clientID, destination, ack);
+         subscriptions.put(consumerID, subscription);
+         // FIXME not very smart: since we can't start the consumer, we start the session
+         // every time to start the new consumer (and all previous consumers...)
+         session.start();
+      }
+   }
+
+   /**
+    * Closes the consumers of all the subscriptions registered for the given destination.
+    *
+    * @param destination the Stomp destination to unsubscribe from
+    * @throws Exception if a consumer can not be closed
+    */
+   public void unsubscribe(String destination) throws Exception
+   {
+      Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
+      while (iterator.hasNext())
+      {
+         Entry<Long, StompSubscription> entry = iterator.next();
+         long consumerID = entry.getKey();
+         StompSubscription sub = entry.getValue();
+         if (sub.getDestination().equals(destination))
+         {
+            // remove through the iterator: the map is being iterated over
+            iterator.remove();
+            session.closeConsumer(consumerID);
+         }
+      }
+   }
+}
\ No newline at end of file
Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java 2010-01-22
15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java 2010-01-26
13:31:50 UTC (rev 8844)
@@ -1,93 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.spi.core.protocol.SessionCallback;
-
-/**
- * A StompSessionCallback
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- */
-class StompSessionCallback implements SessionCallback
-{
- private final RemotingConnection connection;
-
- private final StompMarshaller marshaller;
-
- StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection
connection)
- {
- this.marshaller = marshaller;
- this.connection = connection;
- }
-
- public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
- {
- }
-
- public int sendMessage(ServerMessage serverMessage, long consumerID, int
deliveryCount)
- {
- try
- {
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Message.DESTINATION,
StompUtils.toStompDestination(serverMessage.getAddress()
-
.toString()));
- byte[] data = new byte[] {};
- if (serverMessage.getType() == Message.TEXT_TYPE)
- {
- SimpleString text =
serverMessage.getBodyBuffer().readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes();
- }
- }
- StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
- System.out.println("SENDING : " + msg);
- byte[] bytes = marshaller.marshal(msg);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- connection.getTransportConnection().write(buffer, true);
-
- return bytes.length;
-
- }
- catch (Exception e)
- {
- e.printStackTrace();
- return 0;
- }
-
- }
-
- public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean
continues, boolean requiresResponse)
- {
- return 0;
- }
-
- public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int
deliveryCount)
- {
- return 0;
- }
-
- public void closed()
- {
- }
-}
\ No newline at end of file
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-01-26
13:31:50 UTC (rev 8844)
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * A StompSubscription
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompSubscription
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final long consumerID;
+
+ private final String subID;
+
+ private final String destination;
+
+ private final String ack;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public StompSubscription(long consumerID, String subID, String destination, String ack)
+ {
+ this.consumerID = consumerID;
+ this.subID = subID;
+ this.destination = destination;
+ this.ack = ack;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getAck()
+ {
+ return ack;
+ }
+
+ public String getDestination()
+ {
+ return destination;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -13,7 +13,16 @@
package org.hornetq.core.protocol.stomp;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
/**
* A StompUtils
@@ -71,9 +80,9 @@
}
else
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
- "] -- StompConnect destinations " +
- "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ // it is also possible the STOMP client send a message directly to a HornetQ address
+ // in that case, we do nothing:
+ return stompDestination;
}
}
@@ -101,17 +110,85 @@
}
else
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
- "] -- Acceptable address must comply to JMS semantics");
+ // do nothing
+ return hornetqAddress;
}
}
-
- private static String convert(String str, String oldPrefix, String newPrefix)
+
+ private static String convert(String str, String oldPrefix, String newPrefix)
{
String sub = str.substring(oldPrefix.length(), str.length());
return new String(newPrefix + sub);
}
+ public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception
+ {
+ Map<String, Object> headers = new HashMap<String, Object>(frame.getHeaders());
+
+ String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
+ if (priority != null)
+ {
+ msg.setPriority(Byte.parseByte(priority));
+ }
+ String persistent = (String)headers.remove(Stomp.Headers.Send.PERSISTENT);
+ if (persistent != null)
+ {
+ msg.setDurable(Boolean.parseBoolean(persistent));
+ }
+ // FIXME should use a proper constant
+ msg.putObjectProperty("JMSCorrelationID", headers.remove(Stomp.Headers.Send.CORRELATION_ID));
+ msg.putObjectProperty("JMSType", headers.remove(Stomp.Headers.Send.TYPE));
+
+ String groupID = (String)headers.remove("JMSXGroupID");
+ if (groupID != null)
+ {
+ msg.putStringProperty(Message.HDR_GROUP_ID, SimpleString.toSimpleString(groupID));
+ }
+ Object o = headers.remove(Stomp.Headers.Send.REPLY_TO);
+ if (o != null)
+ {
+ msg.putStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME, SimpleString.toSimpleString((String)o));
+ }
+
+ // now the general headers
+ for (Iterator<Map.Entry<String, Object>> iter = headers.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry<String, Object> entry = iter.next();
+ String name = (String)entry.getKey();
+ Object value = entry.getValue();
+ System.out.println(name + "=" + value);
+ msg.putObjectProperty(name, value);
+ }
+ }
+
+ public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception {
+ final Map<String, Object> headers = command.getHeaders();
+ headers.put(Stomp.Headers.Message.DESTINATION, toStompDestination(message.getAddress().toString()));
+ headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
+
+ if (message.getObjectProperty("JMSCorrelationID") != null) {
+ headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID"));
+ }
+ headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
+ headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
+ headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
+
+ if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null) {
+ headers.put(Stomp.Headers.Message.REPLY_TO, toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
+ }
+ headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
+
+ if (message.getObjectProperty("JMSType") != null) {
+ headers.put(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType"));
+ }
+
+ // now lets add all the message headers
+ Set<SimpleString> names = message.getPropertyNames();
+ for (SimpleString name : names)
+ {
+ headers.put(name.toString(), message.getObjectProperty(name));
+ }
+ }
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -40,7 +40,7 @@
Object getConnectionID();
- void removeConsumer(ServerConsumer consumer) throws Exception;
+ void removeConsumer(long consumerID) throws Exception;
void acknowledge(long consumerID, long messageID) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -284,7 +284,7 @@
messageQueue.removeConsumer(this);
}
- session.removeConsumer(this);
+ session.removeConsumer(id);
LinkedList<MessageReference> refs = cancelRefs(false, null);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -228,11 +228,11 @@
return remotingConnection.getID();
}
- public void removeConsumer(final ServerConsumer consumer) throws Exception
+ public void removeConsumer(final long consumerID) throws Exception
{
- if (consumers.remove(consumer.getID()) == null)
+ if (consumers.remove(consumerID) == null)
{
- throw new IllegalStateException("Cannot find consumer with id " + consumer.getID() + " to remove");
+ throw new IllegalStateException("Cannot find consumer with id " + consumerID + " to remove");
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-22 15:32:13 UTC (rev 8843)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-26 13:31:50 UTC (rev 8844)
@@ -31,6 +31,7 @@
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -217,7 +218,7 @@
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
- public void _testJMSXGroupIdCanBeSet() throws Exception {
+ public void testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -242,11 +243,11 @@
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
- // TODO do we support it?
- //Assert.assertEquals("TEST", ((TextMessage) message).getGroupID());
+ // differ from StompConnect
+ Assert.assertEquals("TEST", ((TextMessage) message).getStringProperty("JMSXGroupID"));
}
- public void _testSendMessageWithCustomHeadersAndSelector() throws Exception {
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
@@ -277,7 +278,7 @@
Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
}
- public void _testSendMessageWithStandardHeaders() throws Exception {
+ public void testSendMessageWithStandardHeaders() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -294,6 +295,7 @@
frame =
"SEND\n" +
"correlation-id:c123\n" +
+ "persistent:true\n" +
"priority:3\n" +
"type:t345\n" +
"JMSXGroupID:abc\n" +
@@ -311,6 +313,7 @@
Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
Assert.assertEquals("getJMSType", "t345", message.getJMSType());
Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
@@ -350,9 +353,15 @@
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+
+ // message should not be received as it was auto-acked
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNull(message);
+
}
- public void _testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
String frame =
"CONNECT\n" +
@@ -371,7 +380,8 @@
Stomp.NULL;
sendFrame(frame);
- sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
+ byte[] payload = new byte[]{1, 2, 3, 4, 5};
+ sendBytesMessage(payload);
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
@@ -382,7 +392,8 @@
Assert.assertEquals("5", cl_matcher.group(1));
Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
-
+ Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
+
frame =
"DISCONNECT\n" +
"\n\n" +
@@ -390,7 +401,7 @@
sendFrame(frame);
}
- public void _testSubscribeWithMessageSentWithProperties() throws Exception {
+ public void testSubscribeWithMessageSentWithProperties() throws Exception {
String frame =
"CONNECT\n" +
@@ -422,6 +433,8 @@
producer.send(message);
frame = receiveFrame(10000);
+ Assert.assertNotNull(frame);
+ System.out.println(frame);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("S:") > 0);
Assert.assertTrue(frame.indexOf("n:") > 0);
@@ -442,7 +455,7 @@
sendFrame(frame);
}
- public void _testMessagesAreInOrder() throws Exception {
+ public void testMessagesAreInOrder() throws Exception {
int ctr = 10;
String[] data = new String[ctr];
@@ -493,7 +506,7 @@
sendFrame(frame);
}
- public void _testSubscribeWithAutoAckAndSelector() throws Exception {
+ public void testSubscribeWithAutoAckAndSelector() throws Exception {
String frame =
"CONNECT\n" +
@@ -527,8 +540,54 @@
sendFrame(frame);
}
- public void _testSubscribeWithClientAck() throws Exception {
+ public void testSubscribeWithClientAck() throws Exception {
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Pattern cl = Pattern.compile("message-id:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ String messageID = cl_matcher.group(1);
+
+ frame =
+ "ACK\n" +
+ "message-id: " + messageID + "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // message should not be received since message was acknowledged by the client
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testRedeliveryWithClientAck() throws Exception {
+
String frame =
"CONNECT\n" +
"login: brianm\n" +
@@ -546,6 +605,7 @@
Stomp.NULL;
sendFrame(frame);
+
sendMessage(getName());
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
@@ -563,11 +623,11 @@
Assert.assertTrue(message.getJMSRedelivered());
}
- public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
}
- public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
}
@@ -663,7 +723,7 @@
Assert.assertTrue(frame.contains("shouldBeNextMessage"));
}
- public void _testUnsubscribe() throws Exception {
+ public void testUnsubscribe() throws Exception {
String frame =
"CONNECT\n" +
@@ -685,7 +745,7 @@
sendMessage("first message");
//receive message from socket
- frame = receiveFrame(1000);
+ frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
//remove suscription
@@ -711,7 +771,7 @@
}
}
- public void _testTransactionCommit() throws Exception {
+ public void testTransactionCommit() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame =
@@ -757,7 +817,7 @@
Assert.assertNotNull("Should have received a message", message);
}
- public void _testTransactionRollback() throws Exception {
+ public void testTransactionRollback() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame =