[hornetq-commits] JBoss hornetq SVN: r11362 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11 and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 19 00:53:40 EDT 2011


Author: gaohoward
Date: 2011-09-19 00:53:40 -0400 (Mon, 19 Sep 2011)
New Revision: 11362

Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test


Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-17 05:43:39 UTC (rev 11361)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-19 04:53:40 UTC (rev 11362)
@@ -86,8 +86,6 @@
          
          StompFrame frame = connection.createStompMessage(serverMessage, subscription, deliveryCount);
          
-         log.error("--------------lllll- Sending frame: " + frame);
-         
          if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
          {
             session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -155,7 +153,15 @@
             throw new HornetQStompException("subscription id " + subscriptionID + " does not match " + sub.getID());
          }
       }
-      session.acknowledge(consumerID, id);
+      
+      if (sub.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL))
+      {
+         session.individualAcknowledge(consumerID, id);
+      }
+      else
+      {
+         session.acknowledge(consumerID, id);
+      }
       session.commit();
    }
 

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-17 05:43:39 UTC (rev 11361)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-19 04:53:40 UTC (rev 11362)
@@ -761,7 +761,161 @@
       Message message = consumer.receive(1000);
       Assert.assertNotNull(message);      
    }
+   
+   public void testAckModeClient() throws Exception
+   {
+      connV11.connect(defUser, defPass);
 
+      subscribe(connV11, "sub1", "client");
+      //scenario: receive every frame on a "client"-mode subscription, then ACK only the last one
+      int num = 50;
+      //send a bunch of messages
+      for (int i = 0; i < num; i++)
+      {
+         this.sendMessage("client-ack" + i);
+      }
+      
+      ClientStompFrame frame = null;
+      //drain all frames; after the loop 'frame' holds the last one received
+      for (int i = 0; i < num; i++)
+      {
+         frame = connV11.receiveFrame();
+         assertNotNull(frame);
+      }
+      
+      //ack only the last frame received
+      this.ack(connV11, "sub1", frame);
+      
+      connV11.disconnect();
+      
+      //the queue is expected to be empty after that single ACK (presumably "client" mode acks cumulatively - confirm against STOMP 1.1)
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNull(message);
+   }
+   
+   public void testAckModeClient2() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "client");
+      //scenario: on a "client"-mode subscription, ACK only the second-to-last frame and leave the last one un-acked
+      int num = 50;
+      //send a bunch of messages
+      for (int i = 0; i < num; i++)
+      {
+         this.sendMessage("client-ack" + i);
+      }
+      
+      ClientStompFrame frame = null;
+      
+      for (int i = 0; i < num; i++)
+      {
+         frame = connV11.receiveFrame();
+         assertNotNull(frame);
+
+         //ack the 49th of the 50 frames (index num - 2); the final frame is never acked
+         if (i == num - 2)
+         {
+            this.ack(connV11, "sub1", frame);
+         }
+      }
+      
+      connV11.disconnect();
+      
+      //exactly one message is expected to remain on the queue, and nothing after it
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+      message = consumer.receive(1000);
+      Assert.assertNull(message);
+   }
+   
+   public void testAckModeAuto() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "auto");
+      //scenario: on an "auto"-mode subscription the client receives every frame and never sends an ACK
+      int num = 50;
+      //send a bunch of messages
+      for (int i = 0; i < num; i++)
+      {
+         this.sendMessage("auto-ack" + i);
+      }
+      
+      ClientStompFrame frame = null;
+      //drain all frames without acknowledging any of them
+      for (int i = 0; i < num; i++)
+      {
+         frame = connV11.receiveFrame();
+         assertNotNull(frame);
+      }
+      
+      connV11.disconnect();
+      
+      //the queue is expected to be empty even though no ACK frame was sent
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNull(message);
+   }
+   
+   public void testAckModeClientIndividual() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "client-individual");
+      //scenario: on a "client-individual" subscription, ACK every other frame and check the rest stay on the queue
+      int num = 50;
+      //send a bunch of messages
+      for (int i = 0; i < num; i++)
+      {
+         this.sendMessage("client-individual-ack" + i);
+      }
+      
+      ClientStompFrame frame = null;
+      
+      for (int i = 0; i < num; i++)
+      {
+         frame = connV11.receiveFrame();
+         assertNotNull(frame);
+         
+         //ack only the frames at even indices (0, 2, 4, ...), i.e. half of them
+         if (i%2 == 0)
+         {
+            this.ack(connV11, "sub1", frame);
+         }
+      }
+      
+      connV11.disconnect();
+      
+      //the num/2 frames that were not acked are expected to still be on the queue, and nothing beyond them
+      MessageConsumer consumer = session.createConsumer(queue);
+      
+      Message message = null;
+      for (int i = 0; i < num/2; i++)
+      {
+         message = consumer.receive(1000);
+         Assert.assertNotNull(message);
+      }
+      message = consumer.receive(1000);
+      Assert.assertNull(message);
+   }
+
+   //sends an ACK for 'frame' on subscription 'subId' over the connection passed in, asking the server for a receipt
+   private void ack(StompClientConnection connV112, String subId, ClientStompFrame frame) throws IOException, InterruptedException
+   {
+      String messageID = frame.getHeader("message-id");
+      //build and send on the connection argument, not the connV11 field, so the helper works for any connection
+      ClientStompFrame ackFrame = connV112.createFrame("ACK");
+      ackFrame.addHeader("subscription", subId);
+      ackFrame.addHeader("message-id", messageID);
+      //NOTE(review): a receipt is requested but this helper never reads it - confirm callers do not need to drain the RECEIPT frame
+      ackFrame.addHeader("receipt", "answer-me");
+      
+      connV112.sendFrame(ackFrame);
+   }
+
    private void ack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
    {
       ClientStompFrame ackFrame = conn.createFrame("ACK");



More information about the hornetq-commits mailing list