Author: gaohoward
Date: 2011-09-16 11:45:09 -0400 (Fri, 16 Sep 2011)
New Revision: 11356
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-16
14:39:57 UTC (rev 11355)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-16
15:45:09 UTC (rev 11356)
@@ -42,6 +42,7 @@
public HornetQStompException(String msg, Throwable t)
{
super(msg, t);
+ this.body = t.getMessage();
}
public HornetQStompException(Throwable t)
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-16
14:39:57 UTC (rev 11355)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-16
15:45:09 UTC (rev 11356)
@@ -140,7 +140,12 @@
public void acknowledge(String messageID, String subscriptionID) throws Exception
{
long id = Long.parseLong(messageID);
- long consumerID = messagesToAck.remove(id);
+ Long consumerID = messagesToAck.remove(id);
+
+ if (consumerID == null)
+ {
+ throw new HornetQStompException("failed to ack because no message with id:
" + id);
+ }
StompSubscription sub = subscriptions.get(consumerID);
if (subscriptionID != null)
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-16
14:39:57 UTC (rev 11355)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-09-16
15:45:09 UTC (rev 11356)
@@ -104,6 +104,13 @@
log.error("---------------postprocessed response: " + response);
}
+ else
+ {
+ if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ response.addHeader(Stomp.Headers.Response.RECEIPT_ID,
request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+ }
+ }
return response;
}
Modified:
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
---
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16
14:39:57 UTC (rev 11355)
+++
branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-09-16
15:45:09 UTC (rev 11356)
@@ -690,7 +690,78 @@
Assert.assertNotNull(message);
}
+ public void testErrorWithReceipt() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ //give it a wrong sub id
+ ackFrame.addHeader("subscription", "sub2");
+ ackFrame.addHeader("message-id", messageID);
+ ackFrame.addHeader("receipt", "answer-me");
+
+ ClientStompFrame error = connV11.sendFrame(ackFrame);
+
+ System.out.println("Receiver error: " + error);
+
+ assertEquals("ERROR", error.getCommand());
+
+ assertEquals("answer-me", error.getHeader("receipt-id"));
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testErrorWithReceipt2() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ //give it a wrong sub id
+ ackFrame.addHeader("subscription", "sub1");
+ ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) +
1));
+ ackFrame.addHeader("receipt", "answer-me");
+
+ ClientStompFrame error = connV11.sendFrame(ackFrame);
+
+ System.out.println("Receiver error: " + error);
+
+ assertEquals("ERROR", error.getCommand());
+
+ assertEquals("answer-me", error.getHeader("receipt-id"));
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
private void ack(StompClientConnection conn, String subId, String mid) throws
IOException, InterruptedException
{
ClientStompFrame ackFrame = conn.createFrame("ACK");
Show replies by date