[hornetq-commits] JBoss hornetq SVN: r11874 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 7 22:05:11 EST 2011


Author: gaohoward
Date: 2011-12-07 22:05:11 -0500 (Wed, 07 Dec 2011)
New Revision: 11874

Added:
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java
Removed:
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test name correction



Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-12-07 20:53:32 UTC (rev 11873)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-12-08 03:05:11 UTC (rev 11874)
@@ -1,2221 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.tests.integration.stomp.v11;
-
-import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import junit.framework.Assert;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
-import org.hornetq.tests.integration.stomp.util.StompClientConnection;
-import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
-import org.hornetq.tests.integration.stomp.util.StompClientConnectionV11;
-
-/*
- * 
- */
-public class StompTestV11 extends StompTestBase2
-{
-   private static final transient Logger log = Logger.getLogger(StompTestV11.class);
-   
-   private StompClientConnection connV11;
-   
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-   }
-   
-   protected void tearDown() throws Exception
-   {
-      System.out.println("Connection 11 : " + connV11.isConnected());
-      if (connV11.isConnected())
-      {
-         connV11.disconnect();
-      }
-      super.tearDown();
-   }
-   
-   public void testConnection() throws Exception
-   {
-      StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
-      
-      connection.connect(defUser, defPass);
-      
-      assertTrue(connection.isConnected());
-      
-      assertEquals("1.0", connection.getVersion());
-      
-      connection.disconnect();
-
-      connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      
-      connection.connect(defUser, defPass);
-      
-      assertTrue(connection.isConnected());
-      
-      assertEquals("1.1", connection.getVersion());
-      
-      connection.disconnect();
-      
-      connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      
-      connection.connect();
-      
-      assertFalse(connection.isConnected());
-      
-      //new way of connection
-      StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      conn.connect1(defUser, defPass);
-      
-      assertTrue(conn.isConnected());
-      
-      conn.disconnect();
-   }
-   
-   public void testNegotiation() throws Exception
-   {
-      // case 1 accept-version absent. It is a 1.0 connect
-      ClientStompFrame frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      
-      ClientStompFrame reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      
-      //reply headers: version, session, server
-      assertEquals(null, reply.getHeader("version"));
-
-      connV11.disconnect();
-
-      // case 2 accept-version=1.0, result: 1.0
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("accept-version", "1.0");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      
-      reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      
-      //reply headers: version, session, server
-      assertEquals("1.0", reply.getHeader("version"));
-      
-      connV11.disconnect();
-
-      // case 3 accept-version=1.1, result: 1.1
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("accept-version", "1.1");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      
-      reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      
-      //reply headers: version, session, server
-      assertEquals("1.1", reply.getHeader("version"));
-      
-      connV11.disconnect();
-
-      // case 4 accept-version=1.0,1.1,1.2, result 1.1
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("accept-version", "1.0,1.1,1.2");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      
-      reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      
-      //reply headers: version, session, server
-      assertEquals("1.1", reply.getHeader("version"));
-      
-      connV11.disconnect();
-
-      // case 5 accept-version=1.2, result error
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("accept-version", "1.2");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      
-      reply = connV11.sendFrame(frame);
-      
-      assertEquals("ERROR", reply.getCommand());
-      
-      System.out.println("Got error frame " + reply);
-      
-   }
-   
-   public void testSendAndReceive() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World 1!");
-      
-      ClientStompFrame response = connV11.sendFrame(frame);
-      
-      assertNull(response);
-      
-      frame.addHeader("receipt", "1234");
-      frame.setBody("Hello World 2!");
-      
-      response = connV11.sendFrame(frame);
-      
-      assertNotNull(response);
-      
-      assertEquals("RECEIPT", response.getCommand());
-      
-      assertEquals("1234", response.getHeader("receipt-id"));
-      
-      //subscribe
-      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      newConn.connect(defUser, defPass);
-      
-      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-      
-      newConn.sendFrame(subFrame);
-      
-      frame = newConn.receiveFrame();
-      
-      System.out.println("received " + frame);
-      
-      assertEquals("MESSAGE", frame.getCommand());
-      
-      assertEquals("a-sub", frame.getHeader("subscription"));
-      
-      assertNotNull(frame.getHeader("message-id"));
-      
-      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
-      
-      assertEquals("Hello World 1!", frame.getBody());
-      
-      frame = newConn.receiveFrame();
-      
-      System.out.println("received " + frame);      
-      
-      //unsub
-      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      newConn.sendFrame(unsubFrame);
-      
-      newConn.disconnect();
-   }
-
-   public void testHeaderContentType() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "application/xml");
-      frame.setBody("Hello World 1!");
-      
-      connV11.sendFrame(frame);
-      
-      //subscribe
-      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      newConn.connect(defUser, defPass);
-      
-      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-      
-      newConn.sendFrame(subFrame);
-      
-      frame = newConn.receiveFrame();
-      
-      System.out.println("received " + frame);
-      
-      assertEquals("MESSAGE", frame.getCommand());
-      
-      assertEquals("application/xml", frame.getHeader("content-type"));
-      
-      //unsub
-      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      
-      newConn.disconnect();
-   }
-
-   public void testHeaderContentLength() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      
-      String body = "Hello World 1!";
-      String cLen = String.valueOf(body.getBytes("UTF-8").length);
-      
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "application/xml");
-      frame.addHeader("content-length", cLen);
-      frame.setBody(body + "extra");
-      
-      connV11.sendFrame(frame);
-      
-      //subscribe
-      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      newConn.connect(defUser, defPass);
-      
-      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-      
-      newConn.sendFrame(subFrame);
-      
-      frame = newConn.receiveFrame();
-      
-      System.out.println("received " + frame);
-      
-      assertEquals("MESSAGE", frame.getCommand());
-      
-      assertEquals(cLen, frame.getHeader("content-length"));
-      
-      //unsub
-      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      
-      newConn.disconnect();
-   }
-
-   public void testHeaderEncoding() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      
-      String body = "Hello World 1!";
-      String cLen = String.valueOf(body.getBytes("UTF-8").length);
-      
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "application/xml");
-      frame.addHeader("content-length", cLen);
-      String hKey = "special-header\\\\\\n\\:";
-      String hVal = "\\:\\\\\\ngood";
-      frame.addHeader(hKey, hVal);
-      
-      System.out.println("key: |" + hKey + "| val: |" + hVal);
-      
-      frame.setBody(body);
-      
-      connV11.sendFrame(frame);
-      
-      //subscribe
-      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      newConn.connect(defUser, defPass);
-      
-      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-      
-      newConn.sendFrame(subFrame);
-      
-      frame = newConn.receiveFrame();
-      
-      System.out.println("received " + frame);
-      
-      assertEquals("MESSAGE", frame.getCommand());
-      
-      String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
-      
-      assertEquals(":" + "\\" + "\n" + "good", value);
-      
-      //unsub
-      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      
-      newConn.disconnect();
-   }
-   
-   public void testHeartBeat() throws Exception
-   {
-      //no heart beat at all if the heart-beat header is absent
-      ClientStompFrame frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      
-      ClientStompFrame reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      
-      Thread.sleep(5000);
-      
-      assertEquals(0, connV11.getFrameQueueSize());
-      
-      connV11.disconnect();
-      
-      //no heart beat for (0,0)
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "0,0");
-      frame.addHeader("accept-version", "1.0,1.1");
-      
-      reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      
-      assertEquals("0,0", reply.getHeader("heart-beat"));
-      
-      Thread.sleep(5000);
-      
-      assertEquals(0, connV11.getFrameQueueSize());
-      
-      connV11.disconnect();
-
-      //heart-beat (1,0), should receive a min client ping accepted by server
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "1,0");
-      frame.addHeader("accept-version", "1.0,1.1");
-      
-      reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      
-      assertEquals("0,500", reply.getHeader("heart-beat"));
-      
-      Thread.sleep(2000);
-      
-      //now server side should be disconnected because we didn't send ping for 2 sec
-      frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World");
-
-      //send will fail
-      try
-      {
-         connV11.sendFrame(frame);
-         fail("connection should have been destroyed by now");
-      }
-      catch (IOException e)
-      {
-         //ignore
-      }
-      
-      //heart-beat (1,0), start a ping, then send a message, should be ok.
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "1,0");
-      frame.addHeader("accept-version", "1.0,1.1");
-      
-      reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      
-      assertEquals("0,500", reply.getHeader("heart-beat"));
-      
-      System.out.println("========== start pinger!");
-      
-      connV11.startPinger(500);
-      
-      Thread.sleep(2000);
-      
-      //the pinger has been running for the 2 sec, so the server side should not have disconnected us
-      frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World");
-
-      //send will be ok
-      connV11.sendFrame(frame);
-      
-      connV11.stopPinger();
-      
-      connV11.disconnect();
-
-   }
-   
-   //server ping
-   public void testHeartBeat2() throws Exception
-   {
-      //heart-beat (1,1)
-      ClientStompFrame frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "1,1");
-      frame.addHeader("accept-version", "1.0,1.1");
-      
-      ClientStompFrame reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      assertEquals("500,500", reply.getHeader("heart-beat"));
-
-      connV11.disconnect();
-      
-      //heart-beat (500,1000)
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "500,1000");
-      frame.addHeader("accept-version", "1.0,1.1");
-      
-      reply = connV11.sendFrame(frame);
-      
-      assertEquals("CONNECTED", reply.getCommand());
-      
-      assertEquals("1000,500", reply.getHeader("heart-beat"));
-      
-      System.out.println("========== start pinger!");
-      
-      connV11.startPinger(500);
-      
-      Thread.sleep(10000);
-      
-      //now check the number of server pings received
-      int size = connV11.getServerPingNumber();
-      
-      System.out.println("ping received: " + size);
-      
-      assertTrue(size > 5);
-      
-      //the pinger has been running the whole time, so the server side should not have disconnected us
-      frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World");
-
-      //send will be ok
-      connV11.sendFrame(frame);
-      
-      connV11.disconnect();
-   }
-   
-   public void testSendWithHeartBeatsAndReceive() throws Exception
-   {
-      StompClientConnection newConn = null;
-      try
-      {
-         ClientStompFrame frame = connV11.createFrame("CONNECT");
-         frame.addHeader("host", "127.0.0.1");
-         frame.addHeader("login", this.defUser);
-         frame.addHeader("passcode", this.defPass);
-         frame.addHeader("heart-beat", "500,1000");
-         frame.addHeader("accept-version", "1.0,1.1");
-
-         connV11.sendFrame(frame);
-
-         connV11.startPinger(500);
-
-         frame = connV11.createFrame("SEND");
-         frame.addHeader("destination", getQueuePrefix() + getQueueName());
-         frame.addHeader("content-type", "text/plain");
-
-         for (int i = 0; i < 10; i++)
-         {
-            frame.setBody("Hello World " + i + "!");
-            connV11.sendFrame(frame);
-            Thread.sleep(500);
-         }
-
-         // subscribe
-         newConn = StompClientConnectionFactory.createClientConnection("1.1",
-               hostname, port);
-         newConn.connect(defUser, defPass);
-
-         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         newConn.sendFrame(subFrame);
-
-         int cnt = 0;
-
-         frame = newConn.receiveFrame();
-
-         while (frame != null)
-         {
-            cnt++;
-            Thread.sleep(500);
-            frame = newConn.receiveFrame(5000);
-         }
-
-         assertEquals(10, cnt);
-
-         // unsub
-         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         newConn.sendFrame(unsubFrame);
-      }
-      finally
-      {
-         if (newConn != null)
-            newConn.disconnect();
-         connV11.disconnect();
-      }
-   }
-   
-   public void testSendAndReceiveWithHeartBeats() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      
-      for (int i = 0; i < 10; i++)
-      {
-         frame.setBody("Hello World " + i + "!");
-         connV11.sendFrame(frame);
-         Thread.sleep(500);
-      }
-      
-      //subscribe
-      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      try
-      {
-         frame = newConn.createFrame("CONNECT");
-         frame.addHeader("host", "127.0.0.1");
-         frame.addHeader("login", this.defUser);
-         frame.addHeader("passcode", this.defPass);
-         frame.addHeader("heart-beat", "500,1000");
-         frame.addHeader("accept-version", "1.0,1.1");
-
-         newConn.sendFrame(frame);
-
-         newConn.startPinger(500);
-
-         Thread.sleep(500);
-
-         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         newConn.sendFrame(subFrame);
-
-         int cnt = 0;
-
-         frame = newConn.receiveFrame();
-
-         while (frame != null)
-         {
-            cnt++;
-            Thread.sleep(500);
-            frame = newConn.receiveFrame(5000);
-         }
-
-         assertEquals(10, cnt);
-
-         // unsub
-         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         newConn.sendFrame(unsubFrame);
-      }
-      finally
-      {
-         newConn.disconnect();
-      }
-   }
-
-   public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception
-   {
-      StompClientConnection newConn = null;
-      try
-      {
-         ClientStompFrame frame = connV11.createFrame("CONNECT");
-         frame.addHeader("host", "127.0.0.1");
-         frame.addHeader("login", this.defUser);
-         frame.addHeader("passcode", this.defPass);
-         frame.addHeader("heart-beat", "500,1000");
-         frame.addHeader("accept-version", "1.0,1.1");
-
-         connV11.sendFrame(frame);
-
-         connV11.startPinger(500);
-
-         frame = connV11.createFrame("SEND");
-         frame.addHeader("destination", getQueuePrefix() + getQueueName());
-         frame.addHeader("content-type", "text/plain");
-
-         for (int i = 0; i < 10; i++)
-         {
-            frame.setBody("Hello World " + i + "!");
-            connV11.sendFrame(frame);
-            Thread.sleep(500);
-         }
-
-         // subscribe
-         newConn = StompClientConnectionFactory.createClientConnection("1.1",
-               hostname, port);
-         frame = newConn.createFrame("CONNECT");
-         frame.addHeader("host", "127.0.0.1");
-         frame.addHeader("login", this.defUser);
-         frame.addHeader("passcode", this.defPass);
-         frame.addHeader("heart-beat", "500,1000");
-         frame.addHeader("accept-version", "1.0,1.1");
-
-         newConn.sendFrame(frame);
-
-         newConn.startPinger(500);
-
-         Thread.sleep(500);
-
-         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         newConn.sendFrame(subFrame);
-
-         int cnt = 0;
-
-         frame = newConn.receiveFrame();
-
-         while (frame != null)
-         {
-            cnt++;
-            Thread.sleep(500);
-            frame = newConn.receiveFrame(5000);
-         }
-         assertEquals(10, cnt);
-
-         // unsub
-         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         newConn.sendFrame(unsubFrame);
-      }
-      finally
-      {
-         if (newConn != null)
-            newConn.disconnect();
-         connV11.disconnect();
-      }
-   }
-
-   public void testNack() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      subscribe(connV11, "sub1", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame frame = connV11.receiveFrame();
-
-      String messageID = frame.getHeader("message-id");
-      
-      System.out.println("Received message with id " + messageID);
-      
-      nack(connV11, "sub1", messageID);
-      
-      unsubscribe(connV11, "sub1");
-      
-      connV11.disconnect();
-
-      //Nack makes the message be dropped.
-      MessageConsumer consumer = session.createConsumer(queue);
-      Message message = consumer.receive(1000);
-      Assert.assertNull(message);
-   }
-
-   public void testNackWithWrongSubId() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      subscribe(connV11, "sub1", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame frame = connV11.receiveFrame();
-
-      String messageID = frame.getHeader("message-id");
-      
-      System.out.println("Received message with id " + messageID);
-      
-      nack(connV11, "sub2", messageID);
-      
-      ClientStompFrame error = connV11.receiveFrame();
-      
-      System.out.println("Receiver error: " + error);
-      
-      unsubscribe(connV11, "sub1");
-      
-      connV11.disconnect();
-
-      //message should be still there
-      MessageConsumer consumer = session.createConsumer(queue);
-      Message message = consumer.receive(1000);
-      Assert.assertNotNull(message);
-   }
-
-   public void testNackWithWrongMessageId() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      subscribe(connV11, "sub1", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame frame = connV11.receiveFrame();
-
-      String messageID = frame.getHeader("message-id");
-      
-      System.out.println("Received message with id " + messageID);
-      
-      nack(connV11, "sub2", "someother");
-      
-      ClientStompFrame error = connV11.receiveFrame();
-      
-      System.out.println("Receiver error: " + error);
-      
-      unsubscribe(connV11, "sub1");
-      
-      connV11.disconnect();
-
-      //message should still be there
-      MessageConsumer consumer = session.createConsumer(queue);
-      Message message = consumer.receive(1000);
-      Assert.assertNotNull(message);
-   }
-   
-   
-   public void testAck() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      subscribe(connV11, "sub1", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame frame = connV11.receiveFrame();
-
-      String messageID = frame.getHeader("message-id");
-      
-      System.out.println("Received message with id " + messageID);
-      
-      ack(connV11, "sub1", messageID, null);
-      
-      unsubscribe(connV11, "sub1");
-      
-      connV11.disconnect();
-
-      //Ack consumes the message, so nothing should be left on the queue.
-      MessageConsumer consumer = session.createConsumer(queue);
-      Message message = consumer.receive(1000);
-      Assert.assertNull(message);
-   }
-
-   public void testAckWithWrongSubId() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      subscribe(connV11, "sub1", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame frame = connV11.receiveFrame();
-
-      String messageID = frame.getHeader("message-id");
-      
-      System.out.println("Received message with id " + messageID);
-      
-      ack(connV11, "sub2", messageID, null);
-      
-      ClientStompFrame error = connV11.receiveFrame();
-      
-      System.out.println("Receiver error: " + error);
-      
-      unsubscribe(connV11, "sub1");
-      
-      connV11.disconnect();
-
-      //message should be still there
-      MessageConsumer consumer = session.createConsumer(queue);
-      Message message = consumer.receive(1000);
-      Assert.assertNotNull(message);
-   }
-
-   public void testAckWithWrongMessageId() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      subscribe(connV11, "sub1", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame frame = connV11.receiveFrame();
-
-      String messageID = frame.getHeader("message-id");
-      
-      System.out.println("Received message with id " + messageID);
-      
-      ack(connV11, "sub2", "someother", null);
-      
-      ClientStompFrame error = connV11.receiveFrame();
-      
-      System.out.println("Receiver error: " + error);
-      
-      unsubscribe(connV11, "sub1");
-      
-      connV11.disconnect();
-
-      //message should still be there
-      MessageConsumer consumer = session.createConsumer(queue);
-      Message message = consumer.receive(1000);
-      Assert.assertNotNull(message);
-   }
-   
-   public void testErrorWithReceipt() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      subscribe(connV11, "sub1", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame frame = connV11.receiveFrame();
-
-      String messageID = frame.getHeader("message-id");
-      
-      System.out.println("Received message with id " + messageID);
-      
-      ClientStompFrame ackFrame = connV11.createFrame("ACK");
-      //give it a wrong sub id
-      ackFrame.addHeader("subscription", "sub2");
-      ackFrame.addHeader("message-id", messageID);
-      ackFrame.addHeader("receipt", "answer-me");
-      
-      ClientStompFrame error = connV11.sendFrame(ackFrame);
-      
-      System.out.println("Receiver error: " + error);
-      
-      assertEquals("ERROR", error.getCommand());
-      
-      assertEquals("answer-me", error.getHeader("receipt-id"));
-      
-      unsubscribe(connV11, "sub1");
-      
-      connV11.disconnect();
-
-      //message should still be there
-      MessageConsumer consumer = session.createConsumer(queue);
-      Message message = consumer.receive(1000);
-      Assert.assertNotNull(message);      
-   }
-   
-   /**
-    * ACKs on the subscribed id ("sub1") but with a message id that is one
-    * greater than the id actually received, while requesting a receipt. Expects
-    * an ERROR frame that echoes the receipt id; the message must stay on the queue.
-    */
-   public void testErrorWithReceipt2() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      subscribe(connV11, "sub1", "client");
-      sendMessage(getName());
-
-      ClientStompFrame received = connV11.receiveFrame();
-      String receivedId = received.getHeader("message-id");
-      System.out.println("Received message with id " + receivedId);
-
-      // correct subscription id, but a message id that does not match the delivery
-      ClientStompFrame badAck = connV11.createFrame("ACK");
-      badAck.addHeader("subscription", "sub1");
-      long bogusId = Long.parseLong(receivedId) + 1;
-      badAck.addHeader("message-id", String.valueOf(bogusId));
-      badAck.addHeader("receipt", "answer-me");
-
-      ClientStompFrame reply = connV11.sendFrame(badAck);
-      System.out.println("Receiver error: " + reply);
-
-      assertEquals("ERROR", reply.getCommand());
-      assertEquals("answer-me", reply.getHeader("receipt-id"));
-
-      unsubscribe(connV11, "sub1");
-      connV11.disconnect();
-
-      // the rejected ACK must have left the message on the queue
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-      Assert.assertNotNull(jmsConsumer.receive(1000));
-   }
-   
-   /**
-    * Subscribes in "client" ack mode, receives 50 messages and ACKs only the
-    * last one; afterwards a JMS consumer is expected to find the queue empty.
-    */
-   public void testAckModeClient() throws Exception
-   {
-      final int total = 50;
-
-      connV11.connect(defUser, defPass);
-      subscribe(connV11, "sub1", "client");
-
-      // fill the queue
-      for (int sent = 0; sent < total; sent++)
-      {
-         sendMessage("client-ack" + sent);
-      }
-
-      // drain all deliveries, remembering the final frame
-      ClientStompFrame last = null;
-      for (int got = 0; got < total; got++)
-      {
-         last = connV11.receiveFrame();
-         assertNotNull(last);
-      }
-
-      // a single ACK, sent for the final message only
-      ack(connV11, "sub1", last);
-
-      unsubscribe(connV11, "sub1");
-      connV11.disconnect();
-
-      // nothing should be left for a JMS consumer
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-      Assert.assertNull(jmsConsumer.receive(1000));
-   }
-   
-   /**
-    * Subscribes in "client" ack mode, receives 50 messages and ACKs only the
-    * 49th one. Afterwards exactly one message is expected to remain on the
-    * queue.
-    */
-   public void testAckModeClient2() throws Exception
-   {
-      final int total = 50;
-      final int ackedIndex = total - 2; // zero-based index of the 49th message
-
-      connV11.connect(defUser, defPass);
-      subscribe(connV11, "sub1", "client");
-
-      // fill the queue
-      for (int sent = 0; sent < total; sent++)
-      {
-         sendMessage("client-ack" + sent);
-      }
-
-      for (int got = 0; got < total; got++)
-      {
-         ClientStompFrame delivered = connV11.receiveFrame();
-         assertNotNull(delivered);
-
-         if (got == ackedIndex)
-         {
-            ack(connV11, "sub1", delivered);
-         }
-      }
-
-      unsubscribe(connV11, "sub1");
-      connV11.disconnect();
-
-      // exactly one message (the un-ACKed last one) should still be available
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-      Assert.assertNotNull(jmsConsumer.receive(1000));
-      Assert.assertNull(jmsConsumer.receive(1000));
-   }
-   
-   /**
-    * Subscribes in "auto" ack mode and receives 50 messages without sending any
-    * ACK; afterwards a JMS consumer is expected to find the queue empty.
-    */
-   public void testAckModeAuto() throws Exception
-   {
-      final int total = 50;
-
-      connV11.connect(defUser, defPass);
-      subscribe(connV11, "sub1", "auto");
-
-      // fill the queue
-      for (int sent = 0; sent < total; sent++)
-      {
-         sendMessage("auto-ack" + sent);
-      }
-
-      // every message must be delivered to the STOMP subscriber
-      for (int got = 0; got < total; got++)
-      {
-         assertNotNull(connV11.receiveFrame());
-      }
-
-      unsubscribe(connV11, "sub1");
-      connV11.disconnect();
-
-      // nothing should be left for a JMS consumer
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-      Assert.assertNull(jmsConsumer.receive(1000));
-   }
-   
-   /**
-    * Subscribes in "client-individual" ack mode, receives 50 messages and ACKs
-    * only those at even positions. Afterwards exactly half of the messages
-    * (25) are expected to remain on the queue.
-    */
-   public void testAckModeClientIndividual() throws Exception
-   {
-      final int total = 50;
-
-      connV11.connect(defUser, defPass);
-      subscribe(connV11, "sub1", "client-individual");
-
-      // fill the queue
-      for (int sent = 0; sent < total; sent++)
-      {
-         sendMessage("client-individual-ack" + sent);
-      }
-
-      for (int got = 0; got < total; got++)
-      {
-         ClientStompFrame delivered = connV11.receiveFrame();
-         assertNotNull(delivered);
-
-         System.out.println(got + " == received: " + delivered);
-
-         // only even-numbered deliveries are acknowledged
-         boolean evenPosition = (got % 2) == 0;
-         if (evenPosition)
-         {
-            ack(connV11, "sub1", delivered);
-         }
-      }
-
-      unsubscribe(connV11, "sub1");
-      connV11.disconnect();
-
-      // the un-ACKed half must still be available to a JMS consumer
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      final int expectedLeft = total / 2;
-      for (int left = 0; left < expectedLeft; left++)
-      {
-         TextMessage remaining = (TextMessage) jmsConsumer.receive(1000);
-         Assert.assertNotNull(remaining);
-         System.out.println("Legal: " + remaining.getText());
-      }
-
-      // ...and nothing beyond that
-      TextMessage extra = (TextMessage) jmsConsumer.receive(1000);
-      Assert.assertNull(extra);
-   }
-   
-   /**
-    * Two STOMP 1.1 connections subscribe to the same topic under different
-    * subscription ids. A message sent by the first connection must reach both,
-    * and each MESSAGE frame must carry the id of its own subscription.
-    */
-   public void testTwoSubscribers() throws Exception
-   {
-      connV11.connect(defUser, defPass, "myclientid");
-
-      this.subscribeTopic(connV11, "sub1", "auto", null);
-
-      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      newConn.connect(defUser, defPass, "myclientid2");
-
-      this.subscribeTopic(newConn, "sub2", "auto", null);
-
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getTopicPrefix() + getTopicName());
-
-      frame.setBody("Hello World");
-
-      connV11.sendFrame(frame);
-
-      // receive message from socket
-      frame = connV11.receiveFrame(1000);
-
-      System.out.println("received frame : " + frame);
-      // a timed-out receive yields null; fail with a clear message instead of an NPE
-      assertNotNull("first subscriber received no frame", frame);
-      assertEquals("Hello World", frame.getBody());
-      assertEquals("sub1", frame.getHeader("subscription"));
-
-      frame = newConn.receiveFrame(1000);
-
-      System.out.println("received 2 frame : " + frame);
-      assertNotNull("second subscriber received no frame", frame);
-      assertEquals("Hello World", frame.getBody());
-      assertEquals("sub2", frame.getHeader("subscription"));
-
-      // remove subscriptions
-      this.unsubscribe(connV11, "sub1", true);
-      this.unsubscribe(newConn, "sub2", true);
-
-      connV11.disconnect();
-      newConn.disconnect();
-   }
-   
-   /**
-    * Sends a message to the queue on one connection, then subscribes on a
-    * second connection and expects that second connection to receive it.
-    */
-   public void testSendAndReceiveOnDifferentConnections() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame sendFrame = connV11.createFrame("SEND");
-      sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      sendFrame.setBody("Hello World");
-
-      connV11.sendFrame(sendFrame);
-
-      StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      connV11_2.connect(defUser, defPass);
-
-      this.subscribe(connV11_2, "sub1", "auto");
-
-      ClientStompFrame frame = connV11_2.receiveFrame(2000);
-
-      // a timed-out receive yields null; fail with a clear message instead of an NPE
-      assertNotNull("no frame received on the second connection", frame);
-      assertEquals("MESSAGE", frame.getCommand());
-      assertEquals("Hello World", frame.getBody());
-
-      connV11.disconnect();
-      connV11_2.disconnect();
-   }
-
-   //----------------Note: tests below are adapted from StompTest
-   
-   /**
-    * Sends BEGIN twice with the same transaction id and expects the next frame
-    * received on the connection to be an ERROR.
-    */
-   public void testBeginSameTransactionTwice() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      beginTransaction(connV11, "tx1");
-
-      beginTransaction(connV11, "tx1");
-
-      ClientStompFrame f = connV11.receiveFrame();
-      // receiveFrame() may return null; report that as a test failure, not an NPE
-      Assert.assertNotNull("no ERROR frame received for duplicate BEGIN", f);
-      // assertEquals shows the actual command when the expectation is not met
-      Assert.assertEquals("ERROR", f.getCommand());
-   }
-
-   /**
-    * Sends a text body containing non-ASCII characters (e-circumflex, n-tilde,
-    * u-diaeresis) and expects the MESSAGE frame body to be identical.
-    */
-   public void testBodyWithUTF8() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      this.subscribe(connV11, getName(), "auto");
-
-      final String payload = "A\u00ea\u00f1\u00fcC";
-      System.out.println(payload);
-      sendMessage(payload);
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-      System.out.println(delivered);
-
-      boolean isMessage = delivered.getCommand().equals("MESSAGE");
-      Assert.assertTrue(isMessage);
-      Assert.assertNotNull(delivered.getHeader("destination"));
-      boolean sameBody = delivered.getBody().equals(payload);
-      Assert.assertTrue(sameBody);
-
-      connV11.disconnect();
-   }
-   
-   /**
-    * Receives a message in "client" ack mode, ACKs it with a transaction header
-    * inside "tx1", aborts "tx1", and then expects no further frame to arrive on
-    * the connection.
-    */
-   public void testClientAckNotPartOfTransaction() throws Exception
-   {
-      final String subId = getName();
-
-      connV11.connect(defUser, defPass);
-      this.subscribe(connV11, subId, "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-
-      Assert.assertTrue(delivered.getCommand().equals("MESSAGE"));
-      Assert.assertNotNull(delivered.getHeader("destination"));
-      Assert.assertTrue(delivered.getBody().equals(getName()));
-      String deliveredId = delivered.getHeader("message-id");
-      Assert.assertNotNull(deliveredId);
-
-      // ACK inside a transaction that is then rolled back
-      beginTransaction(connV11, "tx1");
-      this.ack(connV11, subId, deliveredId, "tx1");
-      abortTransaction(connV11, "tx1");
-
-      // aborting must not cause the message to be delivered again
-      ClientStompFrame afterAbort = connV11.receiveFrame();
-      assertNull(afterAbort);
-
-      this.unsubscribe(connV11, subId);
-      connV11.disconnect();
-   }
-
-   /**
-    * Sends DISCONNECT with a receipt request, checks the RECEIPT, and then
-    * verifies that a further SEND on the same connection fails with a
-    * {@link ClosedChannelException}.
-    */
-   public void testDisconnectAndError() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      this.subscribe(connV11, getName(), "client");
-
-      ClientStompFrame disconnect = connV11.createFrame("DISCONNECT");
-      disconnect.addHeader("receipt", "1");
-
-      ClientStompFrame reply = connV11.sendFrame(disconnect);
-
-      boolean receiptOk = reply != null
-         && "RECEIPT".equals(reply.getCommand())
-         && "1".equals(reply.getHeader("receipt-id"));
-      if (!receiptOk)
-      {
-         fail("Disconnect failed! " + reply);
-      }
-
-      // the connection is expected to be closed now, so a SEND must fail
-      ClientStompFrame lateSend = connV11.createFrame("SEND");
-      lateSend.addHeader("destination", getQueuePrefix() + getQueueName());
-      lateSend.setBody("Hello World");
-
-      try
-      {
-         connV11.sendFrame(lateSend);
-         fail("connection should have been closed by server.");
-      }
-      catch (ClosedChannelException expected)
-      {
-         // expected: the channel is already closed
-      }
-
-      connV11.destroy();
-   }
-
-   /**
-    * Issues the same SUBSCRIBE (id "sub1", "client" ack, durable name taken
-    * from the test name) twice and expects an ERROR frame in response.
-    */
-   public void testDurableSubscriber() throws Exception
-   {
-      final String durableName = getName();
-
-      connV11.connect(defUser, defPass);
-
-      // first subscription, then an identical second one
-      this.subscribe(connV11, "sub1", "client", durableName);
-      this.subscribe(connV11, "sub1", "client", durableName);
-
-      ClientStompFrame reply = connV11.receiveFrame();
-      boolean isError = reply.getCommand().equals("ERROR");
-      Assert.assertTrue(isError);
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Creates a durable topic subscription, disconnects (with receipt), publishes
-    * a message while disconnected, then reconnects with the same client id and
-    * subscription and expects to receive that message.
-    */
-   public void testDurableSubscriberWithReconnection() throws Exception
-   {
-      connV11.connect(defUser, defPass, "myclientid");
-      this.subscribeTopic(connV11, "sub1", "auto", getName());
-
-      ClientStompFrame disconnect = connV11.createFrame("DISCONNECT");
-      disconnect.addHeader("receipt", "1");
-
-      ClientStompFrame reply = connV11.sendFrame(disconnect);
-
-      boolean receiptOk = reply != null
-         && "RECEIPT".equals(reply.getCommand())
-         && "1".equals(reply.getHeader("receipt-id"));
-      if (!receiptOk)
-      {
-         fail("Disconnect failed! " + reply);
-      }
-
-      // publish while the durable subscriber is offline
-      sendMessage(getName(), topic);
-
-      // replace the old connection with a fresh one using the same client id
-      connV11.destroy();
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      connV11.connect(defUser, defPass, "myclientid");
-
-      this.subscribeTopic(connV11, "sub1", "auto", getName());
-
-      // the message published while offline must now be delivered
-      ClientStompFrame delivered = connV11.receiveFrame();
-
-      Assert.assertTrue(delivered.getCommand().equals("MESSAGE"));
-      Assert.assertNotNull(delivered.getHeader("destination"));
-      Assert.assertEquals(getName(), delivered.getBody());
-
-      this.unsubscribe(connV11, "sub1");
-      connV11.disconnect();
-   }
-
-   /**
-    * Sends a STOMP message with a "JMSXGroupID" header and checks that a JMS
-    * consumer sees the same value as a string property.
-    */
-   public void testJMSXGroupIdCanBeSet() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame send = connV11.createFrame("SEND");
-      send.addHeader("destination", getQueuePrefix() + getQueueName());
-      send.addHeader("JMSXGroupID", "TEST");
-      send.setBody("Hello World");
-      connV11.sendFrame(send);
-
-      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
-      Assert.assertNotNull(received);
-      Assert.assertEquals("Hello World", received.getText());
-
-      // differs from StompConnect
-      String groupId = received.getStringProperty("JMSXGroupID");
-      Assert.assertEquals("TEST", groupId);
-   }
-
-   /**
-    * Sends two batches of ten messages to an "auto" ack subscription and checks
-    * that each batch is delivered in the order it was sent.
-    */
-   public void testMessagesAreInOrder() throws Exception
-   {
-      int ctr = 10;
-      String[] data = new String[ctr];
-
-      connV11.connect(defUser, defPass);
-
-      this.subscribe(connV11, "sub1", "auto");
-
-      // first batch
-      for (int i = 0; i < ctr; ++i)
-      {
-         data[i] = getName() + i;
-         sendMessage(data[i]);
-      }
-
-      ClientStompFrame frame = null;
-
-      for (int i = 0; i < ctr; ++i)
-      {
-         frame = connV11.receiveFrame();
-         // a missing frame should fail the test, not raise an NPE
-         Assert.assertNotNull("No frame received for message " + i, frame);
-         // assertEquals reports expected vs. actual body on a mismatch
-         Assert.assertEquals("Message not in order", data[i], frame.getBody());
-      }
-
-      // second batch, with distinguishable bodies
-      for (int i = 0; i < ctr; ++i)
-      {
-         data[i] = getName() + ":second:" + i;
-         sendMessage(data[i]);
-      }
-
-      for (int i = 0; i < ctr; ++i)
-      {
-         frame = connV11.receiveFrame();
-         Assert.assertNotNull("No frame received for message " + i, frame);
-         Assert.assertEquals("Message not in order", data[i], frame.getBody());
-      }
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Subscribes with the selector {@code foo = 'zzz'}, sends one non-matching
-    * and one matching message, and expects the first frame received to be the
-    * matching one.
-    */
-   public void testSubscribeWithAutoAckAndSelector() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'");
-
-      // only the second message satisfies the selector
-      sendMessage("Ignored message", "foo", "1234");
-      sendMessage("Real message", "foo", "zzz");
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-
-      boolean gotRealMessage = delivered.getBody().equals("Real message");
-      Assert.assertTrue("Should have received the real message but got: " + delivered, gotRealMessage);
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Receives a message in "client" ack mode and disconnects without ACKing it;
-    * a JMS consumer must then receive the message flagged as redelivered.
-    */
-   public void testRedeliveryWithClientAck() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      this.subscribe(connV11, "subId", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-      boolean isMessage = delivered.getCommand().equals("MESSAGE");
-      assertTrue(isMessage);
-
-      // no ACK is sent before disconnecting
-      connV11.disconnect();
-
-      // the un-ACKed message must be available again and marked redelivered
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-      Message redelivered = jmsConsumer.receive(1000);
-      Assert.assertNotNull(redelivered);
-      Assert.assertTrue(redelivered.getJMSRedelivered());
-   }
-
-   /**
-    * Sends the same SEND frame 1000 times and waits up to 60 seconds for a JMS
-    * message listener to observe all of the messages.
-    */
-   public void testSendManyMessages() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      final int total = 1000;
-      final CountDownLatch allArrived = new CountDownLatch(total);
-
-      // count down once per message delivered to the JMS side
-      MessageListener counter = new MessageListener()
-      {
-         public void onMessage(Message ignored)
-         {
-            allArrived.countDown();
-         }
-      };
-      jmsConsumer.setMessageListener(counter);
-
-      ClientStompFrame send = connV11.createFrame("SEND");
-      send.addHeader("destination", getQueuePrefix() + getQueueName());
-      send.setBody("Hello World");
-
-      // the very same frame instance is re-sent each time
-      for (int sent = 0; sent < total; sent++)
-      {
-         connV11.sendFrame(send);
-      }
-
-      boolean completed = allArrived.await(60, TimeUnit.SECONDS);
-      assertTrue(completed);
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Sends a plain text message over STOMP and checks, on the JMS side, the
-    * body, the priority (4 when no priority header is sent) and that the
-    * timestamp is within one second of the current time.
-    */
-   public void testSendMessage() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame send = connV11.createFrame("SEND");
-      send.addHeader("destination", getQueuePrefix() + getQueueName());
-      send.setBody("Hello World");
-      connV11.sendFrame(send);
-
-      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
-      Assert.assertNotNull(received);
-      Assert.assertEquals("Hello World", received.getText());
-
-      // no priority header was set, so priority 4 is expected
-      Assert.assertEquals("getJMSPriority", 4, received.getJMSPriority());
-
-      // the message timestamp should be very close to "now"
-      long now = System.currentTimeMillis();
-      long stamped = received.getJMSTimestamp();
-      long drift = Math.abs(now - stamped);
-      Assert.assertTrue(drift < 1000);
-   }
-
-   /**
-    * Sends a four-byte body (containing NUL bytes) together with a
-    * "content-length" header and checks that the JMS side receives a
-    * {@code BytesMessage} holding exactly those bytes.
-    */
-   public void testSendMessageWithContentLength() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      final byte[] payload = { 1, 0, 0, 4 };
-
-      ClientStompFrame send = connV11.createFrame("SEND");
-      send.addHeader("destination", getQueuePrefix() + getQueueName());
-      send.setBody(new String(payload, "UTF-8"));
-      send.addHeader("content-length", String.valueOf(payload.length));
-
-      connV11.sendFrame(send);
-
-      BytesMessage received = (BytesMessage)jmsConsumer.receive(10000);
-      Assert.assertNotNull(received);
-
-      assertEquals(payload.length, received.getBodyLength());
-
-      // compare the body byte by byte, in order
-      for (byte expected : payload)
-      {
-         assertEquals(expected, received.readByte());
-      }
-   }
-
-   /**
-    * Sends a message with custom headers "foo" and "bar" and checks that a JMS
-    * consumer created with the selector {@code foo = 'abc'} receives it with
-    * both values available as string properties.
-    */
-   public void testSendMessageWithCustomHeadersAndSelector() throws Exception
-   {
-      MessageConsumer selectiveConsumer = session.createConsumer(queue, "foo = 'abc'");
-
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame send = connV11.createFrame("SEND");
-      // custom headers first, then the destination
-      send.addHeader("foo", "abc");
-      send.addHeader("bar", "123");
-      send.addHeader("destination", getQueuePrefix() + getQueueName());
-      send.setBody("Hello World");
-      connV11.sendFrame(send);
-
-      TextMessage received = (TextMessage)selectiveConsumer.receive(1000);
-      Assert.assertNotNull(received);
-      Assert.assertEquals("Hello World", received.getText());
-
-      String foo = received.getStringProperty("foo");
-      String bar = received.getStringProperty("bar");
-      Assert.assertEquals("foo", "abc", foo);
-      Assert.assertEquals("bar", "123", bar);
-   }
-   
-   /**
-    * Sends a SEND frame via {@code sendWickedFrame} and checks that exactly one
-    * message with the expected body and a recent timestamp reaches the JMS side.
-    * NOTE(review): going by the test name, sendWickedFrame presumably prefixes
-    * the frame with a newline; its implementation is not visible here - confirm.
-    */
-   public void testSendMessageWithLeadingNewLine() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame send = connV11.createFrame("SEND");
-      send.addHeader("destination", getQueuePrefix() + getQueueName());
-      send.setBody("Hello World");
-
-      connV11.sendWickedFrame(send);
-
-      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
-      Assert.assertNotNull(received);
-      Assert.assertEquals("Hello World", received.getText());
-
-      // the message timestamp should be very close to "now"
-      long now = System.currentTimeMillis();
-      long stamped = received.getJMSTimestamp();
-      long drift = Math.abs(now - stamped);
-      Assert.assertTrue(drift < 1000);
-
-      // no second message may show up
-      Message extra = jmsConsumer.receive(1000);
-      assertNull(extra);
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Sends a message with a "receipt" header, checks the RECEIPT frame echoes
-    * the id, and verifies body and timestamp of the message on the JMS side.
-    */
-   public void testSendMessageWithReceipt() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame send = connV11.createFrame("SEND");
-      send.addHeader("destination", getQueuePrefix() + getQueueName());
-      send.addHeader("receipt", "1234");
-      send.setBody("Hello World");
-
-      // the reply to a SEND carrying a receipt header should be a RECEIPT
-      ClientStompFrame reply = connV11.sendFrame(send);
-      boolean isReceipt = reply.getCommand().equals("RECEIPT");
-      assertTrue(isReceipt);
-      assertEquals("1234", reply.getHeader("receipt-id"));
-
-      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
-      Assert.assertNotNull(received);
-      Assert.assertEquals("Hello World", received.getText());
-
-      // the message timestamp should be very close to "now"
-      long now = System.currentTimeMillis();
-      long stamped = received.getJMSTimestamp();
-      long drift = Math.abs(now - stamped);
-      Assert.assertTrue(drift < 1000);
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Sends a message carrying correlation-id, persistent, priority, type,
-    * JMSXGroupID and two custom headers, and checks that each one is visible
-    * through the corresponding JMS accessor or string property.
-    */
-   public void testSendMessageWithStandardHeaders() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame send = connV11.createFrame("SEND");
-      send.addHeader("destination", getQueuePrefix() + getQueueName());
-      send.addHeader("correlation-id", "c123");
-      send.addHeader("persistent", "true");
-      send.addHeader("priority", "3");
-      send.addHeader("type", "t345");
-      send.addHeader("JMSXGroupID", "abc");
-      send.addHeader("foo", "abc");
-      send.addHeader("bar", "123");
-      send.setBody("Hello World");
-
-      // the returned frame is not needed by this test
-      connV11.sendFrame(send);
-
-      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
-      Assert.assertNotNull(received);
-      Assert.assertEquals("Hello World", received.getText());
-
-      // standard JMS headers
-      Assert.assertEquals("JMSCorrelationID", "c123", received.getJMSCorrelationID());
-      Assert.assertEquals("getJMSType", "t345", received.getJMSType());
-      Assert.assertEquals("getJMSPriority", 3, received.getJMSPriority());
-      Assert.assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
-
-      // custom and JMSX properties
-      Assert.assertEquals("foo", "abc", received.getStringProperty("foo"));
-      Assert.assertEquals("bar", "123", received.getStringProperty("bar"));
-      Assert.assertEquals("JMSXGroupID", "abc", received.getStringProperty("JMSXGroupID"));
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Subscribes to the topic, checks that a published message is delivered with
-    * the expected destination and body, then unsubscribes and checks that a
-    * second published message is not delivered within one second.
-    */
-   public void testSubscribeToTopic() throws Exception
-   {
-      final String topicDestination = getTopicPrefix() + getTopicName();
-
-      connV11.connect(defUser, defPass);
-      this.subscribeTopic(connV11, "sub1", null, null, true);
-
-      sendMessage(getName(), topic);
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-
-      Assert.assertTrue(delivered.getCommand().equals("MESSAGE"));
-      Assert.assertTrue(delivered.getHeader("destination").equals(topicDestination));
-      Assert.assertTrue(delivered.getBody().equals(getName()));
-
-      this.unsubscribe(connV11, "sub1", true);
-
-      // published after the unsubscribe, so it must not arrive
-      sendMessage(getName(), topic);
-
-      ClientStompFrame afterUnsubscribe = connV11.receiveFrame(1000);
-      assertNull(afterUnsubscribe);
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Subscribes to the topic (with both trailing boolean flags of
-    * {@code subscribeTopic} set), then checks that a message sent on the same
-    * STOMP connection is not delivered back, while a message sent through
-    * {@code sendMessage} is delivered.
-    * NOTE(review): the last flag presumably enables "no-local"; subscribeTopic
-    * is defined outside this section - confirm.
-    */
-   public void testSubscribeToTopicWithNoLocal() throws Exception
-   {
-      final String topicDestination = getTopicPrefix() + getTopicName();
-
-      connV11.connect(defUser, defPass);
-      this.subscribeTopic(connV11, "sub1", null, null, true, true);
-
-      // send a message on the same connection => it should not be received
-      ClientStompFrame ownSend = connV11.createFrame("SEND");
-      ownSend.addHeader("destination", topicDestination);
-      ownSend.setBody("Hello World");
-      connV11.sendFrame(ownSend);
-
-      ClientStompFrame echoed = connV11.receiveFrame(2000);
-      assertNull(echoed);
-
-      // send message on another JMS connection => it should be received
-      sendMessage(getName(), topic);
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-
-      Assert.assertTrue(delivered.getCommand().equals("MESSAGE"));
-      Assert.assertTrue(delivered.getHeader("destination").equals(topicDestination));
-      Assert.assertTrue(delivered.getBody().equals(getName()));
-
-      this.unsubscribe(connV11, "sub1");
-      connV11.disconnect();
-   }
-
-   /**
-    * Receives a message on an "auto" ack subscription, disconnects, and checks
-    * that a JMS consumer no longer finds the message on the queue.
-    */
-   public void testSubscribeWithAutoAck() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      this.subscribe(connV11, "sub1", "auto");
-
-      sendMessage(getName());
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-
-      Assert.assertEquals("MESSAGE", delivered.getCommand());
-      Assert.assertNotNull(delivered.getHeader("destination"));
-      Assert.assertEquals(getName(), delivered.getBody());
-
-      connV11.disconnect();
-
-      // auto-ack means the message is already consumed
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-      Assert.assertNull(jmsConsumer.receive(1000));
-   }
-
-   /**
-    * Sends a five-byte payload to the queue and checks that the STOMP MESSAGE
-    * frame carries a "content-length" of 5, no "type" header, and a body equal
-    * to the payload decoded as UTF-8.
-    */
-   public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      this.subscribe(connV11, "sub1", "auto");
-
-      byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
-      sendMessage(payload, queue);
-
-      ClientStompFrame frame = connV11.receiveFrame();
-
-      // a missing frame should fail the test, not raise an NPE
-      assertNotNull("no MESSAGE frame received", frame);
-      assertEquals("MESSAGE", frame.getCommand());
-
-      System.out.println("Message: " + frame);
-
-      assertEquals("5", frame.getHeader("content-length"));
-
-      assertNull(frame.getHeader("type"));
-
-      // expected value first, so a failure report labels the two sides correctly
-      assertEquals(new String(payload, "UTF-8"), frame.getBody());
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Receives a message on a "client" ack subscription, ACKs it, disconnects,
-    * and checks that a JMS consumer no longer finds the message on the queue.
-    */
-   public void testSubscribeWithClientAck() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      this.subscribe(connV11, "sub1", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-      this.ack(connV11, "sub1", delivered);
-
-      connV11.disconnect();
-
-      // the explicit ACK means the message is already consumed
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-      Assert.assertNull(jmsConsumer.receive(1000));
-   }
-
-   /**
-    * Runs the shared client-ack-then-auto-ack scenario with its flag set to
-    * {@code true}.
-    * NOTE(review): judging by the test name, the flag selects an explicit
-    * DISCONNECT frame; the helper is defined outside this section - confirm.
-    */
-   public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception
-   {
-      assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
-   }
-
-   /**
-    * Runs the shared client-ack-then-auto-ack scenario with its flag set to
-    * {@code false}.
-    * NOTE(review): judging by the test name, this variant drops the connection
-    * without a DISCONNECT frame; the helper is defined outside this section - confirm.
-    */
-   public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception
-   {
-      assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
-   }
-
-   /**
-    * Subscribes with the id "mysubid" and checks that the delivered MESSAGE
-    * frame names that same id in its "subscription" header.
-    */
-   public void testSubscribeWithID() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      this.subscribe(connV11, "mysubid", "auto");
-
-      sendMessage(getName());
-
-      ClientStompFrame frame = connV11.receiveFrame();
-
-      // a missing frame should fail the test, not raise an NPE
-      Assert.assertNotNull("no MESSAGE frame received", frame);
-      // the header must identify this subscription, not merely be present
-      Assert.assertEquals("mysubid", frame.getHeader("subscription"));
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Produces a JMS {@code BytesMessage} with one property of each primitive
-    * kind and checks that the STOMP MESSAGE frame exposes a header for every
-    * property and carries the expected body.
-    */
-   public void testSubscribeWithMessageSentWithProperties() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      this.subscribe(connV11, "sub1", "auto");
-
-      MessageProducer jmsProducer = session.createProducer(queue);
-      BytesMessage outgoing = session.createBytesMessage();
-      outgoing.setStringProperty("S", "value");
-      outgoing.setBooleanProperty("n", false);
-      outgoing.setByteProperty("byte", (byte)9);
-      outgoing.setDoubleProperty("d", 2.0);
-      outgoing.setFloatProperty("f", 6.0f);
-      outgoing.setIntProperty("i", 10);
-      outgoing.setLongProperty("l", 121L);
-      outgoing.setShortProperty("s", (short)12);
-      outgoing.writeBytes("Hello World".getBytes("UTF-8"));
-      jmsProducer.send(outgoing);
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-      Assert.assertNotNull(delivered);
-
-      // every property set above must surface as a frame header
-      String[] propertyNames = { "S", "n", "byte", "d", "f", "i", "l", "s" };
-      for (String propertyName : propertyNames)
-      {
-         Assert.assertTrue(delivered.getHeader(propertyName) != null);
-      }
-      Assert.assertEquals("Hello World", delivered.getBody());
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Runs two consecutive transactions that reuse the id "tx1"; each one sends
-    * a single message and commits, and each message must reach the JMS consumer.
-    */
-   public void testSuccessiveTransactionsWithSameID() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      // round 0 is the first tx, round 1 reuses the same tx id
-      for (int round = 0; round < 2; round++)
-      {
-         this.beginTransaction(connV11, "tx1");
-
-         ClientStompFrame send = connV11.createFrame("SEND");
-         send.addHeader("destination", getQueuePrefix() + getQueueName());
-         send.addHeader("transaction", "tx1");
-         send.setBody("Hello World");
-         connV11.sendFrame(send);
-
-         this.commitTransaction(connV11, "tx1");
-
-         Message committed = jmsConsumer.receive(1000);
-         Assert.assertNotNull("Should have received a message", committed);
-      }
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Sends a message inside transaction "tx1" and checks that it is invisible
-    * to a JMS consumer until the transaction is committed.
-    */
-   public void testTransactionCommit() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      this.beginTransaction(connV11, "tx1");
-
-      ClientStompFrame send = connV11.createFrame("SEND");
-      send.addHeader("destination", getQueuePrefix() + getQueueName());
-      send.addHeader("transaction", "tx1");
-      send.addHeader("receipt", "123");
-      send.setBody("Hello World");
-
-      ClientStompFrame reply = connV11.sendFrame(send);
-      assertEquals("123", reply.getHeader("receipt-id"));
-
-      // before the commit nothing may be visible
-      Message beforeCommit = jmsConsumer.receive(100);
-      assertNull(beforeCommit);
-
-      this.commitTransaction(connV11, "tx1", true);
-
-      Message afterCommit = jmsConsumer.receive(1000);
-      Assert.assertNotNull("Should have received a message", afterCommit);
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Sends "first message" in a transaction that is aborted and "second message"
-    * in a transaction that is committed; the first message a JMS consumer
-    * receives must be "second message".
-    */
-   public void testTransactionRollback() throws Exception
-   {
-      MessageConsumer jmsConsumer = session.createConsumer(queue);
-
-      connV11.connect(defUser, defPass);
-
-      // transaction 1: send, then roll back
-      this.beginTransaction(connV11, "tx1");
-
-      ClientStompFrame firstSend = connV11.createFrame("SEND");
-      firstSend.addHeader("destination", getQueuePrefix() + getQueueName());
-      firstSend.addHeader("transaction", "tx1");
-      firstSend.setBody("first message");
-      connV11.sendFrame(firstSend);
-
-      this.abortTransaction(connV11, "tx1");
-
-      // transaction 2 (same id): send, then commit with receipt
-      this.beginTransaction(connV11, "tx1");
-
-      ClientStompFrame secondSend = connV11.createFrame("SEND");
-      secondSend.addHeader("destination", getQueuePrefix() + getQueueName());
-      secondSend.addHeader("transaction", "tx1");
-      secondSend.setBody("second message");
-      connV11.sendFrame(secondSend);
-
-      this.commitTransaction(connV11, "tx1", true);
-
-      // the rolled-back message must not be the one that arrives
-      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
-      Assert.assertNotNull(received);
-      Assert.assertEquals("second message", received.getText());
-
-      connV11.disconnect();
-   }
-
-   /**
-    * Checks that a queue subscription delivers a message, and that after
-    * UNSUBSCRIBE a further message is not delivered within one second.
-    */
-   public void testUnsubscribe() throws Exception
-   {
-      connV11.connect(defUser, defPass);
-      this.subscribe(connV11, "sub1", "auto");
-
-      // while subscribed, the message must arrive
-      sendMessage("first message");
-
-      ClientStompFrame delivered = connV11.receiveFrame();
-      boolean isMessage = delivered.getCommand().equals("MESSAGE");
-      Assert.assertTrue(isMessage);
-
-      // drop the subscription
-      this.unsubscribe(connV11, "sub1", true);
-
-      // once unsubscribed, nothing must arrive
-      sendMessage("second message");
-
-      ClientStompFrame afterUnsubscribe = connV11.receiveFrame(1000);
-      assertNull(afterUnsubscribe);
-
-      connV11.disconnect();
-   }
-
-   //-----------------private helper methods
-
-   /**
-    * Sends an ABORT frame for the given transaction on the given connection.
-    *
-    * @param conn connection to send the frame on
-    * @param txID value of the "transaction" header
-    */
-   private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
-   {
-      ClientStompFrame abort = conn.createFrame("ABORT");
-      abort.addHeader("transaction", txID);
-      conn.sendFrame(abort);
-   }
-
-   /**
-    * Sends a BEGIN frame for the given transaction on the given connection.
-    *
-    * @param conn connection to send the frame on
-    * @param txID value of the "transaction" header
-    */
-   private void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
-   {
-      ClientStompFrame begin = conn.createFrame("BEGIN");
-      begin.addHeader("transaction", txID);
-      conn.sendFrame(begin);
-   }
-
-   /**
-    * Commits the given transaction without requesting a receipt.
-    */
-   private void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
-   {
-      final boolean withReceipt = false;
-      commitTransaction(conn, txID, withReceipt);
-   }
-
-   /**
-    * Sends a COMMIT frame for the given transaction.
-    *
-    * @param conn    connection the frame is created and sent on
-    * @param txID    value of the "transaction" header
-    * @param receipt when true, a "receipt" header ("1234") is added and the
-    *                reply's "receipt-id" header is asserted to match it
-    */
-   private void commitTransaction(StompClientConnection conn, String txID, boolean receipt) throws IOException, InterruptedException
-   {
-      ClientStompFrame commitFrame = conn.createFrame("COMMIT");
-      commitFrame.addHeader("transaction", txID);
-      if (receipt)
-      {
-         commitFrame.addHeader("receipt", "1234");
-      }
-      ClientStompFrame resp = conn.sendFrame(commitFrame);
-      if (receipt)
-      {
-         // sendFrame may return null; fail with a readable message rather than a NullPointerException
-         assertNotNull("no reply to COMMIT sent with a receipt header", resp);
-         assertEquals("1234", resp.getHeader("receipt-id"));
-      }
-   }
-
-   /**
-    * Acknowledges the message carried by {@code frame} on the given
-    * subscription.
-    *
-    * @throws IOException if sending the ACK produces any response frame
-    */
-   private void ack(StompClientConnection conn, String subId,
-         ClientStompFrame frame) throws IOException, InterruptedException
-   {
-      String mid = frame.getHeader("message-id");
-
-      ClientStompFrame ackFrame = conn.createFrame("ACK");
-      ackFrame.addHeader("subscription", subId);
-      ackFrame.addHeader("message-id", mid);
-
-      ClientStompFrame reply = conn.sendFrame(ackFrame);
-      if (reply == null)
-      {
-         // no response frame: the ACK went through
-         return;
-      }
-      throw new IOException("failed to ack " + reply);
-   }
-
-   /**
-    * Sends an ACK for message {@code mid} on subscription {@code subId}.
-    *
-    * @param txID when non-null it is sent as the "transaction" header
-    */
-   private void ack(StompClientConnection conn, String subId, String mid, String txID) throws IOException, InterruptedException
-   {
-      ClientStompFrame frame = conn.createFrame("ACK");
-      frame.addHeader("subscription", subId);
-      frame.addHeader("message-id", mid);
-      boolean transacted = txID != null;
-      if (transacted)
-      {
-         frame.addHeader("transaction", txID);
-      }
-      conn.sendFrame(frame);
-   }
-
-   /**
-    * Sends a NACK for message {@code mid} on subscription {@code subId}.
-    */
-   private void nack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
-   {
-      ClientStompFrame nackFrame = conn.createFrame("NACK");
-      nackFrame.addHeader("subscription", subId);
-      nackFrame.addHeader("message-id", mid);
-      conn.sendFrame(nackFrame);
-   }
-   
-   /**
-    * Subscribes to the test queue with no durable name, no selector and no
-    * receipt.
-    */
-   private void subscribe(StompClientConnection conn, String subId, String ack) throws IOException, InterruptedException
-   {
-      subscribe(conn, subId, ack, null, null, false);
-   }
-
-   /**
-    * Subscribes to the test queue with a durable subscriber name, no selector
-    * and no receipt.
-    */
-   private void subscribe(StompClientConnection conn, String subId,
-         String ack, String durableId) throws IOException, InterruptedException
-   {
-      subscribe(conn, subId, ack, durableId, null, false);
-   }
-
-   /**
-    * Subscribes to the test queue without a selector, optionally requesting a
-    * receipt.
-    */
-   private void subscribe(StompClientConnection conn, String subId,
-         String ack, String durableId, boolean receipt) throws IOException, InterruptedException
-   {
-      subscribe(conn, subId, ack, durableId, null, receipt);
-   }
-
-   /**
-    * Subscribes to the test queue with a selector and no receipt.
-    */
-   private void subscribe(StompClientConnection conn, String subId, String ack,
-         String durableId, String selector) throws IOException,
-         InterruptedException
-   {
-      subscribe(conn, subId, ack, durableId, selector, false);
-   }
-   
-   /**
-    * Sends a SUBSCRIBE frame for the test queue
-    * ({@code getQueuePrefix() + getQueueName()}).
-    *
-    * @param conn      connection the frame is created and sent on
-    * @param subId     value of the "id" header
-    * @param ack       value of the "ack" header; omitted when null
-    * @param durableId value of the "durable-subscriber-name" header; omitted when null
-    * @param selector  value of the "selector" header; omitted when null
-    * @param receipt   when true, a "receipt" header ("1234") is added and the
-    *                  reply's "receipt-id" header is asserted to match it
-    */
-   private void subscribe(StompClientConnection conn, String subId,
-         String ack, String durableId, String selector, boolean receipt) throws IOException, InterruptedException
-   {
-      ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", subId);
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      if (ack != null)
-      {
-         subFrame.addHeader("ack", ack);
-      }
-      if (durableId != null)
-      {
-         subFrame.addHeader("durable-subscriber-name", durableId);
-      }
-      if (selector != null)
-      {
-         subFrame.addHeader("selector", selector);
-      }
-      if (receipt)
-      {
-         subFrame.addHeader("receipt", "1234");
-      }
-
-      // keep the reply in its own variable instead of overwriting the request
-      ClientStompFrame reply = conn.sendFrame(subFrame);
-
-      if (receipt)
-      {
-         // sendFrame may return null; fail with a readable message rather than a NullPointerException
-         assertNotNull("no reply to SUBSCRIBE sent with a receipt header", reply);
-         assertEquals("1234", reply.getHeader("receipt-id"));
-      }
-   }
-
-   /**
-    * Subscribes to the test topic without a receipt and without "no-local".
-    */
-   private void subscribeTopic(StompClientConnection conn, String subId,
-         String ack, String durableId) throws IOException, InterruptedException
-   {
-      subscribeTopic(conn, subId, ack, durableId, false, false);
-   }
-
-   /**
-    * Subscribes to the test topic without "no-local", optionally requesting a
-    * receipt.
-    */
-   private void subscribeTopic(StompClientConnection conn, String subId,
-         String ack, String durableId, boolean receipt) throws IOException, InterruptedException
-   {
-      final boolean noLocal = false;
-      subscribeTopic(conn, subId, ack, durableId, receipt, noLocal);
-   }
-
-   /**
-    * Sends a SUBSCRIBE frame for the test topic
-    * ({@code getTopicPrefix() + getTopicName()}).
-    *
-    * @param conn      connection the frame is created and sent on
-    * @param subId     value of the "id" header
-    * @param ack       value of the "ack" header; omitted when null
-    * @param durableId value of the "durable-subscriber-name" header; omitted when null
-    * @param receipt   when true, a "receipt" header ("1234") is added and the
-    *                  reply's "receipt-id" header is asserted to match it
-    * @param noLocal   when true, a "no-local" header with value "true" is added
-    */
-   private void subscribeTopic(StompClientConnection conn, String subId,
-         String ack, String durableId, boolean receipt, boolean noLocal) throws IOException, InterruptedException
-   {
-      ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", subId);
-      subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
-      if (ack != null)
-      {
-         subFrame.addHeader("ack", ack);
-      }
-      if (durableId != null)
-      {
-         subFrame.addHeader("durable-subscriber-name", durableId);
-      }
-      if (receipt)
-      {
-         subFrame.addHeader("receipt", "1234");
-      }
-      if (noLocal)
-      {
-         subFrame.addHeader("no-local", "true");
-      }
-
-      ClientStompFrame frame = conn.sendFrame(subFrame);
-
-      if (receipt)
-      {
-         // same checks as the queue variant: readable failures instead of a NullPointerException
-         assertNotNull("no reply to SUBSCRIBE sent with a receipt header", frame);
-         assertEquals("1234", frame.getHeader("receipt-id"));
-      }
-   }
-
-   /**
-    * Sends an UNSUBSCRIBE frame for {@code subId} without requesting a
-    * receipt.
-    */
-   private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException
-   {
-      // with receipt == false the three-argument variant only builds and sends the frame
-      unsubscribe(conn, subId, false);
-   }
-   
-   /**
-    * Sends an UNSUBSCRIBE frame for {@code subId}.
-    *
-    * @param receipt when true, a "receipt" header ("4321") is added and the
-    *                reply is asserted to be a RECEIPT frame whose
-    *                "receipt-id" matches it
-    */
-   private void unsubscribe(StompClientConnection conn, String subId,
-         boolean receipt) throws IOException, InterruptedException
-   {
-      ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", subId);
-      if (receipt)
-      {
-         unsubFrame.addHeader("receipt", "4321");
-      }
-
-      ClientStompFrame reply = conn.sendFrame(unsubFrame);
-      if (!receipt)
-      {
-         return;
-      }
-
-      System.out.println("response: " + reply);
-      assertEquals("RECEIPT", reply.getCommand());
-      assertEquals("4321", reply.getHeader("receipt-id"));
-   }
-
-   /**
-    * Receives a message on a "client" ack subscription without acknowledging
-    * it, reconnects, and expects the same queue to deliver a message again on
-    * a subscription created without an "ack" header. After a further
-    * reconnect a newly sent message is expected to be the next one delivered.
-    *
-    * @param sendDisconnect when true the first connection is closed with
-    *                       disconnect(), otherwise with destroy()
-    */
-   protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception
-   {
-      connV11.connect(defUser, defPass);
-
-      this.subscribe(connV11, "sub1", "client");
-
-      sendMessage(getName());
-
-      ClientStompFrame received = connV11.receiveFrame();
-
-      Assert.assertEquals("MESSAGE", received.getCommand());
-
-      log.info("Reconnecting!");
-
-      // close the first connection either way, then open a fresh one
-      if (sendDisconnect)
-      {
-         connV11.disconnect();
-      }
-      else
-      {
-         connV11.destroy();
-      }
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-
-      // message should be received since message was not acknowledged
-      connV11.connect(defUser, defPass);
-
-      this.subscribe(connV11, "sub1", null);
-
-      received = connV11.receiveFrame();
-      Assert.assertTrue(received.getCommand().equals("MESSAGE"));
-
-      connV11.disconnect();
-
-      // now lets make sure we don't see the message again
-      connV11.destroy();
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      connV11.connect(defUser, defPass);
-
-      this.subscribe(connV11, "sub1", null, null, true);
-
-      sendMessage("shouldBeNextMessage");
-
-      received = connV11.receiveFrame();
-      Assert.assertTrue(received.getCommand().equals("MESSAGE"));
-      Assert.assertEquals("shouldBeNextMessage", received.getBody());
-   }
-
-}
-
-
-
-
-

