[hornetq-commits] JBoss hornetq SVN: r11832 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11 and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Dec 5 09:19:44 EST 2011


Author: gaohoward
Date: 2011-12-05 09:19:44 -0500 (Mon, 05 Dec 2011)
New Revision: 11832

Added:
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/ExtraStompTest.java
Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java
Log:
HORNETQ-726, HORNETQ-742



Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-12-05 14:19:17 UTC (rev 11831)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-12-05 14:19:44 UTC (rev 11832)
@@ -316,9 +316,12 @@
 
          int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer
                .writerIndex() : serverMessage.getEndOfBodyPosition();
+
+         buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE
+                     + DataConstants.SIZE_INT);
+         
          int size = bodyPos - buffer.readerIndex();
-         buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE
-               + DataConstants.SIZE_INT);
+
          byte[] data = new byte[size];
 
          if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH)
@@ -341,7 +344,10 @@
          }
          frame.setByteBody(data);
 
-         serverMessage.getBodyBuffer().resetReaderIndex();
+         // Reset indexes so they are in order when
+         // StompSession.sendMessage is called again with the same
+         // ServerMessage instance
+         buffer.setIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT, bodyPos);
 
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
                deliveryCount);

Added: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/ExtraStompTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/ExtraStompTest.java	                        (rev 0)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/ExtraStompTest.java	2011-12-05 14:19:44 UTC (rev 11832)
@@ -0,0 +1,296 @@
+package org.hornetq.tests.integration.stomp.v11;
+
+import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
+import org.hornetq.tests.integration.stomp.util.StompClientConnection;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
+
+/*
+ * Stomp tests that run against a server with persistence enabled.
+ */
+public class ExtraStompTest extends StompTestBase2
+{
+   
+   protected void setUp() throws Exception
+   {
+      persistenceEnabled = true;
+      super.setUp();
+   }
+   
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   public void testSendAndReceive10() throws Exception
+   {
+      StompClientConnection connV11 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+
+      try
+      {
+         String msg1 = "Hello World 1!";
+         String msg2 = "Hello World 2!";
+         
+      connV11.connect(defUser, defPass);
+      
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-length", String.valueOf(msg1.getBytes("UTF-8").length));
+      frame.addHeader("persistent", "true");
+      frame.setBody(msg1);
+      
+      connV11.sendFrame(frame);
+
+      ClientStompFrame frame2 = connV11.createFrame("SEND");
+      frame2.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame2.addHeader("content-length", String.valueOf(msg2.getBytes("UTF-8").length));
+      frame2.addHeader("persistent", "true");
+      frame2.setBody(msg2);
+      
+      connV11.sendFrame(frame2);
+      
+      ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      
+      connV11.sendFrame(subFrame);
+      
+      frame = connV11.receiveFrame();
+      
+      System.out.println("received " + frame);
+      
+      assertEquals("MESSAGE", frame.getCommand());
+      
+      assertEquals("a-sub", frame.getHeader("subscription"));
+      
+      assertNotNull(frame.getHeader("message-id"));
+      
+      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+      
+      assertEquals(msg1, frame.getBody());
+      
+      frame = connV11.receiveFrame();
+      
+      System.out.println("received " + frame);      
+      
+      assertEquals("MESSAGE", frame.getCommand());
+      
+      assertEquals("a-sub", frame.getHeader("subscription"));
+      
+      assertNotNull(frame.getHeader("message-id"));
+      
+      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+      
+      assertEquals(msg2, frame.getBody());
+      
+      //unsub
+      ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      connV11.sendFrame(unsubFrame);
+      
+      }
+      finally
+      {
+         connV11.disconnect();
+      }
+   }
+
+   public void testSendAndReceive11() throws Exception
+   {
+      StompClientConnection connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+
+      try
+      {
+         String msg1 = "Hello World 1!";
+         String msg2 = "Hello World 2!";
+         
+      connV11.connect(defUser, defPass);
+      
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-length", String.valueOf(msg1.getBytes("UTF-8").length));
+      frame.addHeader("persistent", "true");
+      frame.setBody(msg1);
+      
+      connV11.sendFrame(frame);
+
+      ClientStompFrame frame2 = connV11.createFrame("SEND");
+      frame2.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame2.addHeader("content-length", String.valueOf(msg2.getBytes("UTF-8").length));
+      frame2.addHeader("persistent", "true");
+      frame2.setBody(msg2);
+      
+      connV11.sendFrame(frame2);
+      
+      ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      
+      connV11.sendFrame(subFrame);
+      
+      frame = connV11.receiveFrame();
+      
+      System.out.println("received " + frame);
+      
+      assertEquals("MESSAGE", frame.getCommand());
+      
+      assertEquals("a-sub", frame.getHeader("subscription"));
+      
+      assertNotNull(frame.getHeader("message-id"));
+      
+      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+      
+      assertEquals(msg1, frame.getBody());
+      
+      frame = connV11.receiveFrame();
+      
+      System.out.println("received " + frame);      
+      
+      assertEquals("MESSAGE", frame.getCommand());
+      
+      assertEquals("a-sub", frame.getHeader("subscription"));
+      
+      assertNotNull(frame.getHeader("message-id"));
+      
+      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+      
+      assertEquals(msg2, frame.getBody());
+      
+      //unsub
+      ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      connV11.sendFrame(unsubFrame);
+      
+      }
+      finally
+      {
+         connV11.disconnect();
+      }
+   }
+
+   public void testNoGarbageAfterPersistentMessageV10() throws Exception
+   {
+      StompClientConnection connV11 = StompClientConnectionFactory
+            .createClientConnection("1.0", hostname, port);
+      try
+      {
+         connV11.connect(defUser, defPass);
+
+         ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
+         subFrame.addHeader("id", "a-sub");
+         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         subFrame.addHeader("ack", "auto");
+
+         connV11.sendFrame(subFrame);
+
+         ClientStompFrame frame = connV11.createFrame("SEND");
+         frame.addHeader("destination", getQueuePrefix() + getQueueName());
+         frame.addHeader("content-length", "11");
+         frame.addHeader("persistent", "true");
+         frame.setBody("Hello World");
+
+         connV11.sendFrame(frame);
+
+         frame = connV11.createFrame("SEND");
+         frame.addHeader("destination", getQueuePrefix() + getQueueName());
+         frame.addHeader("content-length", "11");
+         frame.addHeader("persistent", "true");
+         frame.setBody("Hello World");
+
+         connV11.sendFrame(frame);
+
+         frame = connV11.receiveFrame(10000);
+         
+         System.out.println("received: " + frame);
+         
+         assertEquals("Hello World", frame.getBody());
+
+         // If HornetQ sends trailing garbage bytes after the first message,
+         // the second message will not be received correctly.
+         frame = connV11.receiveFrame(10000);
+         
+         System.out.println("received: " + frame);
+         
+         assertEquals("Hello World", frame.getBody());
+
+         //unsub
+         ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+         unsubFrame.addHeader("id", "a-sub");
+         connV11.sendFrame(unsubFrame);
+      }
+      finally
+      {
+         connV11.disconnect();
+      }
+   }
+   
+   public void testNoGarbageOnPersistentRedeliveryV10() throws Exception
+   {
+      StompClientConnection connV11 = StompClientConnectionFactory
+            .createClientConnection("1.0", hostname, port);
+      try
+      {
+         connV11.connect(defUser, defPass);
+
+         ClientStompFrame frame = connV11.createFrame("SEND");
+         frame.addHeader("destination", getQueuePrefix() + getQueueName());
+         frame.addHeader("content-length", "11");
+         frame.addHeader("persistent", "true");
+         frame.setBody("Hello World");
+
+         connV11.sendFrame(frame);
+
+         frame = connV11.createFrame("SEND");
+         frame.addHeader("destination", getQueuePrefix() + getQueueName());
+         frame.addHeader("content-length", "11");
+         frame.addHeader("persistent", "true");
+         frame.setBody("Hello World");
+
+         connV11.sendFrame(frame);
+
+         ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
+         subFrame.addHeader("id", "a-sub");
+         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         subFrame.addHeader("ack", "client");
+
+         connV11.sendFrame(subFrame);
+
+         // receive but don't ack
+         frame = connV11.receiveFrame(10000);
+         frame = connV11.receiveFrame(10000);
+         
+         System.out.println("received: " + frame);
+
+         //unsub
+         ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+         unsubFrame.addHeader("id", "a-sub");
+         connV11.sendFrame(unsubFrame);
+
+         subFrame = connV11.createFrame("SUBSCRIBE");
+         subFrame.addHeader("id", "a-sub");
+         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         subFrame.addHeader("ack", "auto");
+
+         connV11.sendFrame(subFrame);
+         
+         frame = connV11.receiveFrame(10000);
+         frame = connV11.receiveFrame(10000);
+         
+         // The second receive will run into problems if trailing garbage bytes were sent.
+         assertEquals("Hello World", frame.getBody());
+         
+         System.out.println("received again: " + frame);
+
+         //unsub
+         unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+         unsubFrame.addHeader("id", "a-sub");
+         connV11.sendFrame(unsubFrame);
+      }
+      finally
+      {
+         connV11.disconnect();
+      }
+   }
+
+}

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java	2011-12-05 14:19:17 UTC (rev 11831)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java	2011-12-05 14:19:44 UTC (rev 11832)
@@ -74,7 +74,7 @@
    
    protected String defPass = "wombats";
    
-   
+   protected boolean persistenceEnabled = false;
 
    // Implementation methods
    // -------------------------------------------------------------------------
@@ -101,7 +101,9 @@
    {
       Configuration config = createBasicConfig();
       config.setSecurityEnabled(false);
-      config.setPersistenceEnabled(false);
+      config.setPersistenceEnabled(persistenceEnabled);
+      
+      System.out.println("-----------------server persist: " + persistenceEnabled);
 
       Map<String, Object> params = new HashMap<String, Object>();
       params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
@@ -113,7 +115,7 @@
 
       JMSConfiguration jmsConfig = new JMSConfigurationImpl();
       jmsConfig.getQueueConfigurations()
-               .add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+               .add(new JMSQueueConfigurationImpl(getQueueName(), null, true, getQueueName()));
       jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
       server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
       server.setContext(new InVMContext());



More information about the hornetq-commits mailing list