[hornetq-commits] JBoss hornetq SVN: r11406 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11 and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Sep 24 10:24:33 EDT 2011


Author: gaohoward
Date: 2011-09-24 10:24:32 -0400 (Sat, 24 Sep 2011)
New Revision: 11406

Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests


Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-24 14:24:32 UTC (rev 11406)
@@ -601,6 +601,7 @@
             selector += " AND " + noLocalFilter;
          }
       }
+
       if (ack == null)
       {
          ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java	2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java	2011-09-24 14:24:32 UTC (rev 11406)
@@ -119,9 +119,11 @@
       Set<SimpleString> names = message.getPropertyNames();
       for (SimpleString name : names)
       {
+         String value = name.toString();
          if (name.equals(ClientMessageImpl.REPLYTO_HEADER_NAME) ||
-                  name.toString().equals("JMSType") ||
-                  name.toString().equals("JMSCorrelationID"))
+                  value.equals("JMSType") ||
+                  value.equals("JMSCorrelationID") ||
+                  value.equals(Stomp.Headers.Message.DESTINATION))
          {
             continue;
          }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-24 14:24:32 UTC (rev 11406)
@@ -432,7 +432,7 @@
       byte[] data = new byte[size];
       if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
       {
-         frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length > 0 ? (data.length - 1) : data.length));
+         frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
          buffer.readBytes(data);
       }
       else
@@ -1036,14 +1036,12 @@
          }
          else
          {
-            content = new byte[decoder.contentLength + 1];
+            content = new byte[decoder.contentLength];
 
             System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0, decoder.contentLength);
 
             decoder.pos += decoder.contentLength + 1;
             
-            content[decoder.contentLength] = 0;
-            
             //drain all the rest
             if (decoder.bodyStart == -1)
             {

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java	2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompTest.java	2011-09-24 14:24:32 UTC (rev 11406)
@@ -192,7 +192,6 @@
 
    public void testSendMessageWithReceipt() throws Exception
    {
-
       MessageConsumer consumer = session.createConsumer(queue);
 
       String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
@@ -407,6 +406,9 @@
       sendMessage(payload, queue);
 
       frame = receiveFrame(10000);
+      
+      System.out.println("Message: " + frame);
+      
       Assert.assertTrue(frame.startsWith("MESSAGE"));
 
       Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-09-24 14:24:32 UTC (rev 11406)
@@ -149,6 +149,11 @@
    {
       return frameQueue.poll(10, TimeUnit.SECONDS);
    }
+
+   public ClientStompFrame receiveFrame(long timeout) throws InterruptedException
+   {
+      return frameQueue.poll(timeout, TimeUnit.MILLISECONDS);
+   }
    
    //put bytes to byte array.
    private void receiveBytes(int n) throws UnsupportedEncodingException

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-09-24 14:24:32 UTC (rev 11406)
@@ -26,6 +26,8 @@
    
    ClientStompFrame receiveFrame() throws InterruptedException;
 
+   ClientStompFrame receiveFrame(long timeout) throws InterruptedException;;
+
    void connect() throws Exception;
 
    void disconnect() throws IOException, InterruptedException;

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-24 06:22:26 UTC (rev 11405)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-24 14:24:32 UTC (rev 11406)
@@ -23,15 +23,16 @@
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.TextMessage;
 
 import junit.framework.Assert;
 
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.stomp.Stomp;
 import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
 import org.hornetq.tests.integration.stomp.util.StompClientConnection;
 import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -936,7 +937,46 @@
       
       Assert.assertNull(message);
    }
+   
+   public void testTwoSubscribers() throws Exception
+   {
+      connV11.connect(defUser, defPass, "myclientid");
 
+      this.subscribeTopic(connV11, "sub1", "auto", null);
+      
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      newConn.connect(defUser, defPass, "myclientid2");
+      
+      this.subscribeTopic(newConn, "sub2", "auto", null);
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getTopicPrefix() + getTopicName());
+      
+      frame.setBody("Hello World");
+      
+      connV11.sendFrame(frame);
+      
+      // receive message from socket
+      frame = connV11.receiveFrame(1000);
+      
+      System.out.println("received frame : " + frame);
+      assertEquals("Hello World", frame.getBody());
+      assertEquals("sub1", frame.getHeader("subscription"));
+      
+      frame = newConn.receiveFrame(1000);
+      
+      System.out.println("received 2 frame : " + frame);
+      assertEquals("Hello World", frame.getBody());
+      assertEquals("sub2", frame.getHeader("subscription"));
+      
+      // remove subscription
+      this.unsubscribe(connV11, "sub1", true);
+      this.unsubscribe(newConn, "sub2", true);
+      
+      connV11.disconnect();
+      newConn.disconnect();
+   }
+
    //tests below are adapted from StompTest
    public void testBeginSameTransactionTwice() throws Exception
    {
@@ -1255,8 +1295,8 @@
       
       BytesMessage message = (BytesMessage)consumer.receive(10000);
       Assert.assertNotNull(message);
-      //there is one extra null byte
-      assertEquals(data.length + 1, message.getBodyLength());
+
+      assertEquals(data.length, message.getBodyLength());
       assertEquals(data[0], message.readByte());
       assertEquals(data[1], message.readByte());
       assertEquals(data[2], message.readByte());
@@ -1307,10 +1347,394 @@
       long tnow = System.currentTimeMillis();
       long tmsg = message.getJMSTimestamp();
       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+      
+      assertNull(consumer.receive(1000));
+      
+      connV11.disconnect();
    }
 
