[hornetq-commits] JBoss hornetq SVN: r9118 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Apr 14 14:35:55 EDT 2010


Author: timfox
Date: 2010-04-14 14:35:54 -0400 (Wed, 14 Apr 2010)
New Revision: 9118

Modified:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
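Fixed Stomp race condition: the MESSAGE frame is now sent only after the message has been added to messagesToAck (otherwise the client's ACK could arrive before the entry exists). Also switched StompSession's maps to ConcurrentHashMap, made noLocal volatile, and reformatted StompTest.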

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-04-14 17:12:52 UTC (rev 9117)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-04-14 18:35:54 UTC (rev 9118)
@@ -16,6 +16,7 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.SimpleString;
@@ -48,12 +49,12 @@
 
    private final OperationContext sessionContext;
 
-   private final Map<Long, StompSubscription> subscriptions = new HashMap<Long, StompSubscription>();
+   private final Map<Long, StompSubscription> subscriptions = new ConcurrentHashMap<Long, StompSubscription>();
 
    // key = message ID, value = consumer ID
-   private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
+   private final Map<Long, Long> messagesToAck = new ConcurrentHashMap<Long, Long>();
 
-   private boolean noLocal = false;
+   private volatile boolean noLocal = false;
 
    StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext)
    {
@@ -90,19 +91,17 @@
          }
          HornetQBuffer buffer = serverMessage.getBodyBuffer();
          buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
-         int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
+         int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
+                                                                 : serverMessage.getEndOfBodyPosition();
          int size = bodyPos - buffer.readerIndex();
          byte[] data = new byte[size];
          buffer.readBytes(data);
          headers.put(Headers.CONTENT_LENGTH, data.length);
          serverMessage.getBodyBuffer().resetReaderIndex();
-         
+
          StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
 
-         manager.send(connection, frame);
-         int length =  frame.getEncodedSize();
-         
          if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
          {
             session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -112,6 +111,11 @@
          {
             messagesToAck.put(serverMessage.getMessageID(), consumerID);
          }
+
+         // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
+         manager.send(connection, frame);
+         int length = frame.getEncodedSize();
+
          return length;
 
       }
@@ -149,7 +153,7 @@
                                String subscriptionID,
                                String clientID,
                                String durableSubscriptionName,
-                               String destination, 
+                               String destination,
                                String selector,
                                String ack) throws Exception
    {
@@ -159,7 +163,7 @@
          // subscribes to a topic
          if (durableSubscriptionName != null)
          {
-            if (clientID == null) 
+            if (clientID == null)
             {
                throw new IllegalStateException("Cannot create a subscriber on the durable subscription if the client-id of the connection is not set");
             }
@@ -177,7 +181,8 @@
                   throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has a subscriber: " + queue);
                }
             }
-         } else
+         }
+         else
          {
             queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
             session.createQueue(SimpleString.toSimpleString(destination), queue, null, true, false);
@@ -239,7 +244,7 @@
    {
       return noLocal;
    }
-   
+
    public void setNoLocal(boolean noLocal)
    {
       this.noLocal = noLocal;

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-04-14 17:12:52 UTC (rev 9117)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-04-14 18:35:54 UTC (rev 9118)
@@ -66,1407 +66,1211 @@
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.tests.util.UnitTestCase;
 
-public class StompTest extends UnitTestCase {
-    private static final transient Logger log = Logger.getLogger(StompTest.class);
-    private int port = 61613;
-    private Socket stompSocket;
-    private ByteArrayOutputStream inputBuffer;
-    private ConnectionFactory connectionFactory;
-    private Connection connection;
-    private Session session;
-    private Queue queue;
-    private Topic topic;
-    private JMSServerManager server;
+public class StompTest extends UnitTestCase
+{
+   private static final transient Logger log = Logger.getLogger(StompTest.class);
 
-    public void _testSendManyMessages() throws Exception {
-       MessageConsumer consumer = session.createConsumer(queue);
- 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
-       frame = receiveFrame(10000);
-       
-       Assert.assertTrue(frame.startsWith("CONNECTED"));    
-       int count = 1000;
-       final CountDownLatch latch = new CountDownLatch(count);
-       consumer.setMessageListener(new MessageListener()
+   private int port = 61613;
+
+   private Socket stompSocket;
+
+   private ByteArrayOutputStream inputBuffer;
+
+   private ConnectionFactory connectionFactory;
+
+   private Connection connection;
+
+   private Session session;
+
+   private Queue queue;
+
+   private Topic topic;
+
+   private JMSServerManager server;
+   
+   public void _testSendManyMessages() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      frame = receiveFrame(10000);
+
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+      int count = 1000;
+      final CountDownLatch latch = new CountDownLatch(count);
+      consumer.setMessageListener(new MessageListener()
       {
-         
+
          public void onMessage(Message arg0)
          {
             System.out.println("<<< " + (1000 - latch.getCount()));
             latch.countDown();
          }
       });
-       
-       frame =
-               "SEND\n" +
-                       "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
-                       "Hello World" +
-                       Stomp.NULL;    
-       for (int i=1; i <= count; i++) {
-          // Thread.sleep(1);
-          System.out.println(">>> " + i);
-          sendFrame(frame);
-       }
-       
-       assertTrue(latch.await(60, TimeUnit.SECONDS));
-        
-    }
-    
-    public void testConnect() throws Exception {
 
-       String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
-       sendFrame(connect_frame);
+      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+      for (int i = 1; i <= count; i++)
+      {
+         // Thread.sleep(1);
+         System.out.println(">>> " + i);
+         sendFrame(frame);
+      }
 
-        String f = receiveFrame(10000);
-        Assert.assertTrue(f.startsWith("CONNECTED"));
-        Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-    }
-    
-    public void testDisconnectAndError() throws Exception {
+      assertTrue(latch.await(60, TimeUnit.SECONDS));
 
-       String connectFrame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
-       sendFrame(connectFrame);
+   }
 
-       String f = receiveFrame(10000);
-       Assert.assertTrue(f.startsWith("CONNECTED"));
-       Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-       
-       String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
-       sendFrame(disconnectFrame);
-       
-       waitForFrameToTakeEffect();
-       
-       // sending a message will result in an error
-       String frame =
-          "SEND\n" +
-                  "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
-                  "Hello World" +
-                  Stomp.NULL;
-       try {
-          sendFrame(frame);
-          Assert.fail("the socket must have been closed when the server handled the DISCONNECT");
-       } catch (IOException e)
-       {
-       }
+   public void testConnect() throws Exception
+   {
+
+      String connect_frame = "CONNECT\n" + "login: brianm\n" +
+                             "passcode: wombats\n" +
+                             "request-id: 1\n" +
+                             "\n" +
+                             Stomp.NULL;
+      sendFrame(connect_frame);
+
+      String f = receiveFrame(10000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
+      Assert.assertTrue(f.indexOf("response-id:1") >= 0);
    }
 
+   public void testDisconnectAndError() throws Exception
+   {
 
-    public void testSendMessage() throws Exception {
+      String connectFrame = "CONNECT\n" + "login: brianm\n" +
+                            "passcode: wombats\n" +
+                            "request-id: 1\n" +
+                            "\n" +
+                            Stomp.NULL;
+      sendFrame(connectFrame);
 
-        MessageConsumer consumer = session.createConsumer(queue);
+      String f = receiveFrame(10000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
+      Assert.assertTrue(f.indexOf("response-id:1") >= 0);
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
+      sendFrame(disconnectFrame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      waitForFrameToTakeEffect();
 
-        frame =
-                "\nSEND\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
-                        "Hello World" +
-                        Stomp.NULL;
+      // sending a message will result in an error
+      String frame = "SEND\n" + "destination:" +
+                     getQueuePrefix() +
+                     getQueueName() +
+                     "\n\n" +
+                     "Hello World" +
+                     Stomp.NULL;
+      try
+      {
+         sendFrame(frame);
+         Assert.fail("the socket must have been closed when the server handled the DISCONNECT");
+      }
+      catch (IOException e)
+      {
+      }
+   }
 
-        sendFrame(frame);
-        
-        BytesMessage message = (BytesMessage) consumer.receive(1000);
-        Assert.assertNotNull(message);
-        Assert.assertEquals("Hello World", readContent(message));
+   public void testSendMessage() throws Exception
+   {
 
-        // Make sure that the timestamp is valid - should
-        // be very close to the current time.
-        long tnow = System.currentTimeMillis();
-        long tmsg = message.getJMSTimestamp();
-        Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
-    }
-    
-    public void testSendMessageWithReceipt() throws Exception {
+      MessageConsumer consumer = session.createConsumer(queue);
 
-       MessageConsumer consumer = session.createConsumer(queue);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      frame = "\nSEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
 
-       frame =
-               "SEND\n" +
-                       "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                       "receipt: 1234\n\n" +
-                       "Hello World" +
-                       Stomp.NULL;
+      sendFrame(frame);
 
-       sendFrame(frame);
+      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", readContent(message));
 
-       String f = receiveFrame(10000);
-       Assert.assertTrue(f.startsWith("RECEIPT"));
-       Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+      // Make sure that the timestamp is valid - should
+      // be very close to the current time.
+      long tnow = System.currentTimeMillis();
+      long tmsg = message.getJMSTimestamp();
+      Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+   }
 
-       BytesMessage message = (BytesMessage) consumer.receive(1000);
-       Assert.assertNotNull(message);
-       Assert.assertEquals("Hello World", readContent(message));
+   public void testSendMessageWithReceipt() throws Exception
+   {
 
-       // Make sure that the timestamp is valid - should
-       // be very close to the current time.
-       long tnow = System.currentTimeMillis();
-       long tmsg = message.getJMSTimestamp();
-       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SEND\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "receipt: 1234\n\n" +
+              "Hello World" +
+              Stomp.NULL;
+
+      sendFrame(frame);
+
+      String f = receiveFrame(10000);
+      Assert.assertTrue(f.startsWith("RECEIPT"));
+      Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+
+      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", readContent(message));
+
+      // Make sure that the timestamp is valid - should
+      // be very close to the current time.
+      long tnow = System.currentTimeMillis();
+      long tmsg = message.getJMSTimestamp();
+      Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
    }
-    
-    public void testSendMessageWithContentLength() throws Exception {
 
-       MessageConsumer consumer = session.createConsumer(queue);
+   public void testSendMessageWithContentLength() throws Exception
+   {
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      MessageConsumer consumer = session.createConsumer(queue);
 
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       byte[] data = new byte[] {1, 0, 0, 4};
-       
-       frame =
-               "SEND\n" +
-                       "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                       "content-length:" + data.length + "\n\n";
-       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-       baos.write(frame.getBytes("UTF-8"));
-       baos.write(data);
-       baos.write('\0');
-       sendFrame(baos.toByteArray());
-       
-       BytesMessage message = (BytesMessage) consumer.receive(1000);
-       Assert.assertNotNull(message);
-       assertEquals(data.length, message.getBodyLength());
-       assertEquals(data[0], message.readByte());
-       assertEquals(data[1], message.readByte());
-       assertEquals(data[2], message.readByte());
-       assertEquals(data[3], message.readByte());
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      byte[] data = new byte[] { 1, 0, 0, 4 };
+
+      frame = "SEND\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "content-length:" +
+              data.length +
+              "\n\n";
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      baos.write(frame.getBytes("UTF-8"));
+      baos.write(data);
+      baos.write('\0');
+      sendFrame(baos.toByteArray());
+
+      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      assertEquals(data.length, message.getBodyLength());
+      assertEquals(data[0], message.readByte());
+      assertEquals(data[1], message.readByte());
+      assertEquals(data[2], message.readByte());
+      assertEquals(data[3], message.readByte());
    }
 
-    public void testJMSXGroupIdCanBeSet() throws Exception {
+   public void testJMSXGroupIdCanBeSet() throws Exception
+   {
 
-        MessageConsumer consumer = session.createConsumer(queue);
+      MessageConsumer consumer = session.createConsumer(queue);
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        frame =
-                "SEND\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "JMSXGroupID: TEST\n\n" +
-                        "Hello World" +
-                        Stomp.NULL;
+      frame = "SEND\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "JMSXGroupID: TEST\n\n" +
+              "Hello World" +
+              Stomp.NULL;
 
-        sendFrame(frame);
+      sendFrame(frame);
 
-        BytesMessage message = (BytesMessage) consumer.receive(1000);
-        Assert.assertNotNull(message);
-        Assert.assertEquals("Hello World", readContent(message));
-        // differ from StompConnect
-        Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
-    }
+      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", readContent(message));
+      // differ from StompConnect
+      Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
+   }
 
-    public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
+   public void testSendMessageWithCustomHeadersAndSelector() throws Exception
+   {
 
-        MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
+      MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        frame =
-                "SEND\n" +
-                        "foo:abc\n" +
-                        "bar:123\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
-                        "Hello World" +
-                        Stomp.NULL;
+      frame = "SEND\n" + "foo:abc\n" +
+              "bar:123\n" +
+              "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n\n" +
+              "Hello World" +
+              Stomp.NULL;
 
-        sendFrame(frame);
+      sendFrame(frame);
 
-        BytesMessage message = (BytesMessage) consumer.receive(1000);
-        Assert.assertNotNull(message);
-        Assert.assertEquals("Hello World", readContent(message));
-        Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
-        Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
-    }
+      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", readContent(message));
+      Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+      Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+   }
 
-    public void testSendMessageWithStandardHeaders() throws Exception {
+   public void testSendMessageWithStandardHeaders() throws Exception
+   {
 
-        MessageConsumer consumer = session.createConsumer(queue);
+      MessageConsumer consumer = session.createConsumer(queue);
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        frame =
-                "SEND\n" +
-                        "correlation-id:c123\n" +
-                        "persistent:true\n" +
-                        "priority:3\n" +
-                        "type:t345\n" +
-                        "JMSXGroupID:abc\n" +
-                        "foo:abc\n" +
-                        "bar:123\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
-                        "Hello World" +
-                        Stomp.NULL;
+      frame = "SEND\n" + "correlation-id:c123\n" +
+              "persistent:true\n" +
+              "priority:3\n" +
+              "type:t345\n" +
+              "JMSXGroupID:abc\n" +
+              "foo:abc\n" +
+              "bar:123\n" +
+              "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n\n" +
+              "Hello World" +
+              Stomp.NULL;
 
-        sendFrame(frame);
+      sendFrame(frame);
 
-        BytesMessage message = (BytesMessage) consumer.receive(1000);
-        Assert.assertNotNull(message);
-        Assert.assertEquals("Hello World", readContent(message));
-        Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
-        Assert.assertEquals("getJMSType", "t345", message.getJMSType());
-        Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
-        Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
-        Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
-        Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", readContent(message));
+      Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+      Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+      Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+      Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+      Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+      Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
 
-        Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
-        // FIXME do we support it?
-        //Assert.assertEquals("GroupID", "abc", amqMessage.getGroupID());
-    }
+      Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+      // FIXME do we support it?
+      // Assert.assertEquals("GroupID", "abc", amqMessage.getGroupID());
+   }
 
-    public void testSubscribeWithAutoAck() throws Exception {
+   public void testSubscribeWithAutoAck() throws Exception
+   {
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame = receiveFrame(100000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "ack:auto\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        sendMessage(getName());
+      sendMessage(getName());
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
-        Assert.assertTrue(frame.indexOf("destination:") > 0);
-        Assert.assertTrue(frame.indexOf(getName()) > 0);
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+      Assert.assertTrue(frame.indexOf(getName()) > 0);
 
-        frame =
-                "DISCONNECT\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-        
-        // message should not be received as it was auto-acked
-        MessageConsumer consumer = session.createConsumer(queue);
-        Message message = consumer.receive(1000);
-        Assert.assertNull(message);
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-    }
+      // message should not be received as it was auto-acked
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNull(message);
 
-    public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+   }
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+   public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
+   {
 
-        frame = receiveFrame(100000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "ack:auto\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        byte[] payload = new byte[]{1, 2, 3, 4, 5}; 
-        sendMessage(payload, queue);
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
+      byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
+      sendMessage(payload, queue);
 
-        Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
-        Matcher cl_matcher = cl.matcher(frame);
-        Assert.assertTrue(cl_matcher.find());
-        Assert.assertEquals("5", cl_matcher.group(1));
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
 
-        Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
-        Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
-        
-        frame =
-                "DISCONNECT\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-    }
+      Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+      Matcher cl_matcher = cl.matcher(frame);
+      Assert.assertTrue(cl_matcher.find());
+      Assert.assertEquals("5", cl_matcher.group(1));
 
-    public void testSubscribeWithMessageSentWithProperties() throws Exception {
+      Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
+      Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+   }
 
-        frame = receiveFrame(100000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+   public void testSubscribeWithMessageSentWithProperties() throws Exception
+   {
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "ack:auto\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        MessageProducer producer = session.createProducer(queue);
-        BytesMessage message = session.createBytesMessage();
-        message.setStringProperty("S", "value");
-        message.setBooleanProperty("n", false);
-        message.setByteProperty("byte", (byte) 9);
-        message.setDoubleProperty("d", 2.0);
-        message.setFloatProperty("f", (float) 6.0);
-        message.setIntProperty("i", 10);
-        message.setLongProperty("l", 121);
-        message.setShortProperty("s", (short) 12);
-        message.writeBytes("Hello World".getBytes("UTF-8"));
-        producer.send(message);
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        frame = receiveFrame(10000);
-        Assert.assertNotNull(frame);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
-        Assert.assertTrue(frame.indexOf("S:") > 0);
-        Assert.assertTrue(frame.indexOf("n:") > 0);
-        Assert.assertTrue(frame.indexOf("byte:") > 0);
-        Assert.assertTrue(frame.indexOf("d:") > 0);
-        Assert.assertTrue(frame.indexOf("f:") > 0);
-        Assert.assertTrue(frame.indexOf("i:") > 0);
-        Assert.assertTrue(frame.indexOf("l:") > 0);
-        Assert.assertTrue(frame.indexOf("s:") > 0);
-        Assert.assertTrue(frame.indexOf("Hello World") > 0);
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-//        System.out.println("out: "+frame);
+      MessageProducer producer = session.createProducer(queue);
+      BytesMessage message = session.createBytesMessage();
+      message.setStringProperty("S", "value");
+      message.setBooleanProperty("n", false);
+      message.setByteProperty("byte", (byte)9);
+      message.setDoubleProperty("d", 2.0);
+      message.setFloatProperty("f", (float)6.0);
+      message.setIntProperty("i", 10);
+      message.setLongProperty("l", 121);
+      message.setShortProperty("s", (short)12);
+      message.writeBytes("Hello World".getBytes("UTF-8"));
+      producer.send(message);
 
-        frame =
-                "DISCONNECT\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-    }
-    
-    public void testSubscribeWithID() throws Exception {
+      frame = receiveFrame(10000);
+      Assert.assertNotNull(frame);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("S:") > 0);
+      Assert.assertTrue(frame.indexOf("n:") > 0);
+      Assert.assertTrue(frame.indexOf("byte:") > 0);
+      Assert.assertTrue(frame.indexOf("d:") > 0);
+      Assert.assertTrue(frame.indexOf("f:") > 0);
+      Assert.assertTrue(frame.indexOf("i:") > 0);
+      Assert.assertTrue(frame.indexOf("l:") > 0);
+      Assert.assertTrue(frame.indexOf("s:") > 0);
+      Assert.assertTrue(frame.indexOf("Hello World") > 0);
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      // System.out.println("out: "+frame);
 
-       frame = receiveFrame(100000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+   }
 
-       frame =
-               "SUBSCRIBE\n" +
-                       "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                       "ack:auto\n" +
-                       "id: mysubid\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+   public void testSubscribeWithID() throws Exception
+   {
 
-       sendMessage(getName());
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("MESSAGE"));
-       Assert.assertTrue(frame.indexOf("destination:") > 0);
-       Assert.assertTrue(frame.indexOf("subscription:") > 0);
-       Assert.assertTrue(frame.indexOf(getName()) > 0);
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-       frame =
-               "DISCONNECT\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = "SUBSCRIBE\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "ack:auto\n" +
+              "id: mysubid\n\n" +
+              Stomp.NULL;
+      sendFrame(frame);
+
+      sendMessage(getName());
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+      Assert.assertTrue(frame.indexOf("subscription:") > 0);
+      Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
    }
 
-    public void testBodyWithUTF8() throws Exception {
+   public void testBodyWithUTF8() throws Exception
+   {
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       frame = receiveFrame(100000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-       frame =
-               "SUBSCRIBE\n" +
-                       "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                       "ack:auto\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       String text = "A" + "\u00ea" + "\u00f1" + 
-              "\u00fc" + "C";
-       System.out.println(text);
-       sendMessage(text);
+      String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
+      System.out.println(text);
+      sendMessage(text);
 
-       frame = receiveFrame(10000);
-       System.out.println(frame);
-       Assert.assertTrue(frame.startsWith("MESSAGE"));
-       Assert.assertTrue(frame.indexOf("destination:") > 0);
-       Assert.assertTrue(frame.indexOf(text) > 0);
+      frame = receiveFrame(10000);
+      System.out.println(frame);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+      Assert.assertTrue(frame.indexOf(text) > 0);
 
-       frame =
-               "DISCONNECT\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
    }
-    
-    public void testMessagesAreInOrder() throws Exception {
-        int ctr = 10;
-        String[] data = new String[ctr];
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+   public void testMessagesAreInOrder() throws Exception
+   {
+      int ctr = 10;
+      String[] data = new String[ctr];
 
-        frame = receiveFrame(100000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "ack:auto\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        for (int i = 0; i < ctr; ++i) {
-            data[i] = getName() + i;
-            sendMessage(data[i]);
-        }
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        for (int i = 0; i < ctr; ++i) {
-            frame = receiveFrame(1000);
-            Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
-        }
+      for (int i = 0; i < ctr; ++i)
+      {
+         data[i] = getName() + i;
+         sendMessage(data[i]);
+      }
 
-        // sleep a while before publishing another set of messages
-        waitForFrameToTakeEffect();
+      for (int i = 0; i < ctr; ++i)
+      {
+         frame = receiveFrame(1000);
+         Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+      }
 
-        for (int i = 0; i < ctr; ++i) {
-            data[i] = getName() + ":second:" + i;
-            sendMessage(data[i]);
-        }
+      // sleep a while before publishing another set of messages
+      waitForFrameToTakeEffect();
 
-        for (int i = 0; i < ctr; ++i) {
-            frame = receiveFrame(1000);
-            Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
-        }
+      for (int i = 0; i < ctr; ++i)
+      {
+         data[i] = getName() + ":second:" + i;
+         sendMessage(data[i]);
+      }
 
-        frame =
-                "DISCONNECT\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-    }
+      for (int i = 0; i < ctr; ++i)
+      {
+         frame = receiveFrame(1000);
+         Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+      }
 
-    public void testSubscribeWithAutoAckAndSelector() throws Exception {
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+   }
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+   public void testSubscribeWithAutoAckAndSelector() throws Exception
+   {
 
-        frame = receiveFrame(100000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "selector: foo = 'zzz'\n" +
-                        "ack:auto\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        sendMessage("Ignored message", "foo", "1234");
-        sendMessage("Real message", "foo", "zzz");
+      frame = "SUBSCRIBE\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "selector: foo = 'zzz'\n" +
+              "ack:auto\n\n" +
+              Stomp.NULL;
+      sendFrame(frame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
-        Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
+      sendMessage("Ignored message", "foo", "1234");
+      sendMessage("Real message", "foo", "zzz");
 
-        frame =
-                "DISCONNECT\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-    }
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
 
-    public void testSubscribeWithClientAck() throws Exception {
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+   }
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+   public void testSubscribeWithClientAck() throws Exception
+   {
 
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       frame =
-               "SUBSCRIBE\n" +
-                       "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                       "ack:client\n\n" +
-                       Stomp.NULL;
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-       sendFrame(frame);
-       
-       sendMessage(getName());
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("MESSAGE"));
-       Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
-       Matcher cl_matcher = cl.matcher(frame);
-       Assert.assertTrue(cl_matcher.find());
-       String messageID = cl_matcher.group(1);
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
 
-       frame =
-          "ACK\n" +
-                  "message-id: " + messageID + "\n\n" +
-                  Stomp.NULL;
-       sendFrame(frame);
+      sendFrame(frame);
 
-       frame =
-               "DISCONNECT\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      sendMessage(getName());
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
+      Matcher cl_matcher = cl.matcher(frame);
+      Assert.assertTrue(cl_matcher.find());
+      String messageID = cl_matcher.group(1);
 
-       // message should not be received since message was acknowledged by the client
-       MessageConsumer consumer = session.createConsumer(queue);
-       Message message = consumer.receive(1000);
-       Assert.assertNull(message);
+      frame = "ACK\n" + "message-id: " + messageID + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      // message should not be received since message was acknowledged by the client
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNull(message);
    }
-    
-    public void testRedeliveryWithClientAck() throws Exception {
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+   public void testRedeliveryWithClientAck() throws Exception
+   {
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "ack:client\n\n" +
-                        Stomp.NULL;
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        sendFrame(frame);
-        
-        sendMessage(getName());
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
 
-        frame =
-                "DISCONNECT\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      sendFrame(frame);
 
-        // message should be received since message was not acknowledged
-        MessageConsumer consumer = session.createConsumer(queue);
-        Message message = consumer.receive(1000);
-        Assert.assertNotNull(message);
-        Assert.assertTrue(message.getJMSRedelivered());
-    }
-    
-    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
-        assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
-    }
+      sendMessage(getName());
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
 
-    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
-        assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
-    }
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-    protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
+      // message should be received since message was not acknowledged
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertTrue(message.getJMSRedelivered());
+   }
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+   public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception
+   {
+      assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+   }
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+   public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception
+   {
+      assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+   }
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "ack:client\n\n" +
-                        Stomp.NULL;
+   protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception
+   {
 
-        sendFrame(frame);
-        sendMessage(getName());
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        log.info("Reconnecting!");
-        
-        if (sendDisconnect) {
-            frame =
-                    "DISCONNECT\n" +
-                            "\n\n" +
-                            Stomp.NULL;
-            sendFrame(frame);
-            waitForFrameToTakeEffect();
-            reconnect();
-        }
-        else {
-            reconnect(1000);
-            waitForFrameToTakeEffect();
-        }
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
 
+      sendFrame(frame);
+      sendMessage(getName());
 
-        // message should be received since message was not acknowledged
-        frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      log.info("Reconnecting!");
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
-                        Stomp.NULL;
+      if (sendDisconnect)
+      {
+         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+         sendFrame(frame);
+         waitForFrameToTakeEffect();
+         reconnect();
+      }
+      else
+      {
+         reconnect(1000);
+         waitForFrameToTakeEffect();
+      }
 
-        sendFrame(frame);
+      // message should be received since message was not acknowledged
+      frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        frame =
-                "DISCONNECT\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-        waitForFrameToTakeEffect();
-        
-        // now lets make sure we don't see the message again
-        reconnect();
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL;
 
-        frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      sendFrame(frame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "receipt: 1234\n\n" +
-                        Stomp.NULL;
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      waitForFrameToTakeEffect();
 
-        sendFrame(frame);
-        // wait for SUBSCRIBE's receipt
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("RECEIPT"));
+      // now lets make sure we don't see the message again
+      reconnect();
 
-        sendMessage("shouldBeNextMessage");
+      frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
-        System.out.println(frame);
-        Assert.assertTrue(frame.contains("shouldBeNextMessage"));
-    }
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-    public void testUnsubscribe() throws Exception {
+      frame = "SUBSCRIBE\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "receipt: 1234\n\n" +
+              Stomp.NULL;
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-        frame = receiveFrame(100000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      sendFrame(frame);
+      // wait for SUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "ack:auto\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      sendMessage("shouldBeNextMessage");
 
-        //send a message to our queue
-        sendMessage("first message");
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      System.out.println(frame);
+      Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+   }
 
-        //receive message from socket
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
+   public void testUnsubscribe() throws Exception
+   {
 
-        //remove suscription
-        frame =
-                "UNSUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "receipt:567\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-        waitForReceipt();
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        //send a message to our queue
-        sendMessage("second message");
+      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        try {
-            frame = receiveFrame(1000);
-            log.info("Received frame: " + frame);
-            Assert.fail("No message should have been received since subscription was removed");
-        }
-        catch (SocketTimeoutException e) {
+      // send a message to our queue
+      sendMessage("first message");
 
-        }
-    }
+      // receive message from socket
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
 
-    public void testUnsubscribeWithID() throws Exception {
+      // remove suscription
+      frame = "UNSUBSCRIBE\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "receipt:567\n" +
+              "\n\n" +
+              Stomp.NULL;
+      sendFrame(frame);
+      waitForReceipt();
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-        frame = receiveFrame(100000);
-        Assert.assertTrue(frame.startsWith("CONNECTED"));
+      // send a message to our queue
+      sendMessage("second message");
 
-        frame =
-                "SUBSCRIBE\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "id: mysubid\n" +
-                        "ack:auto\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      try
+      {
+         frame = receiveFrame(1000);
+         log.info("Received frame: " + frame);
+         Assert.fail("No message should have been received since subscription was removed");
+      }
+      catch (SocketTimeoutException e)
+      {
 
-        //send a message to our queue
-        sendMessage("first message");
+      }
+   }
 
-        //receive message from socket
-        frame = receiveFrame(10000);
-        Assert.assertTrue(frame.startsWith("MESSAGE"));
+   public void testUnsubscribeWithID() throws Exception
+   {
 
-        //remove suscription
-        frame =
-                "UNSUBSCRIBE\n" +
-                        "id:mysubid\n" +
-                        "receipt: 345\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-        waitForReceipt();
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-        //send a message to our queue
-        sendMessage("second message");
+      frame = "SUBSCRIBE\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "id: mysubid\n" +
+              "ack:auto\n\n" +
+              Stomp.NULL;
+      sendFrame(frame);
 
-        try {
-            frame = receiveFrame(1000);
-            log.info("Received frame: " + frame);
-            Assert.fail("No message should have been received since subscription was removed");
-        }
-        catch (SocketTimeoutException e) {
+      // send a message to our queue
+      sendMessage("first message");
 
-        }
-    }
+      // receive message from socket
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
 
-    public void testTransactionCommit() throws Exception {
-        MessageConsumer consumer = session.createConsumer(queue);
+      // remove suscription
+      frame = "UNSUBSCRIBE\n" + "id:mysubid\n" + "receipt: 345\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      waitForReceipt();
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      // send a message to our queue
+      sendMessage("second message");
 
-        String f = receiveFrame(1000);
-        Assert.assertTrue(f.startsWith("CONNECTED"));
+      try
+      {
+         frame = receiveFrame(1000);
+         log.info("Received frame: " + frame);
+         Assert.fail("No message should have been received since subscription was removed");
+      }
+      catch (SocketTimeoutException e)
+      {
 
-        frame =
-                "BEGIN\n" +
-                        "transaction: tx1\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      }
+   }
 
-        frame =
-                "SEND\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "transaction: tx1\n" +
-                        "receipt: 123\n" +
-                        "\n\n" +
-                        "Hello World" +
-                        Stomp.NULL;
-        sendFrame(frame);
-        waitForReceipt();
-        
-        // check the message is not committed
-        assertNull(consumer.receive(100));
-        
-        frame =
-                "COMMIT\n" +
-                        "transaction: tx1\n" +
-                        "receipt:456\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-        waitForReceipt();
+   public void testTransactionCommit() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
 
-        Message message = consumer.receive(1000);
-        Assert.assertNotNull("Should have received a message", message);
-    }
-    
-    public void testSuccessiveTransactionsWithSameID() throws Exception {
-       MessageConsumer consumer = session.createConsumer(queue);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      String f = receiveFrame(1000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
 
-       String f = receiveFrame(1000);
-       Assert.assertTrue(f.startsWith("CONNECTED"));
+      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       // first tx
-       frame =
-               "BEGIN\n" +
-                       "transaction: tx1\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = "SEND\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "transaction: tx1\n" +
+              "receipt: 123\n" +
+              "\n\n" +
+              "Hello World" +
+              Stomp.NULL;
+      sendFrame(frame);
+      waitForReceipt();
 
-       frame =
-               "SEND\n" +
-                       "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                       "transaction: tx1\n" +
-                       "\n\n" +
-                       "Hello World" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      // check the message is not committed
+      assertNull(consumer.receive(100));
 
-       frame =
-               "COMMIT\n" +
-                       "transaction: tx1\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:456\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      waitForReceipt();
 
-       Message message = consumer.receive(1000);
-       Assert.assertNotNull("Should have received a message", message);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull("Should have received a message", message);
+   }
 
-       // 2nd tx with same tx ID
-       frame =
-               "BEGIN\n" +
-                       "transaction: tx1\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+   public void testSuccessiveTransactionsWithSameID() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
 
-       frame =
-               "SEND\n" +
-                       "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                       "transaction: tx1\n" +
-                       "\n\n" +
-                       "Hello World" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       frame =
-               "COMMIT\n" +
-                       "transaction: tx1\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      String f = receiveFrame(1000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
 
-       message = consumer.receive(1000);
-       Assert.assertNotNull("Should have received a message", message);
-}
-    
-    public void testBeginSameTransactionTwice() throws Exception {
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      // first tx
+      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       String f = receiveFrame(1000);
-       Assert.assertTrue(f.startsWith("CONNECTED"));
+      frame = "SEND\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "transaction: tx1\n" +
+              "\n\n" +
+              "Hello World" +
+              Stomp.NULL;
+      sendFrame(frame);
 
-       frame =
-               "BEGIN\n" +
-                       "transaction: tx1\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       // begin the tx a 2nd time
-       frame =
-          "BEGIN\n" +
-                  "transaction: tx1\n" +
-                  "\n\n" +
-                  Stomp.NULL;
-       sendFrame(frame);
+      Message message = consumer.receive(1000);
+      Assert.assertNotNull("Should have received a message", message);
 
-       f = receiveFrame(1000);
-       Assert.assertTrue(f.startsWith("ERROR"));
+      // 2nd tx with same tx ID
+      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
+      frame = "SEND\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "transaction: tx1\n" +
+              "\n\n" +
+              "Hello World" +
+              Stomp.NULL;
+      sendFrame(frame);
+
+      frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      message = consumer.receive(1000);
+      Assert.assertNotNull("Should have received a message", message);
    }
 
-    public void testTransactionRollback() throws Exception {
-        MessageConsumer consumer = session.createConsumer(queue);
+   public void testBeginSameTransactionTwice() throws Exception
+   {
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        String frame =
-                "CONNECT\n" +
-                        "login: brianm\n" +
-                        "passcode: wombats\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      String f = receiveFrame(1000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
 
-        String f = receiveFrame(1000);
-        Assert.assertTrue(f.startsWith("CONNECTED"));
+      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame =
-                "BEGIN\n" +
-                        "transaction: tx1\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      // begin the tx a 2nd time
+      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame =
-                "SEND\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "transaction: tx1\n" +
-                        "\n" +
-                        "first message" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      f = receiveFrame(1000);
+      Assert.assertTrue(f.startsWith("ERROR"));
 
-        //rollback first message
-        frame =
-                "ABORT\n" +
-                        "transaction: tx1\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+   }
 
-        frame =
-                "BEGIN\n" +
-                        "transaction: tx1\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
+   public void testTransactionRollback() throws Exception
+   {
+      MessageConsumer consumer = session.createConsumer(queue);
 
-        frame =
-                "SEND\n" +
-                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                        "transaction: tx1\n" +
-                        "\n" +
-                        "second message" +
-                        Stomp.NULL;
-        sendFrame(frame);
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-        frame =
-                "COMMIT\n" +
-                        "transaction: tx1\n" +
-                        "receipt:789\n" +
-                        "\n\n" +
-                        Stomp.NULL;
-        sendFrame(frame);
-        waitForReceipt();
+      String f = receiveFrame(1000);
+      Assert.assertTrue(f.startsWith("CONNECTED"));
 
-        //only second msg should be received since first msg was rolled back
-        BytesMessage message = (BytesMessage) consumer.receive(1000);
-        Assert.assertNotNull(message);
-        Assert.assertEquals("second message", readContent(message));
-    }
-    
-    public void testSubscribeToTopic() throws Exception {
+      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = "SEND\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "transaction: tx1\n" +
+              "\n" +
+              "first message" +
+              Stomp.NULL;
+      sendFrame(frame);
 
-       frame = receiveFrame(100000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      // rollback first message
+      frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       frame =
-               "SUBSCRIBE\n" +
-                       "destination:" + getTopicPrefix() + getTopicName() + "\n" +
-                       "receipt: 12\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
-       // wait for SUBSCRIBE's receipt
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("RECEIPT"));
+      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       sendMessage(getName(), topic);
+      frame = "SEND\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "transaction: tx1\n" +
+              "\n" +
+              "second message" +
+              Stomp.NULL;
+      sendFrame(frame);
 
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("MESSAGE"));
-       Assert.assertTrue(frame.indexOf("destination:") > 0);
-       Assert.assertTrue(frame.indexOf(getName()) > 0);
+      frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:789\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      waitForReceipt();
 
-       frame =
-          "UNSUBSCRIBE\n" +
-                  "destination:" + getTopicPrefix() + getTopicName() + "\n" +
-                  "receipt: 1234\n" +
-                  "\n\n" +
-                  Stomp.NULL;
-       sendFrame(frame);
-       // wait for UNSUBSCRIBE's receipt
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("RECEIPT"));
-  
-       sendMessage(getName(), topic);
+      // only second msg should be received since first msg was rolled back
+      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("second message", readContent(message));
+   }
 
-       try {
-          frame = receiveFrame(1000);
-          log.info("Received frame: " + frame);
-          Assert.fail("No message should have been received since subscription was removed");
+   public void testSubscribeToTopic() throws Exception
+   {
+
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SUBSCRIBE\n" + "destination:" +
+              getTopicPrefix() +
+              getTopicName() +
+              "\n" +
+              "receipt: 12\n" +
+              "\n\n" +
+              Stomp.NULL;
+      sendFrame(frame);
+      // wait for SUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      sendMessage(getName(), topic);
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+      Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+      frame = "UNSUBSCRIBE\n" + "destination:" +
+              getTopicPrefix() +
+              getTopicName() +
+              "\n" +
+              "receipt: 1234\n" +
+              "\n\n" +
+              Stomp.NULL;
+      sendFrame(frame);
+      // wait for UNSUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      sendMessage(getName(), topic);
+
+      try
+      {
+         frame = receiveFrame(1000);
+         log.info("Received frame: " + frame);
+         Assert.fail("No message should have been received since subscription was removed");
       }
-      catch (SocketTimeoutException e) {
+      catch (SocketTimeoutException e)
+      {
 
       }
-       
-      frame =
-               "DISCONNECT\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
    }
-    
-    public void testDurableSubscriberWithReconnection() throws Exception {
 
-       String connectFame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n" +
-                       "client-id: myclientid\n\n" +
-                       Stomp.NULL;
-       sendFrame(connectFame);
+   public void testDurableSubscriberWithReconnection() throws Exception
+   {
 
-       String frame = receiveFrame(100000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String connectFame = "CONNECT\n" + "login: brianm\n" +
+                           "passcode: wombats\n" +
+                           "client-id: myclientid\n\n" +
+                           Stomp.NULL;
+      sendFrame(connectFame);
 
-       String subscribeFrame =
-               "SUBSCRIBE\n" +
-                       "destination:" + getTopicPrefix() + getTopicName() + "\n" +
-                       "durable-subscriber-name: " + getName() + "\n" + 
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(subscribeFrame);
-       waitForFrameToTakeEffect();
+      String frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-       String disconnectFrame =
-          "DISCONNECT\n" +
-                  "\n\n" +
-                  Stomp.NULL;
-       sendFrame(disconnectFrame);
-       waitForFrameToTakeEffect();
-       
-       // send the message when the durable subscriber is disconnected
-       sendMessage(getName(), topic);
+      String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
+                              getTopicPrefix() +
+                              getTopicName() +
+                              "\n" +
+                              "durable-subscriber-name: " +
+                              getName() +
+                              "\n" +
+                              "\n\n" +
+                              Stomp.NULL;
+      sendFrame(subscribeFrame);
+      waitForFrameToTakeEffect();
 
-       reconnect(1000);
-       sendFrame(connectFame);
-       frame = receiveFrame(100000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
-       
-       sendFrame(subscribeFrame);
+      String disconnectFrame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(disconnectFrame);
+      waitForFrameToTakeEffect();
 
-       // we must have received the message 
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("MESSAGE"));
-       Assert.assertTrue(frame.indexOf("destination:") > 0);
-       Assert.assertTrue(frame.indexOf(getName()) > 0);
+      // send the message when the durable subscriber is disconnected
+      sendMessage(getName(), topic);
 
-       String unsubscribeFrame =
-          "UNSUBSCRIBE\n" +
-                  "destination:" + getTopicPrefix() + getTopicName() + "\n" +
-                  "receipt: 1234\n" +
-                  "\n\n" +
-                  Stomp.NULL;
-       sendFrame(unsubscribeFrame);
-       // wait for UNSUBSCRIBE's receipt
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("RECEIPT"));
-       
-       sendFrame(disconnectFrame);
+      reconnect(1000);
+      sendFrame(connectFame);
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      sendFrame(subscribeFrame);
+
+      // we must have received the message
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+      Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+      String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
+                                getTopicPrefix() +
+                                getTopicName() +
+                                "\n" +
+                                "receipt: 1234\n" +
+                                "\n\n" +
+                                Stomp.NULL;
+      sendFrame(unsubscribeFrame);
+      // wait for UNSUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      sendFrame(disconnectFrame);
    }
-    
-    public void testDurableSubscriber() throws Exception {
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n" +
-                       "client-id: myclientid\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+   public void testDurableSubscriber() throws Exception
+   {
 
-       frame = receiveFrame(100000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       String subscribeFrame =
-               "SUBSCRIBE\n" +
-                       "destination:" + getTopicPrefix() + getTopicName() + "\n" +
-                       "receipt: 12\n" +
-                       "durable-subscriber-name: " + getName() + "\n" + 
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(subscribeFrame);
-       // wait for SUBSCRIBE's receipt
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("RECEIPT"));
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-       // creating a subscriber with the same durable-subscriber-name must fail
-       sendFrame(subscribeFrame);
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("ERROR"));
-       
-       frame =
-               "DISCONNECT\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
+                              getTopicPrefix() +
+                              getTopicName() +
+                              "\n" +
+                              "receipt: 12\n" +
+                              "durable-subscriber-name: " +
+                              getName() +
+                              "\n" +
+                              "\n\n" +
+                              Stomp.NULL;
+      sendFrame(subscribeFrame);
+      // wait for SUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      // creating a subscriber with the same durable-subscriber-name must fail
+      sendFrame(subscribeFrame);
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("ERROR"));
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
    }
-    
-    public void testSubscribeToTopicWithNoLocal() throws Exception {
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+   public void testSubscribeToTopicWithNoLocal() throws Exception
+   {
 
-       frame = receiveFrame(100000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       frame =
-               "SUBSCRIBE\n" +
-                       "destination:" + getTopicPrefix() + getTopicName() + "\n" +
-                       "receipt: 12\n" +
-                       "no-local: true\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
-       // wait for SUBSCRIBE's receipt
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("RECEIPT"));
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-       // send a message on the same connection => it should not be received
-       frame = "SEND\n" +
-          "destination:" + getTopicPrefix() + getTopicName() + "\n\n" +
-                  "Hello World" +
-                  Stomp.NULL;
-       sendFrame(frame);
-  
-       try {
-          frame = receiveFrame(2000);
-          log.info("Received frame: " + frame);
-          Assert.fail("No message should have been received since subscription is noLocal");
+      frame = "SUBSCRIBE\n" + "destination:" +
+              getTopicPrefix() +
+              getTopicName() +
+              "\n" +
+              "receipt: 12\n" +
+              "no-local: true\n" +
+              "\n\n" +
+              Stomp.NULL;
+      sendFrame(frame);
+      // wait for SUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      // send a message on the same connection => it should not be received
+      frame = "SEND\n" + "destination:" + getTopicPrefix() + getTopicName() + "\n\n" + "Hello World" + Stomp.NULL;
+      sendFrame(frame);
+
+      try
+      {
+         frame = receiveFrame(2000);
+         log.info("Received frame: " + frame);
+         Assert.fail("No message should have been received since subscription is noLocal");
       }
-      catch (SocketTimeoutException e) {
+      catch (SocketTimeoutException e)
+      {
       }
-      
+
       // send message on another JMS connection => it should be received
       sendMessage(getName(), topic);
       frame = receiveFrame(10000);
       Assert.assertTrue(frame.startsWith("MESSAGE"));
       Assert.assertTrue(frame.indexOf("destination:") > 0);
       Assert.assertTrue(frame.indexOf(getName()) > 0);
-      
-      frame =
-               "DISCONNECT\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
    }
-    
-    public void testClientAckNotPartOfTransaction() throws Exception {
 
-       String frame =
-               "CONNECT\n" +
-                       "login: brianm\n" +
-                       "passcode: wombats\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+   public void testClientAckNotPartOfTransaction() throws Exception
+   {
 
-       frame = receiveFrame(100000);
-       Assert.assertTrue(frame.startsWith("CONNECTED"));
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       frame =
-               "SUBSCRIBE\n" +
-                       "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                       "ack:client\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-       sendMessage(getName());
+      frame = "SUBSCRIBE\n" + "destination:" +
+              getQueuePrefix() +
+              getQueueName() +
+              "\n" +
+              "ack:client\n" +
+              "\n\n" +
+              Stomp.NULL;
+      sendFrame(frame);
 
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("MESSAGE"));
-       Assert.assertTrue(frame.indexOf("destination:") > 0);
-       Assert.assertTrue(frame.indexOf(getName()) > 0);
-       Assert.assertTrue(frame.indexOf("message-id:") > 0);
-       Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
-       Matcher cl_matcher = cl.matcher(frame);
-       Assert.assertTrue(cl_matcher.find());
-       String messageID = cl_matcher.group(1);
+      sendMessage(getName());
 
-       frame =
-          "BEGIN\n" +
-                  "transaction: tx1\n" +
-                  "\n\n" +
-                  Stomp.NULL;
-       sendFrame(frame);
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+      Assert.assertTrue(frame.indexOf(getName()) > 0);
+      Assert.assertTrue(frame.indexOf("message-id:") > 0);
+      Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
+      Matcher cl_matcher = cl.matcher(frame);
+      Assert.assertTrue(cl_matcher.find());
+      String messageID = cl_matcher.group(1);
 
-       frame =
-          "ACK\n" +
-                  "message-id:" + messageID + "\n" +
-                  "transaction: tx1\n" +
-                  "\n" +
-                  "second message" +
-                  Stomp.NULL;
-       sendFrame(frame);
+      frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
 
-       frame =
-          "ABORT\n" +
-                  "transaction: tx1\n" +
-                  "\n\n" +
-                  Stomp.NULL;
-       sendFrame(frame);
-  
-       try {
-           frame = receiveFrame(1000);
-           log.info("Received frame: " + frame);
-           Assert.fail("No message should have been received as the message was acked even though the transaction has been aborted");
-       }
-       catch (SocketTimeoutException e) {
-       }
-       
-       frame =
-          "UNSUBSCRIBE\n" +
-                  "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
-                  Stomp.NULL;
-       sendFrame(frame);
-       
-      frame =
-               "DISCONNECT\n" +
-                       "\n\n" +
-                       Stomp.NULL;
-       sendFrame(frame);
+      frame = "ACK\n" + "message-id:" + messageID + "\n" + "transaction: tx1\n" + "\n" + "second message" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      try
+      {
+         frame = receiveFrame(1000);
+         log.info("Received frame: " + frame);
+         Assert.fail("No message should have been received as the message was acked even though the transaction has been aborted");
+      }
+      catch (SocketTimeoutException e)
+      {
+      }
+
+      frame = "UNSUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
    }
-   
-    // Implementation methods
-    //-------------------------------------------------------------------------
-    protected void setUp() throws Exception {
-       super.setUp();
-       
-       server = createServer();
-       server.start();
-        connectionFactory = createConnectionFactory();
 
-        stompSocket = createSocket();
-        inputBuffer = new ByteArrayOutputStream();
+   // Implementation methods
+   // -------------------------------------------------------------------------
+   protected void setUp() throws Exception
+   {
+      super.setUp();
 
-        connection = connectionFactory.createConnection();
-        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        queue = session.createQueue(getQueueName());
-        topic = session.createTopic(getTopicName());
-        connection.start();
-    }
+      server = createServer();
+      server.start();
+      connectionFactory = createConnectionFactory();
 
-    /**
-    * @return
-    * @throws Exception 
-    */
+      stompSocket = createSocket();
+      inputBuffer = new ByteArrayOutputStream();
+
+      connection = connectionFactory.createConnection();
+      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      queue = session.createQueue(getQueueName());
+      topic = session.createTopic(getTopicName());
+      connection.start();
+   }
+
+   /**
+   * @return
+   * @throws Exception 
+   */
    private JMSServerManager createServer() throws Exception
    {
       Configuration config = new ConfigurationImpl();
@@ -1479,149 +1283,178 @@
       TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
       config.getAcceptorConfigurations().add(stompTransport);
       config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
-       HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
-       
-       JMSConfiguration jmsConfig = new JMSConfigurationImpl();
-       jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
-       jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
-       server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
-       server.setContext(null);
-       return server;
+      HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+      JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+      jmsConfig.getQueueConfigurations()
+               .add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+      jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
+      server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+      server.setContext(null);
+      return server;
    }
 
-   protected void tearDown() throws Exception {
-        connection.close();
-        if (stompSocket != null) {
-            stompSocket.close();
-        }
-        server.stop();
-        
-        super.tearDown();
-    }
+   protected void tearDown() throws Exception
+   {
+      connection.close();
+      if (stompSocket != null)
+      {
+         stompSocket.close();
+      }
+      server.stop();
 
-    protected void reconnect() throws Exception {
-        reconnect(0);
-    }
-    protected void reconnect(long sleep) throws Exception {
-        stompSocket.close();
+      super.tearDown();
+   }
 
-        if (sleep > 0) {
-            Thread.sleep(sleep);
-        }
+   protected void reconnect() throws Exception
+   {
+      reconnect(0);
+   }
 
-        stompSocket = createSocket();
-        inputBuffer = new ByteArrayOutputStream();
-    }
+   protected void reconnect(long sleep) throws Exception
+   {
+      stompSocket.close();
 
-    protected ConnectionFactory createConnectionFactory() {
-       return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
-    }
+      if (sleep > 0)
+      {
+         Thread.sleep(sleep);
+      }
 
-    protected Socket createSocket() throws IOException {
-        return new Socket("127.0.0.1", port);
-    }
+      stompSocket = createSocket();
+      inputBuffer = new ByteArrayOutputStream();
+   }
 
-    protected String getQueueName() {
-        return "test";
-    }
+   protected ConnectionFactory createConnectionFactory()
+   {
+      return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+   }
 
-    protected String getQueuePrefix() {
-       return "jms.queue.";
+   protected Socket createSocket() throws IOException
+   {
+      return new Socket("127.0.0.1", port);
    }
-    
-    protected String getTopicName() {
-       return "testtopic";
+
+   protected String getQueueName()
+   {
+      return "test";
    }
 
-    protected String getTopicPrefix() {
-       return "jms.topic.";
+   protected String getQueuePrefix()
+   {
+      return "jms.queue.";
    }
 
-    public void sendFrame(String data) throws Exception {
-        byte[] bytes = data.getBytes("UTF-8");
-        OutputStream outputStream = stompSocket.getOutputStream();
-        for (int i = 0; i < bytes.length; i++) {
-            outputStream.write(bytes[i]);
-        }
-        outputStream.flush();
-    }
+   protected String getTopicName()
+   {
+      return "testtopic";
+   }
 
-    public void sendFrame(byte[] data) throws Exception {
-        OutputStream outputStream = stompSocket.getOutputStream();
-        for (int i = 0; i < data.length; i++) {
-            outputStream.write(data[i]);
-        }
-        outputStream.flush();
-    }
-    
-    public String receiveFrame(long timeOut) throws Exception {
-        stompSocket.setSoTimeout((int) timeOut);
-        InputStream is = stompSocket.getInputStream();
-        int c = 0;
-        for (; ;) {
+   protected String getTopicPrefix()
+   {
+      return "jms.topic.";
+   }
+
+   public void sendFrame(String data) throws Exception
+   {
+      byte[] bytes = data.getBytes("UTF-8");
+      OutputStream outputStream = stompSocket.getOutputStream();
+      for (int i = 0; i < bytes.length; i++)
+      {
+         outputStream.write(bytes[i]);
+      }
+      outputStream.flush();
+   }
+
+   public void sendFrame(byte[] data) throws Exception
+   {
+      OutputStream outputStream = stompSocket.getOutputStream();
+      for (int i = 0; i < data.length; i++)
+      {
+         outputStream.write(data[i]);
+      }
+      outputStream.flush();
+   }
+
+   public String receiveFrame(long timeOut) throws Exception
+   {
+      stompSocket.setSoTimeout((int)timeOut);
+      InputStream is = stompSocket.getInputStream();
+      int c = 0;
+      for (;;)
+      {
+         c = is.read();
+         if (c < 0)
+         {
+            throw new IOException("socket closed.");
+         }
+         else if (c == 0)
+         {
             c = is.read();
-            if (c < 0) {
-                throw new IOException("socket closed.");
+            if (c != '\n')
+            {
+               byte[] ba = inputBuffer.toByteArray();
+               System.out.println(new String(ba, "UTF-8"));
             }
-            else if (c == 0) {
-                c = is.read();
-                if (c != '\n')
-                {
-                   byte[] ba = inputBuffer.toByteArray();
-                   System.out.println(new String(ba, "UTF-8"));
-                }
-                Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
-                byte[] ba = inputBuffer.toByteArray();
-                inputBuffer.reset();
-                return new String(ba, "UTF-8");
-            }
-            else {
-                inputBuffer.write(c);
-            }
-        }
-    }
+            Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+            byte[] ba = inputBuffer.toByteArray();
+            inputBuffer.reset();
+            return new String(ba, "UTF-8");
+         }
+         else
+         {
+            inputBuffer.write(c);
+         }
+      }
+   }
 
-    public void sendMessage(String msg) throws Exception {
-       sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", queue);
+   public void sendMessage(String msg) throws Exception
+   {
+      sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", queue);
    }
 
-    public void sendMessage(String msg, Destination destination) throws Exception {
-        sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", destination);
-    }
+   public void sendMessage(String msg, Destination destination) throws Exception
+   {
+      sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", destination);
+   }
 
-    public void sendMessage(byte[] data, Destination destination) throws Exception {
-       sendMessage(data, "foo", "xyz", destination);
+   public void sendMessage(byte[] data, Destination destination) throws Exception
+   {
+      sendMessage(data, "foo", "xyz", destination);
    }
-    
-    public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception {
-       sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
-    }
 
-    public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception {
-        MessageProducer producer = session.createProducer(destination);
-        BytesMessage message = session.createBytesMessage();
-        message.setStringProperty(propertyName, propertyValue);
-        message.writeBytes(data);
-        producer.send(message);
-    }
-    
-    public String readContent(BytesMessage message) throws Exception
-    {
-       byte[] data = new byte[1024];
-       int size = message.readBytes(data);
-       return new String(data, 0, size, "UTF-8");
-    }
-    
-    protected void waitForReceipt() throws Exception {
-       String frame = receiveFrame(50000);
-       assertNotNull(frame);
-       assertTrue(frame.indexOf("RECEIPT") > -1);
+   public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception
+   {
+      sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
    }
-    
-    protected void waitForFrameToTakeEffect() throws InterruptedException {
-        // bit of a dirty hack :)
-        // another option would be to force some kind of receipt to be returned
-        // from the frame
-        Thread.sleep(2000);
-    }
+
+   public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception
+   {
+      MessageProducer producer = session.createProducer(destination);
+      BytesMessage message = session.createBytesMessage();
+      message.setStringProperty(propertyName, propertyValue);
+      message.writeBytes(data);
+      producer.send(message);
+   }
+
+   public String readContent(BytesMessage message) throws Exception
+   {
+      byte[] data = new byte[1024];
+      int size = message.readBytes(data);
+      return new String(data, 0, size, "UTF-8");
+   }
+
+   protected void waitForReceipt() throws Exception
+   {
+      String frame = receiveFrame(50000);
+      assertNotNull(frame);
+      assertTrue(frame.indexOf("RECEIPT") > -1);
+   }
+
+   protected void waitForFrameToTakeEffect() throws InterruptedException
+   {
+      // bit of a dirty hack :)
+      // another option would be to force some kind of receipt to be returned
+      // from the frame
+      Thread.sleep(2000);
+   }
 }



More information about the hornetq-commits mailing list