Author: gaohoward
Date: 2011-09-19 00:53:40 -0400 (Mon, 19 Sep 2011)
New Revision: 11362
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-17
05:43:39 UTC (rev 11361)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-19
04:53:40 UTC (rev 11362)
@@ -86,8 +86,6 @@
StompFrame frame = connection.createStompMessage(serverMessage, subscription,
deliveryCount);
- log.error("--------------lllll- Sending frame: " + frame);
-
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -155,7 +153,15 @@
throw new HornetQStompException("subscription id " + subscriptionID
+ " does not match " + sub.getID());
}
}
- session.acknowledge(consumerID, id);
+
+ if (sub.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL))
+ {
+ session.individualAcknowledge(consumerID, id);
+ }
+ else
+ {
+ session.acknowledge(consumerID, id);
+ }
session.commit();
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-17
05:43:39 UTC (rev 11361)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-19
04:53:40 UTC (rev 11362)
@@ -761,7 +761,161 @@
Message message = consumer.receive(1000);
Assert.assertNotNull(message);
}
+
+ public void testAckModeClient() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ subscribe(connV11, "sub1", "client");
+
+ int num = 50;
+ //send num messages so the "client" subscription has a backlog to deliver
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+ }
+
+ //ack only the last frame received; the test expects this to cover every earlier frame too
+ this.ack(connV11, "sub1", frame);
+
+ connV11.disconnect();
+
+ //a fresh consumer on the queue should find nothing left
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeClient2() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ int num = 50;
+ //send num messages so the "client" subscription has a backlog to deliver
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+
+ //ack the 49th frame (i == 48) only; the 50th is received but never acked
+ if (i == num - 2)
+ {
+ this.ack(connV11, "sub1", frame);
+ }
+ }
+
+ connV11.disconnect();
+
+ //exactly one message (the un-acked last one) should still be on the queue
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeAuto() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "auto");
+
+ int num = 50;
+ //send num messages so the "auto" subscription has a backlog to deliver
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("auto-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+ }
+
+ connV11.disconnect();
+
+ //no ACK frame was sent by this test, yet the queue is expected to be empty
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeClientIndividual() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client-individual");
+
+ int num = 50;
+ //send num messages so the "client-individual" subscription has a backlog to deliver
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-individual-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+
+ //ack only the even-indexed frames (0, 2, ..., 48); odd-indexed ones stay un-acked
+ if (i%2 == 0)
+ {
+ this.ack(connV11, "sub1", frame);
+ }
+ }
+
+ connV11.disconnect();
+
+ //the num/2 frames that were never acked should still be on the queue, and nothing more
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = null;
+ for (int i = 0; i < num/2; i++)
+ {
+ message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+ message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ private void ack(StompClientConnection connV112, String subId,
+ ClientStompFrame frame) throws IOException, InterruptedException
+ {
+ String messageID = frame.getHeader("message-id");
+
+ ClientStompFrame ackFrame = connV112.createFrame("ACK");
+ //ACK the received frame's message-id on subscription subId, over the connection passed in (not the connV11 field)
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", messageID);
+ ackFrame.addHeader("receipt", "answer-me");
+
+ connV112.sendFrame(ackFrame);
+ }
+
private void ack(StompClientConnection conn, String subId, String mid) throws
IOException, InterruptedException
{
ClientStompFrame ackFrame = conn.createFrame("ACK");