Author: gaohoward
Date: 2011-09-21 11:32:44 -0400 (Wed, 21 Sep 2011)
New Revision: 11382
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-21
13:13:36 UTC (rev 11381)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-21
15:32:44 UTC (rev 11382)
@@ -27,6 +27,7 @@
import junit.framework.Assert;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
import org.hornetq.tests.integration.stomp.util.StompClientConnection;
import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -1081,6 +1082,109 @@
connV11.disconnect();
}
+ public void testJMSXGroupIdCanBeSet() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("JMSXGroupID", "TEST");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ // differ from StompConnect
+ Assert.assertEquals("TEST",
message.getStringProperty("JMSXGroupID"));
+ }
+
+ public void testMessagesAreInOrder() throws Exception
+ {
+ int ctr = 10;
+ String[] data = new String[ctr];
+
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = connV11.receiveFrame();
+ Assert.assertTrue("Message not in order",
frame.getBody().equals(data[i]));
+ }
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = connV11.receiveFrame();
+ Assert.assertTrue("Message not in order",
frame.getBody().equals(data[i]));
+ }
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithAutoAckAndSelector() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto", null, "foo =
'zzz'");
+
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue("Should have received the real message but got: " +
frame, frame.getBody().equals("Real message"));
+
+ connV11.disconnect();
+ }
+
+ public void testRedeliveryWithClientAck() throws Exception
+ {
+
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
+
+ sendFrame(frame);
+
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
+
+ //-----------------private help methods
+
private void abortTransaction(StompClientConnection conn, String txID) throws
IOException, InterruptedException
{
ClientStompFrame abortFrame = conn.createFrame("ABORT");
@@ -1138,23 +1242,30 @@
private void subscribe(StompClientConnection conn, String subId, String ack) throws
IOException, InterruptedException
{
- ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", subId);
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", ack);
-
- conn.sendFrame(subFrame);
+ subscribe(conn, subId, ack, null, null);
}
private void subscribe(StompClientConnection conn, String subId,
String ack, String durableId) throws IOException, InterruptedException
{
+ subscribe(conn, subId, ack, durableId, null);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId, String selector) throws IOException,
InterruptedException
+ {
ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", subId);
subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", ack);
- subFrame.addHeader("durable-subscriber-name", durableId);
-
+ if (durableId != null)
+ {
+ subFrame.addHeader("durable-subscriber-name", durableId);
+ }
+ if (selector != null)
+ {
+ subFrame.addHeader("selector", selector);
+ }
conn.sendFrame(subFrame);
}