Copied: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java (from rev 11873, trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java)
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java	                        (rev 0)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java	2011-12-08 03:05:11 UTC (rev 11874)
@@ -0,0 +1,2221 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.v11;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
+import org.hornetq.tests.integration.stomp.util.StompClientConnection;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionV11;
+
+/*
+ * 
+ */
+public class StompV11Test extends StompTestBase2
+{
+   private static final transient Logger log = Logger.getLogger(StompV11Test.class);
+   
+   private StompClientConnection connV11;
+   
+   /**
+    * Runs the base class set-up, then creates a fresh "1.1" client
+    * connection for the test in {@code connV11}.
+    */
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      this.connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+   }
+   
+   /**
+    * Disconnects {@code connV11} if the test left it connected and then runs
+    * the base class tear-down.
+    */
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         boolean connected = connV11.isConnected();
+         System.out.println("Connection 11 : " + connected);
+         if (connected)
+         {
+            connV11.disconnect();
+         }
+      }
+      finally
+      {
+         // always run the base tear-down, even if the disconnect fails
+         super.tearDown();
+      }
+   }
+   
+   /**
+    * Connects with credentials as a "1.0" and as a "1.1" client and checks
+    * the reported version, checks that connect() without credentials leaves
+    * the connection unconnected, and finally connects through
+    * StompClientConnectionV11.connect1().
+    */
+   public void testConnection() throws Exception
+   {
+      // 1.0 client, with credentials
+      StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      
+      connection.connect(defUser, defPass);
+      
+      assertTrue(connection.isConnected());
+      
+      assertEquals("1.0", connection.getVersion());
+      
+      connection.disconnect();
+
+      // 1.1 client, with credentials
+      connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      
+      connection.connect(defUser, defPass);
+      
+      assertTrue(connection.isConnected());
+      
+      assertEquals("1.1", connection.getVersion());
+      
+      connection.disconnect();
+      
+      // 1.1 client, connect() without credentials: expected not to be connected
+      connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      
+      connection.connect();
+      
+      assertFalse(connection.isConnected());
+      
+      //new way of connection
+      // NOTE(review): connect1() is defined in StompClientConnectionV11; how it differs from connect() is not visible here
+      StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      conn.connect1(defUser, defPass);
+      
+      assertTrue(conn.isConnected());
+      
+      conn.disconnect();
+   }
+   
+   /**
+    * Checks the "version" header of the CONNECTED reply for different
+    * "accept-version" values sent on CONNECT, and that an unsupported
+    * version is answered with an ERROR frame.
+    */
+   public void testNegotiation() throws Exception
+   {
+      // case 1 accept-version absent. It is a 1.0 connect
+      ClientStompFrame reply = negotiateVersion(null);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      //reply headers: version, session, server
+      assertEquals(null, reply.getHeader("version"));
+
+      connV11.disconnect();
+
+      // case 2 accept-version=1.0, result: 1.0
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      reply = negotiateVersion("1.0");
+
+      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals("1.0", reply.getHeader("version"));
+
+      connV11.disconnect();
+
+      // case 3 accept-version=1.1, result: 1.1
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      reply = negotiateVersion("1.1");
+
+      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals("1.1", reply.getHeader("version"));
+
+      connV11.disconnect();
+
+      // case 4 accept-version=1.0,1.1,1.2, result 1.1
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      reply = negotiateVersion("1.0,1.1,1.2");
+
+      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals("1.1", reply.getHeader("version"));
+
+      connV11.disconnect();
+
+      // case 5 accept-version=1.2, result error
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      reply = negotiateVersion("1.2");
+
+      assertEquals("ERROR", reply.getCommand());
+
+      System.out.println("Got error frame " + reply);
+
+   }
+
+   /**
+    * Sends a CONNECT frame with the default credentials on the current
+    * {@code connV11} and returns the reply.
+    *
+    * @param acceptVersion value of the "accept-version" header; the header is
+    *                      omitted when null
+    */
+   private ClientStompFrame negotiateVersion(String acceptVersion) throws IOException, InterruptedException
+   {
+      ClientStompFrame frame = connV11.createFrame("CONNECT");
+      if (acceptVersion != null)
+      {
+         frame.addHeader("accept-version", acceptVersion);
+      }
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+
+      return connV11.sendFrame(frame);
+   }
+   
+   /**
+    * Sends two messages to the test queue, the second one with a "receipt"
+    * header, and checks the RECEIPT reply. A second connection then
+    * subscribes and both messages are expected to be delivered in order.
+    */
+   public void testSendAndReceive() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      frame.setBody("Hello World 1!");
+
+      // no receipt requested: no reply frame expected
+      ClientStompFrame response = connV11.sendFrame(frame);
+
+      assertNull(response);
+
+      // same frame again, now with a receipt header and a new body
+      frame.addHeader("receipt", "1234");
+      frame.setBody("Hello World 2!");
+
+      response = connV11.sendFrame(frame);
+
+      assertNotNull(response);
+
+      assertEquals("RECEIPT", response.getCommand());
+
+      assertEquals("1234", response.getHeader("receipt-id"));
+
+      //subscribe
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      newConn.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+
+      newConn.sendFrame(subFrame);
+
+      frame = newConn.receiveFrame();
+
+      System.out.println("received " + frame);
+
+      assertEquals("MESSAGE", frame.getCommand());
+
+      assertEquals("a-sub", frame.getHeader("subscription"));
+
+      assertNotNull(frame.getHeader("message-id"));
+
+      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+
+      assertEquals("Hello World 1!", frame.getBody());
+
+      frame = newConn.receiveFrame();
+
+      System.out.println("received " + frame);
+
+      // verify the second message too; previously it was only printed
+      assertNotNull(frame);
+      assertEquals("MESSAGE", frame.getCommand());
+      assertEquals("Hello World 2!", frame.getBody());
+
+      //unsub
+      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      newConn.sendFrame(unsubFrame);
+
+      newConn.disconnect();
+   }
+
+   /**
+    * Sends a message with "content-type: application/xml" and checks that the
+    * MESSAGE frame delivered to a subscriber carries the same "content-type"
+    * header.
+    */
+   public void testHeaderContentType() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "application/xml");
+      frame.setBody("Hello World 1!");
+
+      connV11.sendFrame(frame);
+
+      //subscribe
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      newConn.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+
+      newConn.sendFrame(subFrame);
+
+      frame = newConn.receiveFrame();
+
+      System.out.println("received " + frame);
+
+      assertEquals("MESSAGE", frame.getCommand());
+
+      assertEquals("application/xml", frame.getHeader("content-type"));
+
+      //unsub
+      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      // the frame was previously built but never sent
+      newConn.sendFrame(unsubFrame);
+
+      newConn.disconnect();
+   }
+
+   /**
+    * Sends a message whose "content-length" header equals the UTF-8 length of
+    * {@code body} while the frame body has "extra" appended, and checks that
+    * the delivered MESSAGE frame reports that same "content-length".
+    */
+   public void testHeaderContentLength() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      ClientStompFrame frame = connV11.createFrame("SEND");
+
+      String body = "Hello World 1!";
+      String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "application/xml");
+      frame.addHeader("content-length", cLen);
+      // the actual body is longer than the declared content-length
+      frame.setBody(body + "extra");
+
+      connV11.sendFrame(frame);
+
+      //subscribe
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      newConn.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+
+      newConn.sendFrame(subFrame);
+
+      frame = newConn.receiveFrame();
+
+      System.out.println("received " + frame);
+
+      assertEquals("MESSAGE", frame.getCommand());
+
+      assertEquals(cLen, frame.getHeader("content-length"));
+
+      //unsub
+      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      // the frame was previously built but never sent
+      newConn.sendFrame(unsubFrame);
+
+      newConn.disconnect();
+   }
+
+   /**
+    * Sends a message with a header whose key and value contain the escape
+    * sequences for backslash, line feed and colon, and checks that the
+    * subscriber sees the header under its decoded key with the decoded value.
+    */
+   public void testHeaderEncoding() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      ClientStompFrame frame = connV11.createFrame("SEND");
+
+      String body = "Hello World 1!";
+      String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "application/xml");
+      frame.addHeader("content-length", cLen);
+      // escaped forms: "\\\\" is a backslash, "\\n" a line feed, "\\:" a colon
+      String hKey = "special-header\\\\\\n\\:";
+      String hVal = "\\:\\\\\\ngood";
+      frame.addHeader(hKey, hVal);
+
+      System.out.println("key: |" + hKey + "| val: |" + hVal);
+
+      frame.setBody(body);
+
+      connV11.sendFrame(frame);
+
+      //subscribe
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      newConn.connect(defUser, defPass);
+
+      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+
+      newConn.sendFrame(subFrame);
+
+      frame = newConn.receiveFrame();
+
+      System.out.println("received " + frame);
+
+      assertEquals("MESSAGE", frame.getCommand());
+
+      // look the header up by its decoded key
+      String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
+
+      assertEquals(":" + "\\" + "\n" + "good", value);
+
+      //unsub
+      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      // the frame was previously built but never sent
+      newConn.sendFrame(unsubFrame);
+
+      newConn.disconnect();
+   }
+   
+   /**
+    * Client-side heart-beat scenarios: no "heart-beat" header, "0,0", "1,0"
+    * without pinging (a later send is expected to fail) and "1,0" with a
+    * running pinger (a later send is expected to succeed).
+    */
+   public void testHeartBeat() throws Exception
+   {
+      //no heart beat at all if heart-beat absent
+      ClientStompFrame reply = connectWithHeartBeat(null);
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      Thread.sleep(5000);
+
+      assertEquals(0, connV11.getFrameQueueSize());
+
+      connV11.disconnect();
+
+      //no heart beat for (0,0)
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      reply = connectWithHeartBeat("0,0");
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("0,0", reply.getHeader("heart-beat"));
+
+      Thread.sleep(5000);
+
+      assertEquals(0, connV11.getFrameQueueSize());
+
+      connV11.disconnect();
+
+      //heart-beat (1,0), should receive a min client ping accepted by server
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      reply = connectWithHeartBeat("1,0");
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("0,500", reply.getHeader("heart-beat"));
+
+      Thread.sleep(2000);
+
+      //now server side should be disconnected because we didn't send ping for 2 sec
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      frame.setBody("Hello World");
+
+      //send will fail
+      try
+      {
+         connV11.sendFrame(frame);
+         fail("connection should have been destroyed by now");
+      }
+      catch (IOException e)
+      {
+         //expected: the send fails because the connection is gone
+      }
+
+      //heart-beat (1,0), start a ping, then send a message, should be ok.
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      reply = connectWithHeartBeat("1,0");
+
+      assertEquals("CONNECTED", reply.getCommand());
+
+      assertEquals("0,500", reply.getHeader("heart-beat"));
+
+      System.out.println("========== start pinger!");
+
+      connV11.startPinger(500);
+
+      Thread.sleep(2000);
+
+      //the pinger has been running for the whole 2 sec, so the connection should still be usable
+      frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      frame.setBody("Hello World");
+
+      //send will be ok
+      connV11.sendFrame(frame);
+
+      connV11.stopPinger();
+
+      connV11.disconnect();
+
+   }
+
+   /**
+    * Sends a CONNECT frame with the default credentials on the current
+    * {@code connV11} and returns the reply.
+    *
+    * @param heartBeat value of the "heart-beat" header; when null neither
+    *                  "heart-beat" nor "accept-version" is sent, otherwise
+    *                  "accept-version" is sent as "1.0,1.1"
+    */
+   private ClientStompFrame connectWithHeartBeat(String heartBeat) throws IOException, InterruptedException
+   {
+      ClientStompFrame frame = connV11.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      if (heartBeat != null)
+      {
+         frame.addHeader("heart-beat", heartBeat);
+         frame.addHeader("accept-version", "1.0,1.1");
+      }
+
+      return connV11.sendFrame(frame);
+   }
+   
+   //server ping
+   /**
+    * Server-side heart-beat scenarios: checks the negotiated "heart-beat"
+    * reply for requests "1,1" and "500,1000", then keeps a pinger running for
+    * ten seconds, expects more than five server pings to have been counted
+    * and a subsequent send to succeed.
+    */
+   public void testHeartBeat2() throws Exception
+   {
+      //heart-beat (1,1)
+      ClientStompFrame frame = connV11.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "1,1");
+      frame.addHeader("accept-version", "1.0,1.1");
+      
+      ClientStompFrame reply = connV11.sendFrame(frame);
+      
+      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals("500,500", reply.getHeader("heart-beat"));
+
+      connV11.disconnect();
+      
+      //heart-beat (500,1000)
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = connV11.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "500,1000");
+      frame.addHeader("accept-version", "1.0,1.1");
+      
+      reply = connV11.sendFrame(frame);
+      
+      assertEquals("CONNECTED", reply.getCommand());
+      
+      assertEquals("1000,500", reply.getHeader("heart-beat"));
+      
+      System.out.println("========== start pinger!");
+      
+      connV11.startPinger(500);
+      
+      Thread.sleep(10000);
+      
+      //now check how many server pings the connection has counted
+      int size = connV11.getServerPingNumber();
+      
+      System.out.println("ping received: " + size);
+      
+      assertTrue(size > 5);
+      
+      //the pinger has been running the whole time, so the connection should still be usable
+      frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      frame.setBody("Hello World");
+
+      //send will be ok
+      connV11.sendFrame(frame);
+      
+      // NOTE(review): unlike testHeartBeat, stopPinger() is not called before disconnect() - confirm disconnect() stops the pinger
+      connV11.disconnect();
+   }
+   
+   /**
+    * Sends ten messages from a connection that negotiated heart-beats and
+    * runs a pinger, then receives them on a second connection opened without
+    * heart-beats and expects exactly ten frames.
+    */
+   public void testSendWithHeartBeatsAndReceive() throws Exception
+   {
+      StompClientConnection newConn = null;
+      try
+      {
+         ClientStompFrame frame = connV11.createFrame("CONNECT");
+         frame.addHeader("host", "127.0.0.1");
+         frame.addHeader("login", this.defUser);
+         frame.addHeader("passcode", this.defPass);
+         frame.addHeader("heart-beat", "500,1000");
+         frame.addHeader("accept-version", "1.0,1.1");
+
+         connV11.sendFrame(frame);
+
+         connV11.startPinger(500);
+
+         frame = connV11.createFrame("SEND");
+         frame.addHeader("destination", getQueuePrefix() + getQueueName());
+         frame.addHeader("content-type", "text/plain");
+
+         for (int i = 0; i < 10; i++)
+         {
+            frame.setBody("Hello World " + i + "!");
+            connV11.sendFrame(frame);
+            Thread.sleep(500);
+         }
+
+         // subscribe
+         newConn = StompClientConnectionFactory.createClientConnection("1.1",
+               hostname, port);
+         newConn.connect(defUser, defPass);
+
+         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+         subFrame.addHeader("id", "a-sub");
+         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         subFrame.addHeader("ack", "auto");
+
+         newConn.sendFrame(subFrame);
+
+         int cnt = 0;
+
+         frame = newConn.receiveFrame();
+
+         // count frames until a receive returns null
+         while (frame != null)
+         {
+            cnt++;
+            Thread.sleep(500);
+            frame = newConn.receiveFrame(5000);
+         }
+
+         assertEquals(10, cnt);
+
+         // unsub
+         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+         unsubFrame.addHeader("id", "a-sub");
+         newConn.sendFrame(unsubFrame);
+      }
+      finally
+      {
+         try
+         {
+            if (newConn != null)
+            {
+               newConn.disconnect();
+            }
+         }
+         finally
+         {
+            // still disconnect the sender if closing the receiver failed
+            connV11.disconnect();
+         }
+      }
+   }
+   
+   /**
+    * Sends ten messages from a connection opened without heart-beats, then
+    * receives them on a second connection that negotiated heart-beats and
+    * runs a pinger, and expects exactly ten frames.
+    */
+   public void testSendAndReceiveWithHeartBeats() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+
+      int sent = 0;
+      while (sent < 10)
+      {
+         frame.setBody("Hello World " + sent + "!");
+         connV11.sendFrame(frame);
+         Thread.sleep(500);
+         sent++;
+      }
+
+      //subscribe
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      try
+      {
+         frame = newConn.createFrame("CONNECT");
+         frame.addHeader("host", "127.0.0.1");
+         frame.addHeader("login", this.defUser);
+         frame.addHeader("passcode", this.defPass);
+         frame.addHeader("heart-beat", "500,1000");
+         frame.addHeader("accept-version", "1.0,1.1");
+
+         newConn.sendFrame(frame);
+
+         newConn.startPinger(500);
+
+         Thread.sleep(500);
+
+         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+         subFrame.addHeader("id", "a-sub");
+         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         subFrame.addHeader("ack", "auto");
+
+         newConn.sendFrame(subFrame);
+
+         // count frames until a receive returns null
+         int cnt = 0;
+         for (frame = newConn.receiveFrame(); frame != null; frame = newConn.receiveFrame(5000))
+         {
+            cnt++;
+            Thread.sleep(500);
+         }
+
+         assertEquals(10, cnt);
+
+         // unsub
+         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+         unsubFrame.addHeader("id", "a-sub");
+         newConn.sendFrame(unsubFrame);
+      }
+      finally
+      {
+         newConn.disconnect();
+      }
+   }
+
+   /**
+    * Sends ten messages from a connection that negotiated heart-beats and
+    * runs a pinger, then receives them on a second connection set up the same
+    * way and expects exactly ten frames.
+    */
+   public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception
+   {
+      StompClientConnection newConn = null;
+      try
+      {
+         ClientStompFrame frame = connV11.createFrame("CONNECT");
+         frame.addHeader("host", "127.0.0.1");
+         frame.addHeader("login", this.defUser);
+         frame.addHeader("passcode", this.defPass);
+         frame.addHeader("heart-beat", "500,1000");
+         frame.addHeader("accept-version", "1.0,1.1");
+
+         connV11.sendFrame(frame);
+
+         connV11.startPinger(500);
+
+         frame = connV11.createFrame("SEND");
+         frame.addHeader("destination", getQueuePrefix() + getQueueName());
+         frame.addHeader("content-type", "text/plain");
+
+         for (int i = 0; i < 10; i++)
+         {
+            frame.setBody("Hello World " + i + "!");
+            connV11.sendFrame(frame);
+            Thread.sleep(500);
+         }
+
+         // subscribe
+         newConn = StompClientConnectionFactory.createClientConnection("1.1",
+               hostname, port);
+         frame = newConn.createFrame("CONNECT");
+         frame.addHeader("host", "127.0.0.1");
+         frame.addHeader("login", this.defUser);
+         frame.addHeader("passcode", this.defPass);
+         frame.addHeader("heart-beat", "500,1000");
+         frame.addHeader("accept-version", "1.0,1.1");
+
+         newConn.sendFrame(frame);
+
+         newConn.startPinger(500);
+
+         Thread.sleep(500);
+
+         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+         subFrame.addHeader("id", "a-sub");
+         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         subFrame.addHeader("ack", "auto");
+
+         newConn.sendFrame(subFrame);
+
+         int cnt = 0;
+
+         frame = newConn.receiveFrame();
+
+         // count frames until a receive returns null
+         while (frame != null)
+         {
+            cnt++;
+            Thread.sleep(500);
+            frame = newConn.receiveFrame(5000);
+         }
+         assertEquals(10, cnt);
+
+         // unsub
+         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+         unsubFrame.addHeader("id", "a-sub");
+         newConn.sendFrame(unsubFrame);
+      }
+      finally
+      {
+         try
+         {
+            if (newConn != null)
+            {
+               newConn.disconnect();
+            }
+         }
+         finally
+         {
+            // still disconnect the sender if closing the receiver failed
+            connV11.disconnect();
+         }
+      }
+   }
+
+   /**
+    * NACKs the single message delivered to a client-ack subscription and
+    * expects the queue to be empty afterwards.
+    */
+   public void testNack() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      subscribe(connV11, "sub1", "client");
+      sendMessage(getName());
+
+      final ClientStompFrame delivered = connV11.receiveFrame();
+      final String msgId = delivered.getHeader("message-id");
+      System.out.println("Received message with id " + msgId);
+
+      nack(connV11, "sub1", msgId);
+      unsubscribe(connV11, "sub1");
+      connV11.disconnect();
+
+      // a NACKed message is dropped, so a plain JMS consumer must find nothing
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+      Message leftover = jmsConsumer.receive(1000);
+      Assert.assertNull(leftover);
+   }
+
+   public void testNackWithWrongSubId() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      String messageID = frame.getHeader("message-id");
+      
+      System.out.println("Received message with id " + messageID);
+      
+      nack(connV11, "sub2", messageID);
+      
+      ClientStompFrame error = connV11.receiveFrame();
+      
+      System.out.println("Receiver error: " + error);
+      
+      unsubscribe(connV11, "sub1");
+      
+      connV11.disconnect();
+
+      //message should be still there
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+   }
+
+   public void testNackWithWrongMessageId() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      String messageID = frame.getHeader("message-id");
+      
+      System.out.println("Received message with id " + messageID);
+      
+      nack(connV11, "sub2", "someother");
+      
+      ClientStompFrame error = connV11.receiveFrame();
+      
+      System.out.println("Receiver error: " + error);
+      
+      unsubscribe(connV11, "sub1");
+      
+      connV11.disconnect();
+
+      //message should still there
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+   }
+   
+   
+   /**
+    * ACKs the single message delivered to a client-ack subscription and
+    * expects the queue to be empty afterwards.
+    */
+   public void testAck() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      subscribe(connV11, "sub1", "client");
+      sendMessage(getName());
+
+      final ClientStompFrame delivered = connV11.receiveFrame();
+      final String msgId = delivered.getHeader("message-id");
+      System.out.println("Received message with id " + msgId);
+
+      ack(connV11, "sub1", msgId, null);
+      unsubscribe(connV11, "sub1");
+      connV11.disconnect();
+
+      // the message was acknowledged, so nothing may be left on the queue
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+      Message leftover = jmsConsumer.receive(1000);
+      Assert.assertNull(leftover);
+   }
+
+   public void testAckWithWrongSubId() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      String messageID = frame.getHeader("message-id");
+      
+      System.out.println("Received message with id " + messageID);
+      
+      ack(connV11, "sub2", messageID, null);
+      
+      ClientStompFrame error = connV11.receiveFrame();
+      
+      System.out.println("Receiver error: " + error);
+      
+      unsubscribe(connV11, "sub1");
+      
+      connV11.disconnect();
+
+      //message should be still there
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+   }
+
+   public void testAckWithWrongMessageId() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      String messageID = frame.getHeader("message-id");
+      
+      System.out.println("Received message with id " + messageID);
+      
+      ack(connV11, "sub2", "someother", null);
+      
+      ClientStompFrame error = connV11.receiveFrame();
+      
+      System.out.println("Receiver error: " + error);
+      
+      unsubscribe(connV11, "sub1");
+      
+      connV11.disconnect();
+
+      //message should still there
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+   }
+   
+   public void testErrorWithReceipt() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      String messageID = frame.getHeader("message-id");
+      
+      System.out.println("Received message with id " + messageID);
+      
+      ClientStompFrame ackFrame = connV11.createFrame("ACK");
+      //give it a wrong sub id
+      ackFrame.addHeader("subscription", "sub2");
+      ackFrame.addHeader("message-id", messageID);
+      ackFrame.addHeader("receipt", "answer-me");
+      
+      ClientStompFrame error = connV11.sendFrame(ackFrame);
+      
+      System.out.println("Receiver error: " + error);
+      
+      assertEquals("ERROR", error.getCommand());
+      
+      assertEquals("answer-me", error.getHeader("receipt-id"));
+      
+      unsubscribe(connV11, "sub1");
+      
+      connV11.disconnect();
+
+      //message should still there
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);      
+   }
+   
+   public void testErrorWithReceipt2() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      String messageID = frame.getHeader("message-id");
+      
+      System.out.println("Received message with id " + messageID);
+      
+      ClientStompFrame ackFrame = connV11.createFrame("ACK");
+      //give it a wrong sub id
+      ackFrame.addHeader("subscription", "sub1");
+      ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1));
+      ackFrame.addHeader("receipt", "answer-me");
+      
+      ClientStompFrame error = connV11.sendFrame(ackFrame);
+      
+      System.out.println("Receiver error: " + error);
+      
+      assertEquals("ERROR", error.getCommand());
+      
+      assertEquals("answer-me", error.getHeader("receipt-id"));
+      
+      unsubscribe(connV11, "sub1");
+      
+      connV11.disconnect();
+
+      //message should still there
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);      
+   }
+   
+   /**
+    * With a "client" ack subscription, receives fifty messages, ACKs only the
+    * last one and expects the queue to be empty afterwards.
+    */
+   public void testAckModeClient() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      subscribe(connV11, "sub1", "client");
+
+      final int total = 50;
+      for (int sent = 0; sent < total; sent++)
+      {
+         this.sendMessage("client-ack" + sent);
+      }
+
+      // remember the most recent frame so that it can be acknowledged below
+      ClientStompFrame last = null;
+      int received = 0;
+      while (received < total)
+      {
+         last = connV11.receiveFrame();
+         assertNotNull(last);
+         received++;
+      }
+
+      // ack the last
+      this.ack(connV11, "sub1", last);
+
+      unsubscribe(connV11, "sub1");
+      connV11.disconnect();
+
+      // no messages can be received.
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+      Message leftover = jmsConsumer.receive(1000);
+      Assert.assertNull(leftover);
+   }
+   
+   /**
+    * With a "client" ack subscription, receives fifty messages, ACKs the 49th
+    * and expects exactly one message to remain on the queue.
+    */
+   public void testAckModeClient2() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      subscribe(connV11, "sub1", "client");
+
+      final int total = 50;
+      for (int sent = 0; sent < total; sent++)
+      {
+         this.sendMessage("client-ack" + sent);
+      }
+
+      final int ackIndex = total - 2;
+      for (int idx = 0; idx < total; idx++)
+      {
+         ClientStompFrame current = connV11.receiveFrame();
+         assertNotNull(current);
+
+         // ack the 49th
+         if (idx == ackIndex)
+         {
+            this.ack(connV11, "sub1", current);
+         }
+      }
+
+      unsubscribe(connV11, "sub1");
+      connV11.disconnect();
+
+      // exactly one message (the one after the acked frame) must remain
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+      Message leftover = jmsConsumer.receive(1000);
+      Assert.assertNotNull(leftover);
+      leftover = jmsConsumer.receive(1000);
+      Assert.assertNull(leftover);
+   }
+   
+   /**
+    * With an "auto" ack subscription, receives fifty messages without sending
+    * any ACK and expects the queue to be empty afterwards.
+    */
+   public void testAckModeAuto() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      subscribe(connV11, "sub1", "auto");
+
+      final int total = 50;
+      for (int sent = 0; sent < total; sent++)
+      {
+         this.sendMessage("auto-ack" + sent);
+      }
+
+      // every message must show up on the STOMP connection
+      int received = 0;
+      while (received < total)
+      {
+         ClientStompFrame current = connV11.receiveFrame();
+         assertNotNull(current);
+         received++;
+      }
+
+      unsubscribe(connV11, "sub1");
+      connV11.disconnect();
+
+      // no messages can be received.
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+      Message leftover = jmsConsumer.receive(1000);
+      Assert.assertNull(leftover);
+   }
+   
+   /**
+    * With a "client-individual" ack subscription, ACKs every even-indexed
+    * frame out of fifty and expects the other twenty-five messages to remain.
+    */
+   public void testAckModeClientIndividual() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      subscribe(connV11, "sub1", "client-individual");
+
+      final int total = 50;
+      for (int sent = 0; sent < total; sent++)
+      {
+         this.sendMessage("client-individual-ack" + sent);
+      }
+
+      for (int idx = 0; idx < total; idx++)
+      {
+         ClientStompFrame current = connV11.receiveFrame();
+         assertNotNull(current);
+
+         System.out.println(idx + " == received: " + current);
+
+         // ack on even numbers
+         final boolean evenIndex = idx % 2 == 0;
+         if (evenIndex)
+         {
+            this.ack(connV11, "sub1", current);
+         }
+      }
+
+      unsubscribe(connV11, "sub1");
+      connV11.disconnect();
+
+      // half of the messages were never acked and must still be consumable
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+
+      final int expectedLeft = total / 2;
+      for (int idx = 0; idx < expectedLeft; idx++)
+      {
+         TextMessage remaining = (TextMessage) jmsConsumer.receive(1000);
+         Assert.assertNotNull(remaining);
+         System.out.println("Legal: " + remaining.getText());
+      }
+
+      // and nothing beyond that
+      TextMessage extra = (TextMessage) jmsConsumer.receive(1000);
+      Assert.assertNull(extra);
+   }
+   
+   /**
+    * Subscribes two connections to the same topic, sends one message and
+    * expects each connection to receive it tagged with its own subscription id.
+    *
+    * @throws Exception if any STOMP operation fails
+    */
+   public void testTwoSubscribers() throws Exception
+   {
+      connV11.connect(defUser, defPass, "myclientid");
+
+      this.subscribeTopic(connV11, "sub1", "auto", null);
+
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      try
+      {
+         newConn.connect(defUser, defPass, "myclientid2");
+
+         this.subscribeTopic(newConn, "sub2", "auto", null);
+
+         ClientStompFrame frame = connV11.createFrame("SEND");
+         frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+         frame.setBody("Hello World");
+
+         connV11.sendFrame(frame);
+
+         // receive message from socket
+         frame = connV11.receiveFrame(1000);
+
+         System.out.println("received frame : " + frame);
+         // fail with an assertion rather than a NullPointerException on timeout
+         assertNotNull(frame);
+         assertEquals("Hello World", frame.getBody());
+         assertEquals("sub1", frame.getHeader("subscription"));
+
+         frame = newConn.receiveFrame(1000);
+
+         System.out.println("received 2 frame : " + frame);
+         assertNotNull(frame);
+         assertEquals("Hello World", frame.getBody());
+         assertEquals("sub2", frame.getHeader("subscription"));
+
+         // remove subscription
+         this.unsubscribe(connV11, "sub1", true);
+         this.unsubscribe(newConn, "sub2", true);
+
+         connV11.disconnect();
+      }
+      finally
+      {
+         // the second connection is local to this test, so release it even on failure
+         newConn.disconnect();
+      }
+   }
+   
+   /**
+    * Sends a message to the queue on one connection and receives it through a
+    * subscription opened on a second connection.
+    *
+    * @throws Exception if any STOMP operation fails
+    */
+   public void testSendAndReceiveOnDifferentConnections() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      ClientStompFrame sendFrame = connV11.createFrame("SEND");
+      sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      sendFrame.setBody("Hello World");
+
+      connV11.sendFrame(sendFrame);
+
+      StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      try
+      {
+         connV11_2.connect(defUser, defPass);
+
+         this.subscribe(connV11_2, "sub1", "auto");
+
+         ClientStompFrame frame = connV11_2.receiveFrame(2000);
+
+         // fail with an assertion rather than a NullPointerException on timeout
+         assertNotNull(frame);
+         assertEquals("MESSAGE", frame.getCommand());
+         assertEquals("Hello World", frame.getBody());
+
+         connV11.disconnect();
+      }
+      finally
+      {
+         // the second connection is local to this test, so release it even on failure
+         connV11_2.disconnect();
+      }
+   }
+
+   //----------------Note: tests below are adapted from StompTest
+   
+   public void testBeginSameTransactionTwice() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      beginTransaction(connV11, "tx1");
+      
+      beginTransaction(connV11, "tx1");
+      
+      ClientStompFrame f = connV11.receiveFrame();
+      Assert.assertTrue(f.getCommand().equals("ERROR"));
+   }
+
+   /**
+    * Sends a text body containing non-ASCII characters and expects the
+    * subscribed STOMP connection to receive the same text.
+    *
+    * @throws Exception if any STOMP operation fails
+    */
+   public void testBodyWithUTF8() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, getName(), "auto");
+
+      String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
+      System.out.println(text);
+      sendMessage(text);
+
+      ClientStompFrame frame = connV11.receiveFrame();
+      System.out.println(frame);
+      // assertEquals shows expected vs. actual text if the encoding is mangled
+      Assert.assertNotNull(frame);
+      Assert.assertEquals("MESSAGE", frame.getCommand());
+      Assert.assertNotNull(frame.getHeader("destination"));
+      Assert.assertEquals(text, frame.getBody());
+
+      connV11.disconnect();
+   }
+   
+   /**
+    * ACKs a message inside transaction "tx1", aborts the transaction and
+    * expects no further frame to arrive on the connection.
+    *
+    * @throws Exception if any STOMP operation fails
+    */
+   public void testClientAckNotPartOfTransaction() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, getName(), "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      // assertEquals reports the actual value; the null check avoids an NPE on timeout
+      Assert.assertNotNull(frame);
+      Assert.assertEquals("MESSAGE", frame.getCommand());
+      Assert.assertNotNull(frame.getHeader("destination"));
+      Assert.assertEquals(getName(), frame.getBody());
+      Assert.assertNotNull(frame.getHeader("message-id"));
+
+      String messageID = frame.getHeader("message-id");
+
+      beginTransaction(connV11, "tx1");
+
+      this.ack(connV11, getName(), messageID, "tx1");
+
+      abortTransaction(connV11, "tx1");
+
+      // no frame may arrive after the abort
+      frame = connV11.receiveFrame();
+
+      assertNull(frame);
+
+      this.unsubscribe(connV11, getName());
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Sends DISCONNECT with a receipt, checks the RECEIPT reply, and then
+    * expects a subsequent SEND to fail with a ClosedChannelException.
+    */
+   public void testDisconnectAndError() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      this.subscribe(connV11, getName(), "client");
+
+      final ClientStompFrame disconnectFrame = connV11.createFrame("DISCONNECT");
+      disconnectFrame.addHeader("receipt", "1");
+
+      final ClientStompFrame result = connV11.sendFrame(disconnectFrame);
+
+      // the reply must be a RECEIPT that echoes receipt id "1"
+      final boolean receiptOk = result != null
+                                && "RECEIPT".equals(result.getCommand())
+                                && "1".equals(result.getHeader("receipt-id"));
+      if (!receiptOk)
+      {
+         fail("Disconnect failed! " + result);
+      }
+
+      // sending a message will result in an error
+      final ClientStompFrame lateSend = connV11.createFrame("SEND");
+      lateSend.addHeader("destination", getQueuePrefix() + getQueueName());
+      lateSend.setBody("Hello World");
+
+      try
+      {
+         connV11.sendFrame(lateSend);
+         fail("connection should have been closed by server.");
+      }
+      catch (ClosedChannelException expected)
+      {
+         // expected: the channel is already closed
+      }
+
+      connV11.destroy();
+   }
+
+   public void testDurableSubscriber() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "sub1", "client", getName());
+
+      this.subscribe(connV11, "sub1", "client", getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+      Assert.assertTrue(frame.getCommand().equals("ERROR"));
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Subscribes to the topic with client id "myclientid", disconnects, sends a
+    * message while disconnected, then reconnects with the same client id and
+    * subscription and expects to receive that message.
+    *
+    * @throws Exception if any STOMP or JMS operation fails
+    */
+   public void testDurableSubscriberWithReconnection() throws Exception
+   {
+      connV11.connect(defUser, defPass, "myclientid");
+
+      this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+      ClientStompFrame frame = connV11.createFrame("DISCONNECT");
+      frame.addHeader("receipt", "1");
+
+      ClientStompFrame result = connV11.sendFrame(frame);
+
+      if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
+      {
+         fail("Disconnect failed! " + result);
+      }
+
+      // send the message when the durable subscriber is disconnected
+      sendMessage(getName(), topic);
+
+      connV11.destroy();
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      connV11.connect(defUser, defPass, "myclientid");
+
+      this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+      // we must have received the message
+      frame = connV11.receiveFrame();
+
+      // assertEquals reports the actual command; the null check avoids an NPE on timeout
+      Assert.assertNotNull(frame);
+      Assert.assertEquals("MESSAGE", frame.getCommand());
+      Assert.assertNotNull(frame.getHeader("destination"));
+      Assert.assertEquals(getName(), frame.getBody());
+
+      this.unsubscribe(connV11, "sub1");
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Sends a STOMP message carrying a JMSXGroupID header and checks that the
+    * JMS consumer sees the same value as a string property.
+    */
+   public void testJMSXGroupIdCanBeSet() throws Exception
+   {
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      final ClientStompFrame outgoing = connV11.createFrame("SEND");
+      outgoing.addHeader("destination", getQueuePrefix() + getQueueName());
+      outgoing.addHeader("JMSXGroupID", "TEST");
+      outgoing.setBody("Hello World");
+      connV11.sendFrame(outgoing);
+
+      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
+      Assert.assertNotNull(received);
+      Assert.assertEquals("Hello World", received.getText());
+
+      // differ from StompConnect
+      final String groupId = received.getStringProperty("JMSXGroupID");
+      Assert.assertEquals("TEST", groupId);
+   }
+
+   /**
+    * Sends two batches of ten messages and expects each batch to be received
+    * on the STOMP subscription in the order it was sent.
+    *
+    * @throws Exception if any STOMP or JMS operation fails
+    */
+   public void testMessagesAreInOrder() throws Exception
+   {
+      int ctr = 10;
+      String[] data = new String[ctr];
+
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "sub1", "auto");
+
+      for (int i = 0; i < ctr; ++i)
+      {
+         data[i] = getName() + i;
+         sendMessage(data[i]);
+      }
+
+      ClientStompFrame frame = null;
+
+      for (int i = 0; i < ctr; ++i)
+      {
+         frame = connV11.receiveFrame();
+         // assertEquals shows which body arrived out of order
+         Assert.assertNotNull("Message not in order", frame);
+         Assert.assertEquals("Message not in order", data[i], frame.getBody());
+      }
+
+      // second batch reuses the array with distinguishable bodies
+      for (int i = 0; i < ctr; ++i)
+      {
+         data[i] = getName() + ":second:" + i;
+         sendMessage(data[i]);
+      }
+
+      for (int i = 0; i < ctr; ++i)
+      {
+         frame = connV11.receiveFrame();
+         Assert.assertNotNull("Message not in order", frame);
+         Assert.assertEquals("Message not in order", data[i], frame.getBody());
+      }
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Subscribes with selector {@code foo = 'zzz'}, sends one non-matching and
+    * one matching message, and expects the first received frame to be the
+    * matching one.
+    *
+    * @throws Exception if any STOMP or JMS operation fails
+    */
+   public void testSubscribeWithAutoAckAndSelector() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'");
+
+      sendMessage("Ignored message", "foo", "1234");
+      sendMessage("Real message", "foo", "zzz");
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      // guard first: the original dereferenced the frame even when it was null
+      Assert.assertNotNull("Should have received the real message but got: " + frame, frame);
+      Assert.assertEquals("Should have received the real message but got: " + frame, "Real message", frame.getBody());
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Receives a message on a "client" ack subscription without acknowledging
+    * it, disconnects, and expects a JMS consumer to get it flagged as redelivered.
+    *
+    * @throws Exception if any STOMP or JMS operation fails
+    */
+   public void testRedeliveryWithClientAck() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "subId", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      // assertEquals reports the actual command; the null check avoids an NPE on timeout
+      assertNotNull(frame);
+      assertEquals("MESSAGE", frame.getCommand());
+
+      connV11.disconnect();
+
+      // message should be received since message was not acknowledged
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertTrue(message.getJMSRedelivered());
+   }
+
+   /**
+    * Sends one thousand STOMP messages and waits up to sixty seconds for a
+    * JMS message listener to count all of them.
+    */
+   public void testSendManyMessages() throws Exception
+   {
+      final int total = 1000;
+      final CountDownLatch pending = new CountDownLatch(total);
+
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      // each delivered message releases one count of the latch
+      MessageListener counter = new MessageListener()
+      {
+         public void onMessage(Message ignored)
+         {
+            pending.countDown();
+         }
+      };
+      jmsConsumer.setMessageListener(counter);
+
+      final ClientStompFrame outgoing = connV11.createFrame("SEND");
+      outgoing.addHeader("destination", getQueuePrefix() + getQueueName());
+      outgoing.setBody("Hello World");
+
+      // the same frame instance is sent repeatedly
+      for (int sent = 0; sent < total; sent++)
+      {
+         connV11.sendFrame(outgoing);
+      }
+
+      final boolean allArrived = pending.await(60, TimeUnit.SECONDS);
+      assertTrue(allArrived);
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Sends a plain STOMP message and checks the body, the default priority
+    * and the timestamp seen by a JMS consumer.
+    */
+   public void testSendMessage() throws Exception
+   {
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      final ClientStompFrame outgoing = connV11.createFrame("SEND");
+      outgoing.addHeader("destination", getQueuePrefix() + getQueueName());
+      outgoing.setBody("Hello World");
+      connV11.sendFrame(outgoing);
+
+      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
+      Assert.assertNotNull(received);
+      Assert.assertEquals("Hello World", received.getText());
+
+      // without a priority header the default priority 4 must be used
+      Assert.assertEquals("getJMSPriority", 4, received.getJMSPriority());
+
+      // the message timestamp must be within one second of "now"
+      final long now = System.currentTimeMillis();
+      final long stamped = received.getJMSTimestamp();
+      final long drift = Math.abs(now - stamped);
+      Assert.assertTrue(drift < 1000);
+   }
+
+   /**
+    * Sends a four-byte body together with a content-length header and checks
+    * that a JMS consumer receives a BytesMessage holding the same bytes.
+    */
+   public void testSendMessageWithContentLength() throws Exception
+   {
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      final byte[] payload = new byte[] { 1, 0, 0, 4 };
+
+      final ClientStompFrame outgoing = connV11.createFrame("SEND");
+      outgoing.addHeader("destination", getQueuePrefix() + getQueueName());
+      outgoing.setBody(new String(payload, "UTF-8"));
+      outgoing.addHeader("content-length", String.valueOf(payload.length));
+      connV11.sendFrame(outgoing);
+
+      BytesMessage received = (BytesMessage)jmsConsumer.receive(10000);
+      Assert.assertNotNull(received);
+
+      assertEquals(payload.length, received.getBodyLength());
+
+      // compare byte by byte, in order
+      for (int pos = 0; pos < payload.length; pos++)
+      {
+         assertEquals(payload[pos], received.readByte());
+      }
+   }
+
+   /**
+    * Sends a STOMP message with custom headers "foo" and "bar" and checks
+    * that a JMS consumer created with selector {@code foo = 'abc'} receives it
+    * with both values exposed as string properties.
+    */
+   public void testSendMessageWithCustomHeadersAndSelector() throws Exception
+   {
+      MessageConsumer selective = session.createConsumer(queue, "foo = 'abc'");
+
+      connV11.connect(defUser, defPass);
+
+      final ClientStompFrame outgoing = connV11.createFrame("SEND");
+      outgoing.addHeader("foo", "abc");
+      outgoing.addHeader("bar", "123");
+      outgoing.addHeader("destination", getQueuePrefix() + getQueueName());
+      outgoing.setBody("Hello World");
+      connV11.sendFrame(outgoing);
+
+      TextMessage received = (TextMessage)selective.receive(1000);
+      Assert.assertNotNull(received);
+      Assert.assertEquals("Hello World", received.getText());
+
+      // custom headers must be visible as JMS string properties
+      final String foo = received.getStringProperty("foo");
+      Assert.assertEquals("foo", "abc", foo);
+      final String bar = received.getStringProperty("bar");
+      Assert.assertEquals("bar", "123", bar);
+   }
+   
+   /**
+    * Sends a SEND frame through {@code sendWickedFrame} and checks that
+    * exactly one message with the expected body and a fresh timestamp arrives.
+    */
+   public void testSendMessageWithLeadingNewLine() throws Exception
+   {
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      final ClientStompFrame outgoing = connV11.createFrame("SEND");
+      outgoing.addHeader("destination", getQueuePrefix() + getQueueName());
+      outgoing.setBody("Hello World");
+
+      // presumably emits the frame with a leading new line, per the test name - confirm in sendWickedFrame
+      connV11.sendWickedFrame(outgoing);
+
+      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
+      Assert.assertNotNull(received);
+      Assert.assertEquals("Hello World", received.getText());
+
+      // the message timestamp must be within one second of "now"
+      final long now = System.currentTimeMillis();
+      final long stamped = received.getJMSTimestamp();
+      final long drift = Math.abs(now - stamped);
+      Assert.assertTrue(drift < 1000);
+
+      // nothing else may have been produced
+      assertNull(jmsConsumer.receive(1000));
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Sends a message with a receipt header, expects a RECEIPT frame echoing
+    * the receipt id, and checks that a JMS consumer gets the message.
+    *
+    * @throws Exception if any STOMP or JMS operation fails
+    */
+   public void testSendMessageWithReceipt() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("receipt", "1234");
+      frame.setBody("Hello World");
+
+      // with a receipt header, sendFrame hands back the reply frame
+      frame = connV11.sendFrame(frame);
+
+      // assertEquals reports the actual command; the null check avoids an NPE
+      assertNotNull(frame);
+      assertEquals("RECEIPT", frame.getCommand());
+      assertEquals("1234", frame.getHeader("receipt-id"));
+
+      TextMessage message = (TextMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+
+      // Make sure that the timestamp is valid - should
+      // be very close to the current time.
+      long tnow = System.currentTimeMillis();
+      long tmsg = message.getJMSTimestamp();
+      Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Sends a STOMP message carrying correlation-id, persistent, priority,
+    * type, JMSXGroupID and two custom headers, and checks how each one
+    * surfaces on the JMS message.
+    */
+   public void testSendMessageWithStandardHeaders() throws Exception
+   {
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      final ClientStompFrame outgoing = connV11.createFrame("SEND");
+      outgoing.addHeader("destination", getQueuePrefix() + getQueueName());
+      outgoing.addHeader("correlation-id", "c123");
+      outgoing.addHeader("persistent", "true");
+      outgoing.addHeader("priority", "3");
+      outgoing.addHeader("type", "t345");
+      outgoing.addHeader("JMSXGroupID", "abc");
+      outgoing.addHeader("foo", "abc");
+      outgoing.addHeader("bar", "123");
+      outgoing.setBody("Hello World");
+
+      // the value returned by sendFrame is not needed here
+      connV11.sendFrame(outgoing);
+
+      TextMessage received = (TextMessage)jmsConsumer.receive(1000);
+      Assert.assertNotNull(received);
+      Assert.assertEquals("Hello World", received.getText());
+
+      // standard headers map onto the JMS header accessors
+      Assert.assertEquals("JMSCorrelationID", "c123", received.getJMSCorrelationID());
+      Assert.assertEquals("getJMSType", "t345", received.getJMSType());
+      Assert.assertEquals("getJMSPriority", 3, received.getJMSPriority());
+      Assert.assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
+
+      // everything else is exposed as a string property
+      Assert.assertEquals("foo", "abc", received.getStringProperty("foo"));
+      Assert.assertEquals("bar", "123", received.getStringProperty("bar"));
+      Assert.assertEquals("JMSXGroupID", "abc", received.getStringProperty("JMSXGroupID"));
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Subscribes to the topic, expects to receive a published message, then
+    * unsubscribes and expects a second published message not to arrive.
+    *
+    * @throws Exception if any STOMP or JMS operation fails
+    */
+   public void testSubscribeToTopic() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribeTopic(connV11, "sub1", null, null, true);
+
+      sendMessage(getName(), topic);
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      // assertEquals reports the actual value; the null check avoids an NPE on timeout
+      Assert.assertNotNull(frame);
+      Assert.assertEquals("MESSAGE", frame.getCommand());
+      Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader("destination"));
+      Assert.assertEquals(getName(), frame.getBody());
+
+      this.unsubscribe(connV11, "sub1", true);
+
+      sendMessage(getName(), topic);
+
+      // after unsubscribing nothing may be delivered any more
+      frame = connV11.receiveFrame(1000);
+      assertNull(frame);
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Subscribes to the topic (presumably with the no-local option, per the
+    * test name), checks that a message sent on the same STOMP connection is
+    * not delivered back, and that one sent via JMS is.
+    *
+    * @throws Exception if any STOMP or JMS operation fails
+    */
+   public void testSubscribeToTopicWithNoLocal() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribeTopic(connV11, "sub1", null, null, true, true);
+
+      // send a message on the same connection => it should not be received
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+      frame.setBody("Hello World");
+
+      connV11.sendFrame(frame);
+
+      frame = connV11.receiveFrame(2000);
+
+      assertNull(frame);
+
+      // send message on another JMS connection => it should be received
+      sendMessage(getName(), topic);
+
+      frame = connV11.receiveFrame();
+
+      // assertEquals reports the actual value; the null check avoids an NPE on timeout
+      Assert.assertNotNull(frame);
+      Assert.assertEquals("MESSAGE", frame.getCommand());
+      Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader("destination"));
+      Assert.assertEquals(getName(), frame.getBody());
+
+      this.unsubscribe(connV11, "sub1");
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Receives one message on an "auto" ack subscription and checks that a
+    * JMS consumer finds the queue empty afterwards.
+    */
+   public void testSubscribeWithAutoAck() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      this.subscribe(connV11, "sub1", "auto");
+      sendMessage(getName());
+
+      final ClientStompFrame delivered = connV11.receiveFrame();
+
+      Assert.assertEquals("MESSAGE", delivered.getCommand());
+      Assert.assertNotNull(delivered.getHeader("destination"));
+      Assert.assertEquals(getName(), delivered.getBody());
+
+      connV11.disconnect();
+
+      // the message was auto-acked, so it must not be available any more
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+      Message leftover = jmsConsumer.receive(1000);
+      Assert.assertNull(leftover);
+   }
+
+   /**
+    * Sends a five-byte payload to the queue and checks the MESSAGE frame
+    * received over STOMP: content-length "5", no "type" header and a body
+    * equal to the payload decoded as UTF-8.
+    *
+    * @throws Exception if any STOMP or JMS operation fails
+    */
+   public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "sub1", "auto");
+
+      byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
+      sendMessage(payload, queue);
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      // fail with an assertion rather than a NullPointerException on timeout
+      assertNotNull(frame);
+      assertEquals("MESSAGE", frame.getCommand());
+
+      System.out.println("Message: " + frame);
+
+      assertEquals("5", frame.getHeader("content-length"));
+
+      assertNull(frame.getHeader("type"));
+
+      // expected value goes first so that failure messages read correctly
+      assertEquals(new String(payload, "UTF-8"), frame.getBody());
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Receives one message on a "client" ack subscription, acknowledges it with
+    * an ACK frame, disconnects, and verifies that a JMS consumer on
+    * {@code queue} then receives nothing within one second.
+    */
+   public void testSubscribeWithClientAck() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      this.subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame delivered = connV11.receiveFrame();
+      this.ack(connV11, "sub1", delivered);
+
+      connV11.disconnect();
+
+      // the client acknowledged the delivery, so nothing should be left to consume
+      MessageConsumer jmsConsumer = session.createConsumer(queue);
+      Message leftover = jmsConsumer.receive(1000);
+      Assert.assertNull(leftover);
+   }
+
+   /** Runs the client-ack-then-auto-ack scenario with the "send disconnect" flag set. */
+   public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception
+   {
+      final boolean sendDisconnect = true;
+      assertSubscribeWithClientAckThenConsumeWithAutoAck(sendDisconnect);
+   }
+
+   /** Runs the client-ack-then-auto-ack scenario with the "send disconnect" flag cleared. */
+   public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception
+   {
+      final boolean sendDisconnect = false;
+      assertSubscribeWithClientAckThenConsumeWithAutoAck(sendDisconnect);
+   }
+
+   /**
+    * Subscribes with the id "mysubid" and checks that the MESSAGE frame
+    * delivered for that subscription carries a "subscription" header.
+    */
+   public void testSubscribeWithID() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      this.subscribe(connV11, "mysubid", "auto");
+
+      sendMessage(getName());
+
+      ClientStompFrame frame = connV11.receiveFrame();
+
+      // assertNotNull with a message reports what was missing instead of a bare failure
+      Assert.assertNotNull("MESSAGE frame has no subscription header", frame.getHeader("subscription"));
+
+      connV11.disconnect();
+   }
+
+   public void testSubscribeWithMessageSentWithProperties() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      
+      this.subscribe(connV11, "sub1", "auto");
+
+      MessageProducer producer = session.createProducer(queue);
+      BytesMessage message = session.createBytesMessage();
+      message.setStringProperty("S", "value");
+      message.setBooleanProperty("n", false);
+      message.setByteProperty("byte", (byte)9);
+      message.setDoubleProperty("d", 2.0);
+      message.setFloatProperty("f", (float)6.0);
+      message.setIntProperty("i", 10);
+      message.setLongProperty("l", 121);
+      message.setShortProperty("s", (short)12);
+      message.writeBytes("Hello World".getBytes("UTF-8"));
+      producer.send(message);
+
+      ClientStompFrame frame = connV11.receiveFrame();
+      Assert.assertNotNull(frame);
+      
+      Assert.assertTrue(frame.getHeader("S") != null);
+      Assert.assertTrue(frame.getHeader("n") != null);
+      Assert.assertTrue(frame.getHeader("byte") != null);
+      Assert.assertTrue(frame.getHeader("d") != null);
+      Assert.assertTrue(frame.getHeader("f") != null);
+      Assert.assertTrue(frame.getHeader("i") != null);
+      Assert.assertTrue(frame.getHeader("l") != null);
+      Assert.assertTrue(frame.getHeader("s") != null);
+      Assert.assertEquals("Hello World", frame.getBody());
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Runs two complete BEGIN / SEND / COMMIT cycles one after the other, both
+    * using the transaction id "tx1", and checks that a JMS consumer on
+    * {@code queue} receives a message after each commit.
+    */
+   public void testSuccessiveTransactionsWithSameID() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      // round 0 is the first transaction, round 1 reuses the same id
+      for (int round = 0; round < 2; round++)
+      {
+         this.beginTransaction(connV11, "tx1");
+
+         ClientStompFrame sendFrame = connV11.createFrame("SEND");
+         sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         sendFrame.addHeader("transaction", "tx1");
+         sendFrame.setBody("Hello World");
+         connV11.sendFrame(sendFrame);
+
+         this.commitTransaction(connV11, "tx1");
+
+         Message committed = consumer.receive(1000);
+         Assert.assertNotNull("Should have received a message", committed);
+      }
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Sends a message inside transaction "tx1" with a receipt request, checks
+    * that the JMS consumer sees nothing before the commit, and that it
+    * receives the message once the transaction has been committed.
+    */
+   public void testTransactionCommit() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+      this.beginTransaction(connV11, "tx1");
+
+      ClientStompFrame sendFrame = connV11.createFrame("SEND");
+      sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      sendFrame.addHeader("transaction", "tx1");
+      sendFrame.addHeader("receipt", "123");
+      sendFrame.setBody("Hello World");
+
+      ClientStompFrame receiptFrame = connV11.sendFrame(sendFrame);
+      assertEquals("123", receiptFrame.getHeader("receipt-id"));
+
+      // nothing may reach the consumer while the transaction is still open
+      Message premature = consumer.receive(100);
+      assertNull(premature);
+
+      this.commitTransaction(connV11, "tx1", true);
+
+      Message committed = consumer.receive(1000);
+      Assert.assertNotNull("Should have received a message", committed);
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Sends "first message" in transaction "tx1" and aborts it, then sends
+    * "second message" in a new "tx1" and commits it. The JMS consumer is
+    * expected to receive a TextMessage whose text is "second message".
+    */
+   public void testTransactionRollback() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      // transaction that gets rolled back
+      this.beginTransaction(connV11, "tx1");
+
+      ClientStompFrame abortedSend = connV11.createFrame("SEND");
+      abortedSend.addHeader("destination", getQueuePrefix() + getQueueName());
+      abortedSend.addHeader("transaction", "tx1");
+      abortedSend.setBody("first message");
+      connV11.sendFrame(abortedSend);
+
+      this.abortTransaction(connV11, "tx1");
+
+      // transaction that gets committed
+      this.beginTransaction(connV11, "tx1");
+
+      ClientStompFrame committedSend = connV11.createFrame("SEND");
+      committedSend.addHeader("destination", getQueuePrefix() + getQueueName());
+      committedSend.addHeader("transaction", "tx1");
+      committedSend.setBody("second message");
+      connV11.sendFrame(committedSend);
+
+      this.commitTransaction(connV11, "tx1", true);
+
+      // the rolled-back message must not arrive, so the first delivery is the second message
+      TextMessage received = (TextMessage)consumer.receive(1000);
+      Assert.assertNotNull(received);
+      Assert.assertEquals("second message", received.getText());
+
+      connV11.disconnect();
+   }
+
+   /**
+    * Receives one message on subscription "sub1", unsubscribes (with a
+    * receipt), sends a second message, and checks that no frame arrives on the
+    * STOMP connection within one second.
+    */
+   public void testUnsubscribe() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      this.subscribe(connV11, "sub1", "auto");
+
+      // first message: expected to be delivered to the subscription
+      sendMessage("first message");
+      ClientStompFrame delivered = connV11.receiveFrame();
+      Assert.assertTrue(delivered.getCommand().equals("MESSAGE"));
+
+      // remove the subscription
+      this.unsubscribe(connV11, "sub1", true);
+
+      // second message: nothing should come through any more
+      sendMessage("second message");
+      ClientStompFrame unexpected = connV11.receiveFrame(1000);
+      assertNull(unexpected);
+
+      connV11.disconnect();
+   }
+
+   //-----------------private help methods
+
+   /**
+    * Sends an ABORT frame for the given transaction.
+    *
+    * @param conn connection the frame is created on and sent through
+    * @param txID value placed in the frame's "transaction" header
+    */
+   private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+   {
+      ClientStompFrame abort = conn.createFrame("ABORT");
+      abort.addHeader("transaction", txID);
+      conn.sendFrame(abort);
+   }
+
+   /**
+    * Sends a BEGIN frame for the given transaction.
+    *
+    * @param conn connection the frame is created on and sent through
+    * @param txID value placed in the frame's "transaction" header
+    */
+   private void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+   {
+      ClientStompFrame begin = conn.createFrame("BEGIN");
+      begin.addHeader("transaction", txID);
+      conn.sendFrame(begin);
+   }
+
+   private void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+   {
+      commitTransaction(conn, txID, false);
+   }
+
+   /**
+    * Sends a COMMIT frame for the given transaction.
+    *
+    * @param conn connection the frame is created on and sent through
+    * @param txID value placed in the frame's "transaction" header
+    * @param receipt when true, a "receipt" header of "1234" is added and the
+    *           response is asserted to carry a matching "receipt-id"
+    */
+   private void commitTransaction(StompClientConnection conn, String txID, boolean receipt) throws IOException, InterruptedException
+   {
+      ClientStompFrame commitFrame = conn.createFrame("COMMIT");
+      commitFrame.addHeader("transaction", txID);
+      if (receipt)
+      {
+         commitFrame.addHeader("receipt", "1234");
+      }
+      ClientStompFrame resp = conn.sendFrame(commitFrame);
+      if (receipt)
+      {
+         // fail with a readable message instead of a NullPointerException when no response arrived
+         Assert.assertNotNull("no response to COMMIT with receipt 1234", resp);
+         assertEquals("1234", resp.getHeader("receipt-id"));
+      }
+   }
+
+   /**
+    * Acknowledges a received frame by sending an ACK that carries the frame's
+    * "message-id" header and the given subscription id.
+    *
+    * @param conn connection the ACK frame is created on and sent through
+    * @param subId value of the ACK's "subscription" header
+    * @param frame received frame whose "message-id" header is acknowledged
+    * @throws IOException if sending the ACK yields a non-null response frame
+    */
+   private void ack(StompClientConnection conn, String subId,
+         ClientStompFrame frame) throws IOException, InterruptedException
+   {
+      String ackedId = frame.getHeader("message-id");
+
+      ClientStompFrame ackRequest = conn.createFrame("ACK");
+      ackRequest.addHeader("subscription", subId);
+      ackRequest.addHeader("message-id", ackedId);
+
+      ClientStompFrame reply = conn.sendFrame(ackRequest);
+      if (reply == null)
+      {
+         return;
+      }
+      // any reply to this ACK is treated as a failure
+      throw new IOException("failed to ack " + reply);
+   }
+
+   /**
+    * Sends an ACK frame for a message id, optionally inside a transaction.
+    *
+    * @param conn connection the ACK frame is created on and sent through
+    * @param subId value of the "subscription" header
+    * @param mid value of the "message-id" header
+    * @param txID value of the "transaction" header; the header is omitted when null
+    */
+   private void ack(StompClientConnection conn, String subId, String mid, String txID) throws IOException, InterruptedException
+   {
+      ClientStompFrame ackRequest = conn.createFrame("ACK");
+      ackRequest.addHeader("subscription", subId);
+      ackRequest.addHeader("message-id", mid);
+
+      boolean transactional = txID != null;
+      if (transactional)
+      {
+         ackRequest.addHeader("transaction", txID);
+      }
+
+      conn.sendFrame(ackRequest);
+   }
+
+   /**
+    * Sends a NACK frame for a message id on the given subscription.
+    *
+    * @param conn connection the NACK frame is created on and sent through
+    * @param subId value of the "subscription" header
+    * @param mid value of the "message-id" header
+    */
+   private void nack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
+   {
+      ClientStompFrame nackRequest = conn.createFrame("NACK");
+      nackRequest.addHeader("subscription", subId);
+      nackRequest.addHeader("message-id", mid);
+      conn.sendFrame(nackRequest);
+   }
+   
+   private void subscribe(StompClientConnection conn, String subId, String ack) throws IOException, InterruptedException
+   {
+      subscribe(conn, subId, ack, null, null);
+   }
+
+   private void subscribe(StompClientConnection conn, String subId,
+         String ack, String durableId) throws IOException, InterruptedException
+   {
+      subscribe(conn, subId, ack, durableId, null);
+   }
+
+   private void subscribe(StompClientConnection conn, String subId,
+         String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+   {
+      subscribe(conn, subId, ack, durableId, null, receipt);
+   }
+
+   private void subscribe(StompClientConnection conn, String subId, String ack,
+         String durableId, String selector) throws IOException,
+         InterruptedException
+   {
+      subscribe(conn, subId, ack, durableId, selector, false);
+   }
+   
+   /**
+    * Builds and sends a SUBSCRIBE frame for the test queue destination
+    * ({@code getQueuePrefix() + getQueueName()}).
+    *
+    * @param conn connection the frame is created on and sent through
+    * @param subId value of the "id" header
+    * @param ack value of the "ack" header; omitted when null
+    * @param durableId value of the "durable-subscriber-name" header; omitted when null
+    * @param selector value of the "selector" header; omitted when null
+    * @param receipt when true, a "receipt" header of "1234" is added and the
+    *           response is asserted to carry a matching "receipt-id"
+    */
+   private void subscribe(StompClientConnection conn, String subId,
+         String ack, String durableId, String selector, boolean receipt) throws IOException, InterruptedException
+   {
+      ClientStompFrame request = conn.createFrame("SUBSCRIBE");
+      request.addHeader("id", subId);
+      request.addHeader("destination", getQueuePrefix() + getQueueName());
+
+      // optional headers are only written when a value was supplied
+      if (ack != null)
+      {
+         request.addHeader("ack", ack);
+      }
+      if (durableId != null)
+      {
+         request.addHeader("durable-subscriber-name", durableId);
+      }
+      if (selector != null)
+      {
+         request.addHeader("selector", selector);
+      }
+      if (receipt)
+      {
+         request.addHeader("receipt", "1234");
+      }
+
+      ClientStompFrame response = conn.sendFrame(request);
+
+      if (receipt)
+      {
+         assertEquals("1234", response.getHeader("receipt-id"));
+      }
+   }
+
+   private void subscribeTopic(StompClientConnection conn, String subId,
+         String ack, String durableId) throws IOException, InterruptedException
+   {
+      subscribeTopic(conn, subId, ack, durableId, false);
+   }
+
+   private void subscribeTopic(StompClientConnection conn, String subId,
+         String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+   {
+      subscribeTopic(conn, subId, ack, durableId, receipt, false);
+   }
+
+   /**
+    * Builds and sends a SUBSCRIBE frame for the test topic destination
+    * ({@code getTopicPrefix() + getTopicName()}).
+    *
+    * @param conn connection the frame is created on and sent through
+    * @param subId value of the "id" header
+    * @param ack value of the "ack" header; omitted when null
+    * @param durableId value of the "durable-subscriber-name" header; omitted when null
+    * @param receipt when true, a "receipt" header of "1234" is added and the
+    *           response is asserted to carry a matching "receipt-id"
+    * @param noLocal when true, a "no-local" header of "true" is added
+    */
+   private void subscribeTopic(StompClientConnection conn, String subId,
+         String ack, String durableId, boolean receipt, boolean noLocal) throws IOException, InterruptedException
+   {
+      ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", subId);
+      subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
+      if (ack != null)
+      {
+         subFrame.addHeader("ack", ack);
+      }
+      if (durableId != null)
+      {
+         subFrame.addHeader("durable-subscriber-name", durableId);
+      }
+      if (receipt)
+      {
+         subFrame.addHeader("receipt", "1234");
+      }
+      if (noLocal)
+      {
+         subFrame.addHeader("no-local", "true");
+      }
+
+      ClientStompFrame frame = conn.sendFrame(subFrame);
+
+      if (receipt)
+      {
+         // assertEquals matches the queue variant of this helper, reports the actual
+         // value on mismatch, and does not throw a NullPointerException on a missing header
+         Assert.assertNotNull("no response to SUBSCRIBE with receipt 1234", frame);
+         assertEquals("1234", frame.getHeader("receipt-id"));
+      }
+   }
+
+   /**
+    * Sends an UNSUBSCRIBE frame for the given subscription id without
+    * requesting a receipt.
+    *
+    * @param conn connection the frame is sent through
+    * @param subId value of the "id" header
+    */
+   private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException
+   {
+      // delegate to the receipt-aware overload rather than duplicating the frame construction
+      unsubscribe(conn, subId, false);
+   }
+   
+   /**
+    * Sends an UNSUBSCRIBE frame for the given subscription id.
+    *
+    * @param conn connection the frame is created on and sent through
+    * @param subId value of the "id" header
+    * @param receipt when true, a "receipt" header of "4321" is added and the
+    *           response is asserted to be a RECEIPT frame with a matching "receipt-id"
+    */
+   private void unsubscribe(StompClientConnection conn, String subId,
+         boolean receipt) throws IOException, InterruptedException
+   {
+      ClientStompFrame request = conn.createFrame("UNSUBSCRIBE");
+      request.addHeader("id", subId);
+      if (receipt)
+      {
+         request.addHeader("receipt", "4321");
+      }
+
+      ClientStompFrame response = conn.sendFrame(request);
+      if (!receipt)
+      {
+         // nothing to verify when no receipt was requested
+         return;
+      }
+
+      System.out.println("response: " + response);
+      assertEquals("RECEIPT", response.getCommand());
+      assertEquals("4321", response.getHeader("receipt-id"));
+   }
+
+   /**
+    * Scenario shared by the two "client ack then auto ack" tests:
+    * a message is received on a "client" ack subscription but never
+    * acknowledged, the connection is dropped, and a new connection subscribes
+    * again (no ack header) and is expected to receive a MESSAGE frame. A third
+    * connection then subscribes with a receipt and is expected to receive only
+    * the newly sent "shouldBeNextMessage".
+    *
+    * @param sendDisconnect when true the first connection is closed with
+    *           {@code disconnect()}, otherwise with {@code destroy()}
+    */
+   protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      this.subscribe(connV11, "sub1", "client");
+
+      sendMessage(getName());
+
+      ClientStompFrame received = connV11.receiveFrame();
+      Assert.assertEquals("MESSAGE", received.getCommand());
+
+      log.info("Reconnecting!");
+
+      // drop the first connection one way or the other, then open a fresh one
+      if (sendDisconnect)
+      {
+         connV11.disconnect();
+      }
+      else
+      {
+         connV11.destroy();
+      }
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+
+      // message should be received since message was not acknowledged
+      connV11.connect(defUser, defPass);
+      this.subscribe(connV11, "sub1", null);
+
+      received = connV11.receiveFrame();
+      Assert.assertTrue(received.getCommand().equals("MESSAGE"));
+
+      connV11.disconnect();
+
+      // now make sure the same message is not seen a second time
+      connV11.destroy();
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      connV11.connect(defUser, defPass);
+      this.subscribe(connV11, "sub1", null, null, true);
+
+      sendMessage("shouldBeNextMessage");
+
+      received = connV11.receiveFrame();
+      Assert.assertTrue(received.getCommand().equals("MESSAGE"));
+      Assert.assertEquals("shouldBeNextMessage", received.getBody());
+   }
+
+}
+
+
+
+
+



More information about the hornetq-commits mailing list