+   public void testSendMessageWithReceipt() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("receipt", "1234");
+      frame.setBody("Hello World");
+
+      frame = connV11.sendFrame(frame);
+      
+      assertTrue(frame.getCommand().equals("RECEIPT"));
+      assertEquals("1234", frame.getHeader("receipt-id"));
+
+      TextMessage message = (TextMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+
+      // Make sure that the timestamp is valid - should
+      // be very close to the current time.
+      long tnow = System.currentTimeMillis();
+      long tmsg = message.getJMSTimestamp();
+      Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+      
+      connV11.disconnect();
+   }
+
+   public void testSendMessageWithStandardHeaders() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("correlation-id", "c123");
+      frame.addHeader("persistent", "true");
+      frame.addHeader("priority", "3");
+      frame.addHeader("type", "t345");
+      frame.addHeader("JMSXGroupID", "abc");
+      frame.addHeader("foo", "abc");
+      frame.addHeader("bar", "123");
+
+      frame.setBody("Hello World");
+
+      frame = connV11.sendFrame(frame);
+
+      TextMessage message = (TextMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+      Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+      Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+      Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+      Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+      Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+      Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+
+      Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+      
+      connV11.disconnect();
+   }
+
+   public void testSubscribeToTopic() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      
+      this.subscribeTopic(connV11, "sub1", null, null, true);
+
+      sendMessage(getName(), topic);
+      
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+      Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
+      Assert.assertTrue(frame.getBody().equals(getName()));
+
+      this.unsubscribe(connV11, "sub1", true);
+
+      sendMessage(getName(), topic);
+
+      frame = connV11.receiveFrame(1000);
+      assertNull(frame);
+
+      connV11.disconnect();
+   }
+
+   public void testSubscribeToTopicWithNoLocal() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribeTopic(connV11, "sub1", null, null, true, true);
+
+      // send a message on the same connection => it should not be received
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+      frame.setBody("Hello World");
+
+      connV11.sendFrame(frame);
+
+      frame = connV11.receiveFrame(2000);
+      
+      assertNull(frame);
+
+      // send message on another JMS connection => it should be received
+      sendMessage(getName(), topic);
+
+      frame = connV11.receiveFrame();
+
+      Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+      Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
+      Assert.assertTrue(frame.getBody().equals(getName()));
+      
+      this.unsubscribe(connV11, "sub1");
+      
+      connV11.disconnect();
+   }
+
+   public void testSubscribeWithAutoAck() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "sub1", "auto");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      Assert.assertEquals("MESSAGE", frame.getCommand());
+      Assert.assertNotNull(frame.getHeader("destination"));
+      Assert.assertEquals(getName(), frame.getBody());
+
+      connV11.disconnect();
+      
+      // message should not be received as it was auto-acked
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNull(message);
+   }
+
+   public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      
+      this.subscribe(connV11, "sub1", "auto");
+
+      byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
+      sendMessage(payload, queue);
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      assertEquals("MESSAGE", frame.getCommand());
+      
+      System.out.println("Message: " + frame);
+
+      assertEquals("5", frame.getHeader("content-length"));
+
+      assertEquals(null, frame.getHeader("type"));
+      
+      assertEquals(frame.getBody(), new String(payload, "UTF-8"));
+      
+      connV11.disconnect();
+   }
+
+   public void testSubscribeWithClientAck() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      
+      this.subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      this.ack(connV11, "sub1", frame);
+
+      connV11.disconnect();
+
+      // message should not be received since message was acknowledged by the client
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNull(message);
+   }
+
+   public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception
+   {
+      assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+   }
+
+   public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception
+   {
+      assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+   }
+
+   public void testSubscribeWithID() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      
+      this.subscribe(connV11, "mysubid", "auto");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      Assert.assertTrue(frame.getHeader("subscription") != null);
+
+      connV11.disconnect();
+   }
+
+   public void testSubscribeWithMessageSentWithProperties() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      
+      this.subscribe(connV11, "sub1", "auto");
+
+      MessageProducer producer = session.createProducer(queue);
+      BytesMessage message = session.createBytesMessage();
+      message.setStringProperty("S", "value");
+      message.setBooleanProperty("n", false);
+      message.setByteProperty("byte", (byte)9);
+      message.setDoubleProperty("d", 2.0);
+      message.setFloatProperty("f", (float)6.0);
+      message.setIntProperty("i", 10);
+      message.setLongProperty("l", 121);
+      message.setShortProperty("s", (short)12);
+      message.writeBytes("Hello World".getBytes("UTF-8"));
+      producer.send(message);
+
+      ClientStompFrame frame = connV11.receiveFrame();
+      Assert.assertNotNull(frame);
+      
+      Assert.assertTrue(frame.getHeader("S") != null);
+      Assert.assertTrue(frame.getHeader("n") != null);
+      Assert.assertTrue(frame.getHeader("byte") != null);
+      Assert.assertTrue(frame.getHeader("d") != null);
+      Assert.assertTrue(frame.getHeader("f") != null);
+      Assert.assertTrue(frame.getHeader("i") != null);
+      Assert.assertTrue(frame.getHeader("l") != null);
+      Assert.assertTrue(frame.getHeader("s") != null);
+      Assert.assertEquals("Hello World", frame.getBody());
+
+      connV11.disconnect();
+   }
+
+   public void testSuccessiveTransactionsWithSameID() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      // first tx
+      this.beginTransaction(connV11, "tx1");
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("transaction", "tx1");
+      frame.setBody("Hello World");
+
+      connV11.sendFrame(frame);
+
+      this.commitTransaction(connV11, "tx1");
+
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull("Should have received a message", message);
+
+      // 2nd tx with same tx ID
+      this.beginTransaction(connV11, "tx1");
+
+      frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("transaction", "tx1");
+      
+      frame.setBody("Hello World");
+      
+      connV11.sendFrame(frame);
+
+      this.commitTransaction(connV11, "tx1");
+
+      message = consumer.receive(1000);
+      Assert.assertNotNull("Should have received a message", message);
+      
+      connV11.disconnect();
+   }
+
+   public void testTransactionCommit() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      this.beginTransaction(connV11, "tx1");
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("transaction", "tx1");
+      frame.addHeader("receipt", "123");
+      frame.setBody("Hello World");
+
+      frame = connV11.sendFrame(frame);
+      
+      assertEquals("123", frame.getHeader("receipt-id"));
+      
+      // check the message is not committed
+      assertNull(consumer.receive(100));
+      
+      this.commitTransaction(connV11, "tx1", true);
+
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull("Should have received a message", message);
+      
+      connV11.disconnect();
+   }
+
+   public void testTransactionRollback() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+      
+      this.beginTransaction(connV11, "tx1");
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("transaction", "tx1");
+
+      frame.setBody("first message");
+      
+      connV11.sendFrame(frame);
+
+      // rollback first message
+      this.abortTransaction(connV11, "tx1");
+
+      this.beginTransaction(connV11, "tx1");
+
+      frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("transaction", "tx1");
+
+      frame.setBody("second message");
+      
+      connV11.sendFrame(frame);
+
+      this.commitTransaction(connV11, "tx1", true);
+
+      // only second msg should be received since first msg was rolled back
+      TextMessage message = (TextMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("second message", message.getText());
+      
+      connV11.disconnect();
+   }
+
+   public void testUnsubscribe() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "sub1", "auto");
+
+      // send a message to our queue
+      sendMessage("first message");
+
+      // receive message from socket
+      ClientStompFrame frame = connV11.receiveFrame();
+      
+      Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+      // remove subscription
+      this.unsubscribe(connV11, "sub1", true);
+
+      // send a message to our queue
+      sendMessage("second message");
+
+      frame = connV11.receiveFrame(1000);
+      assertNull(frame);
+      
+      connV11.disconnect();
+   }
+
    //-----------------private help methods
