[hornetq-commits] JBoss hornetq SVN: r11356 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11 and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 16 11:45:09 EDT 2011


Author: gaohoward
Date: 2011-09-16 11:45:09 -0400 (Fri, 16 Sep 2011)
New Revision: 11356

Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests


Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java	2011-09-16 14:39:57 UTC (rev 11355)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java	2011-09-16 15:45:09 UTC (rev 11356)
@@ -42,6 +42,7 @@
    public HornetQStompException(String msg, Throwable t) // builds the exception from a message plus an underlying cause
    {
       super(msg, t);
+      this.body = t.getMessage(); // body is taken from the cause's message; NOTE(review): a null t makes this line throw NullPointerException - confirm callers never pass null
    }
    
    public HornetQStompException(Throwable t)

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-16 14:39:57 UTC (rev 11355)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-16 15:45:09 UTC (rev 11356)
@@ -140,7 +140,12 @@
    public void acknowledge(String messageID, String subscriptionID) throws Exception
    {
       long id = Long.parseLong(messageID);
-      long consumerID = messagesToAck.remove(id);
+      Long consumerID = messagesToAck.remove(id);
+      
+      if (consumerID == null)
+      {
+         throw new HornetQStompException("failed to ack because no message with id: " + id);
+      }
       StompSubscription sub = subscriptions.get(consumerID);
 
       if (subscriptionID != null)

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-09-16 14:39:57 UTC (rev 11355)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-09-16 15:45:09 UTC (rev 11356)
@@ -104,6 +104,13 @@
          
          log.error("---------------postprocessed response: " + response);
       }
+      else
+      {
+         if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+         {
+            response.addHeader(Stomp.Headers.Response.RECEIPT_ID, request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+         }
+      }
       
       return response;
    }

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-16 14:39:57 UTC (rev 11355)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-16 15:45:09 UTC (rev 11356)
@@ -690,7 +690,78 @@
       Assert.assertNotNull(message);
    }
    
+   public void testErrorWithReceipt() throws Exception
+   {
+      connV11.connect(defUser, defPass);
 
+      subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      String messageID = frame.getHeader("message-id");
+      
+      System.out.println("Received message with id " + messageID);
+      
+      ClientStompFrame ackFrame = connV11.createFrame("ACK");
+      //ack with a subscription id ("sub2") that differs from the one subscribed above ("sub1"), and request a receipt
+      ackFrame.addHeader("subscription", "sub2");
+      ackFrame.addHeader("message-id", messageID);
+      ackFrame.addHeader("receipt", "answer-me");
+      //the frame returned for this ACK is expected to be an ERROR that carries the requested receipt id
+      ClientStompFrame error = connV11.sendFrame(ackFrame);
+      
+      System.out.println("Receiver error: " + error);
+      
+      assertEquals("ERROR", error.getCommand());
+      
+      assertEquals("answer-me", error.getHeader("receipt-id"));
+      
+      connV11.disconnect();
+
+      //the message should still be there: a consumer on the queue must receive it within one second
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);      
+   }
+   
+   public void testErrorWithReceipt2() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      String messageID = frame.getHeader("message-id");
+      
+      System.out.println("Received message with id " + messageID);
+      
+      ClientStompFrame ackFrame = connV11.createFrame("ACK");
+      //keep the correct subscription id ("sub1") but give it a wrong message id (received id + 1), and request a receipt
+      ackFrame.addHeader("subscription", "sub1");
+      ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1));
+      ackFrame.addHeader("receipt", "answer-me");
+      //the frame returned for this ACK is expected to be an ERROR that carries the requested receipt id
+      ClientStompFrame error = connV11.sendFrame(ackFrame);
+      
+      System.out.println("Receiver error: " + error);
+      
+      assertEquals("ERROR", error.getCommand());
+      
+      assertEquals("answer-me", error.getHeader("receipt-id"));
+      
+      connV11.disconnect();
+
+      //the message should still be there: a consumer on the queue must receive it within one second
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);      
+   }
+
    private void ack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
    {
       ClientStompFrame ackFrame = conn.createFrame("ACK");



More information about the hornetq-commits mailing list