[hornetq-commits] JBoss hornetq SVN: r11406 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11 and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sat Sep 24 10:24:33 EDT 2011
Author: gaohoward
Date: 2011-09-24 10:24:32 -0400 (Sat, 24 Sep 2011)
New Revision: 11406
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -601,6 +601,7 @@
selector += " AND " + noLocalFilter;
}
}
+
if (ack == null)
{
ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -119,9 +119,11 @@
Set<SimpleString> names = message.getPropertyNames();
for (SimpleString name : names)
{
+ String value = name.toString();
if (name.equals(ClientMessageImpl.REPLYTO_HEADER_NAME) ||
- name.toString().equals("JMSType") ||
- name.toString().equals("JMSCorrelationID"))
+ value.equals("JMSType") ||
+ value.equals("JMSCorrelationID") ||
+ value.equals(Stomp.Headers.Message.DESTINATION))
{
continue;
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -432,7 +432,7 @@
byte[] data = new byte[size];
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
{
- frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length > 0 ? (data.length - 1) : data.length));
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
buffer.readBytes(data);
}
else
@@ -1036,14 +1036,12 @@
}
else
{
- content = new byte[decoder.contentLength + 1];
+ content = new byte[decoder.contentLength];
System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0, decoder.contentLength);
decoder.pos += decoder.contentLength + 1;
- content[decoder.contentLength] = 0;
-
//drain all the rest
if (decoder.bodyStart == -1)
{
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -192,7 +192,6 @@
public void testSendMessageWithReceipt() throws Exception
{
-
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
@@ -407,6 +406,9 @@
sendMessage(payload, queue);
frame = receiveFrame(10000);
+
+ System.out.println("Message: " + frame);
+
Assert.assertTrue(frame.startsWith("MESSAGE"));
Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -149,6 +149,11 @@
{
return frameQueue.poll(10, TimeUnit.SECONDS);
}
+
+ public ClientStompFrame receiveFrame(long timeout) throws InterruptedException
+ {
+ return frameQueue.poll(timeout, TimeUnit.MILLISECONDS);
+ }
//put bytes to byte array.
private void receiveBytes(int n) throws UnsupportedEncodingException
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -26,6 +26,8 @@
ClientStompFrame receiveFrame() throws InterruptedException;
+ ClientStompFrame receiveFrame(long timeout) throws InterruptedException;;
+
void connect() throws Exception;
void disconnect() throws IOException, InterruptedException;
Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-24 14:24:32 UTC (rev 11406)
@@ -23,15 +23,16 @@
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -936,7 +937,46 @@
Assert.assertNull(message);
}
+
+ public void testTwoSubscribers() throws Exception
+ {
+ connV11.connect(defUser, defPass, "myclientid");
+ this.subscribeTopic(connV11, "sub1", "auto", null);
+
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass, "myclientid2");
+
+ this.subscribeTopic(newConn, "sub2", "auto", null);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ // receive message from socket
+ frame = connV11.receiveFrame(1000);
+
+ System.out.println("received frame : " + frame);
+ assertEquals("Hello World", frame.getBody());
+ assertEquals("sub1", frame.getHeader("subscription"));
+
+ frame = newConn.receiveFrame(1000);
+
+ System.out.println("received 2 frame : " + frame);
+ assertEquals("Hello World", frame.getBody());
+ assertEquals("sub2", frame.getHeader("subscription"));
+
+ // remove subscription
+ this.unsubscribe(connV11, "sub1", true);
+ this.unsubscribe(newConn, "sub2", true);
+
+ connV11.disconnect();
+ newConn.disconnect();
+ }
+
//tests below are adapted from StompTest
public void testBeginSameTransactionTwice() throws Exception
{
@@ -1255,8 +1295,8 @@
BytesMessage message = (BytesMessage)consumer.receive(10000);
Assert.assertNotNull(message);
- //there is one extra null byte
- assertEquals(data.length + 1, message.getBodyLength());
+
+ assertEquals(data.length, message.getBodyLength());
assertEquals(data[0], message.readByte());
assertEquals(data[1], message.readByte());
assertEquals(data[2], message.readByte());
@@ -1307,10 +1347,394 @@
long tnow = System.currentTimeMillis();
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+
+ assertNull(consumer.receive(1000));
+
+ connV11.disconnect();
}
+ public void testSendMessageWithReceipt() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("receipt", "1234");
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ assertTrue(frame.getCommand().equals("RECEIPT"));
+ assertEquals("1234", frame.getHeader("receipt-id"));
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+
+ connV11.disconnect();
+ }
+
+ public void testSendMessageWithStandardHeaders() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("correlation-id", "c123");
+ frame.addHeader("persistent", "true");
+ frame.addHeader("priority", "3");
+ frame.addHeader("type", "t345");
+ frame.addHeader("JMSXGroupID", "abc");
+ frame.addHeader("foo", "abc");
+ frame.addHeader("bar", "123");
+
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+
+ Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeToTopic() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribeTopic(connV11, "sub1", null, null, true);
+
+ sendMessage(getName(), topic);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+
+ this.unsubscribe(connV11, "sub1", true);
+
+ sendMessage(getName(), topic);
+
+ frame = connV11.receiveFrame(1000);
+ assertNull(frame);
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeToTopicWithNoLocal() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribeTopic(connV11, "sub1", null, null, true, true);
+
+ // send a message on the same connection => it should not be received
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ frame = connV11.receiveFrame(2000);
+
+ assertNull(frame);
+
+ // send message on another JMS connection => it should be received
+ sendMessage(getName(), topic);
+
+ frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+
+ this.unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithAutoAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertEquals("MESSAGE", frame.getCommand());
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ connV11.disconnect();
+
+ // message should not be received as it was auto-acked
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
+ sendMessage(payload, queue);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ System.out.println("Message: " + frame);
+
+ assertEquals("5", frame.getHeader("content-length"));
+
+ assertEquals(null, frame.getHeader("type"));
+
+ assertEquals(frame.getBody(), new String(payload, "UTF-8"));
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithClientAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ this.ack(connV11, "sub1", frame);
+
+ connV11.disconnect();
+
+ // message should not be received since message was acknowledged by the client
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
+
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
+
+ public void testSubscribeWithID() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "mysubid", "auto");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getHeader("subscription") != null);
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithMessageSentWithProperties() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty("S", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte)9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float)6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ message.setShortProperty("s", (short)12);
+ message.writeBytes("Hello World".getBytes("UTF-8"));
+ producer.send(message);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ Assert.assertNotNull(frame);
+
+ Assert.assertTrue(frame.getHeader("S") != null);
+ Assert.assertTrue(frame.getHeader("n") != null);
+ Assert.assertTrue(frame.getHeader("byte") != null);
+ Assert.assertTrue(frame.getHeader("d") != null);
+ Assert.assertTrue(frame.getHeader("f") != null);
+ Assert.assertTrue(frame.getHeader("i") != null);
+ Assert.assertTrue(frame.getHeader("l") != null);
+ Assert.assertTrue(frame.getHeader("s") != null);
+ Assert.assertEquals("Hello World", frame.getBody());
+
+ connV11.disconnect();
+ }
+
+ public void testSuccessiveTransactionsWithSameID() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ // first tx
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1");
+
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ // 2nd tx with same tx ID
+ this.beginTransaction(connV11, "tx1");
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1");
+
+ message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ connV11.disconnect();
+ }
+
+ public void testTransactionCommit() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+ frame.addHeader("receipt", "123");
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ assertEquals("123", frame.getHeader("receipt-id"));
+
+ // check the message is not committed
+ assertNull(consumer.receive(100));
+
+ this.commitTransaction(connV11, "tx1", true);
+
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ connV11.disconnect();
+ }
+
+ public void testTransactionRollback() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("first message");
+
+ connV11.sendFrame(frame);
+
+ // rollback first message
+ this.abortTransaction(connV11, "tx1");
+
+ this.beginTransaction(connV11, "tx1");
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("second message");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1", true);
+
+ // only second msg should be received since first msg was rolled back
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", message.getText());
+
+ connV11.disconnect();
+ }
+
+ public void testUnsubscribe() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ // send a message to our queue
+ sendMessage("first message");
+
+ // receive message from socket
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ // remove subscription
+ this.unsubscribe(connV11, "sub1", true);
+
+ // send a message to our queue
+ sendMessage("second message");
+
+ frame = connV11.receiveFrame(1000);
+ assertNull(frame);
+
+ connV11.disconnect();
+ }
+
//-----------------private help methods
-
+
private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
{
ClientStompFrame abortFrame = conn.createFrame("ABORT");
@@ -1326,7 +1750,27 @@
conn.sendFrame(beginFrame);
}
-
+
+ private void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+ {
+ commitTransaction(conn, txID, false);
+ }
+
+ private void commitTransaction(StompClientConnection conn, String txID, boolean receipt) throws IOException, InterruptedException
+ {
+ ClientStompFrame beginFrame = conn.createFrame("COMMIT");
+ beginFrame.addHeader("transaction", txID);
+ if (receipt)
+ {
+ beginFrame.addHeader("receipt", "1234");
+ }
+ ClientStompFrame resp = conn.sendFrame(beginFrame);
+ if (receipt)
+ {
+ assertEquals("1234", resp.getHeader("receipt-id"));
+ }
+ }
+
private void ack(StompClientConnection conn, String subId,
ClientStompFrame frame) throws IOException, InterruptedException
{
@@ -1376,14 +1820,30 @@
{
subscribe(conn, subId, ack, durableId, null);
}
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, null, receipt);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId, String ack,
+ String durableId, String selector) throws IOException,
+ InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, selector, false);
+ }
private void subscribe(StompClientConnection conn, String subId,
- String ack, String durableId, String selector) throws IOException, InterruptedException
+ String ack, String durableId, String selector, boolean receipt) throws IOException, InterruptedException
{
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", subId);
subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", ack);
+ if (ack != null)
+ {
+ subFrame.addHeader("ack", ack);
+ }
if (durableId != null)
{
subFrame.addHeader("durable-subscriber-name", durableId);
@@ -1392,19 +1852,60 @@
{
subFrame.addHeader("selector", selector);
}
- conn.sendFrame(subFrame);
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "1234");
+ }
+
+ subFrame = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ assertEquals("1234", subFrame.getHeader("receipt-id"));
+ }
}
private void subscribeTopic(StompClientConnection conn, String subId,
String ack, String durableId) throws IOException, InterruptedException
{
+ subscribeTopic(conn, subId, ack, durableId, false);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+ {
+ subscribeTopic(conn, subId, ack, durableId, receipt, false);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt, boolean noLocal) throws IOException, InterruptedException
+ {
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", subId);
subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
- subFrame.addHeader("ack", ack);
- subFrame.addHeader("durable-subscriber-name", durableId);
+ if (ack != null)
+ {
+ subFrame.addHeader("ack", ack);
+ }
+ if (durableId != null)
+ {
+ subFrame.addHeader("durable-subscriber-name", durableId);
+ }
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "1234");
+ }
+ if (noLocal)
+ {
+ subFrame.addHeader("no-local", "true");
+ }
- conn.sendFrame(subFrame);
+ ClientStompFrame frame = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ assertTrue(frame.getHeader("receipt-id").equals("1234"));
+ }
}
private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException
@@ -1414,7 +1915,77 @@
conn.sendFrame(subFrame);
}
+
+ private void unsubscribe(StompClientConnection conn, String subId,
+ boolean receipt) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
+ subFrame.addHeader("id", subId);
+
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "4321");
+ }
+
+ ClientStompFrame f = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ System.out.println("response: " + f);
+ assertEquals("RECEIPT", f.getCommand());
+ assertEquals("4321", f.getHeader("receipt-id"));
+ }
+ }
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertEquals("MESSAGE", frame.getCommand());
+
+ log.info("Reconnecting!");
+
+ if (sendDisconnect)
+ {
+ connV11.disconnect();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+ else
+ {
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+
+ // message should be received since message was not acknowledged
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", null);
+
+ frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ connV11.disconnect();
+
+ // now lets make sure we don't see the message again
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", null, null, true);
+
+ sendMessage("shouldBeNextMessage");
+
+ frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertEquals("shouldBeNextMessage", frame.getBody());
+ }
+
}
More information about the hornetq-commits
mailing list