-   
+
    private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
    {
       ClientStompFrame abortFrame = conn.createFrame("ABORT");
@@ -1326,7 +1750,27 @@
       
       conn.sendFrame(beginFrame);
    }
-   
+
+   private void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+   {
+      commitTransaction(conn, txID, false);
+   }
+
+   private void commitTransaction(StompClientConnection conn, String txID, boolean receipt) throws IOException, InterruptedException
+   {
+      ClientStompFrame beginFrame = conn.createFrame("COMMIT");
+      beginFrame.addHeader("transaction", txID);
+      if (receipt)
+      {
+         beginFrame.addHeader("receipt", "1234");
+      }
+      ClientStompFrame resp = conn.sendFrame(beginFrame);
+      if (receipt)
+      {
+         assertEquals("1234", resp.getHeader("receipt-id"));
+      }
+   }
+
    private void ack(StompClientConnection conn, String subId,
          ClientStompFrame frame) throws IOException, InterruptedException
    {
@@ -1376,14 +1820,30 @@
    {
       subscribe(conn, subId, ack, durableId, null);
    }
+
+   private void subscribe(StompClientConnection conn, String subId,
+         String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+   {
+      subscribe(conn, subId, ack, durableId, null, receipt);
+   }
+
+   private void subscribe(StompClientConnection conn, String subId, String ack,
+         String durableId, String selector) throws IOException,
+         InterruptedException
+   {
+      subscribe(conn, subId, ack, durableId, selector, false);
+   }
    
    private void subscribe(StompClientConnection conn, String subId,
-         String ack, String durableId, String selector) throws IOException, InterruptedException
+         String ack, String durableId, String selector, boolean receipt) throws IOException, InterruptedException
    {
       ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
       subFrame.addHeader("id", subId);
       subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", ack);
+      if (ack != null)
+      {
+         subFrame.addHeader("ack", ack);
+      }
       if (durableId != null)
       {
          subFrame.addHeader("durable-subscriber-name", durableId);
@@ -1392,19 +1852,60 @@
       {
          subFrame.addHeader("selector", selector);
       }
-      conn.sendFrame(subFrame);
+      if (receipt)
+      {
+         subFrame.addHeader("receipt", "1234");
+      }
+      
+      subFrame = conn.sendFrame(subFrame);
+      
+      if (receipt)
+      {
+         assertEquals("1234", subFrame.getHeader("receipt-id"));
+      }
    }
 
    private void subscribeTopic(StompClientConnection conn, String subId,
          String ack, String durableId) throws IOException, InterruptedException
    {
+      subscribeTopic(conn, subId, ack, durableId, false);
+   }
+
+   private void subscribeTopic(StompClientConnection conn, String subId,
+         String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+   {
+      subscribeTopic(conn, subId, ack, durableId, receipt, false);
+   }
+
+   private void subscribeTopic(StompClientConnection conn, String subId,
+         String ack, String durableId, boolean receipt, boolean noLocal) throws IOException, InterruptedException
+   {
       ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
       subFrame.addHeader("id", subId);
       subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
-      subFrame.addHeader("ack", ack);
-      subFrame.addHeader("durable-subscriber-name", durableId);
+      if (ack != null)
+      {
+         subFrame.addHeader("ack", ack);
+      }
+      if (durableId != null)
+      {
+         subFrame.addHeader("durable-subscriber-name", durableId);
+      }
+      if (receipt)
+      {
+         subFrame.addHeader("receipt", "1234");
+      }
+      if (noLocal)
+      {
+         subFrame.addHeader("no-local", "true");
+      }
       
-      conn.sendFrame(subFrame);
+      ClientStompFrame frame = conn.sendFrame(subFrame);
+      
+      if (receipt)
+      {
+         assertTrue(frame.getHeader("receipt-id").equals("1234"));
+      }
    }
 
    private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException
@@ -1414,7 +1915,77 @@
       
       conn.sendFrame(subFrame);
    }
+   
+   private void unsubscribe(StompClientConnection conn, String subId,
+         boolean receipt) throws IOException, InterruptedException
+   {
+      ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
+      subFrame.addHeader("id", subId);
+      
+      if (receipt)
+      {
+         subFrame.addHeader("receipt", "4321");
+      }
+      
+      ClientStompFrame f = conn.sendFrame(subFrame);
+      
+      if (receipt)
+      {
+         System.out.println("response: " + f);
+         assertEquals("RECEIPT", f.getCommand());
+         assertEquals("4321", f.getHeader("receipt-id"));
+      }
+   }
 
+   protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+      
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      Assert.assertEquals("MESSAGE", frame.getCommand());
+
+      log.info("Reconnecting!");
+
+      if (sendDisconnect)
+      {
+         connV11.disconnect();
+         connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      }
+      else
+      {
+         connV11.destroy();
+         connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      }
+
+      // message should be received since message was not acknowledged
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "sub1", null);
+
+      frame = connV11.receiveFrame();
+      Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+      connV11.disconnect();
+
+      // now lets make sure we don't see the message again
+      connV11.destroy();
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      connV11.connect(defUser, defPass);
+      
+      this.subscribe(connV11, "sub1", null, null, true);
+
+      sendMessage("shouldBeNextMessage");
+
+      frame = connV11.receiveFrame();
+      Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+      Assert.assertEquals("shouldBeNextMessage", frame.getBody());
+   }
+
 }
 
 



More information about the hornetq-commits mailing list