[hornetq-commits] JBoss hornetq SVN: r9359 - in trunk: tests/src/org/hornetq/tests/integration/cluster/bridge and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jun 24 08:40:31 EDT 2010


Author: jmesnil
Date: 2010-06-24 08:40:30 -0400 (Thu, 24 Jun 2010)
New Revision: 9359

Added:
   trunk/tests/src/org/hornetq/tests/integration/management/ManagementWithStompTest.java
Modified:
   trunk/docs/user-manual/en/interoperability.xml
   trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-343: Management operation results can not be read from Stomp clients

* store management operation results in the message body buffer as a nullable SimpleString (instead of a nullable String) so that it can be read by Stomp messages
* added ManagementWithStompTest
* fixed doc which contains erroneous explanation about Stomp/JMS/Core mapping

Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml	2010-06-24 03:39:11 UTC (rev 9358)
+++ trunk/docs/user-manual/en/interoperability.xml	2010-06-24 12:40:30 UTC (rev 9359)
@@ -94,13 +94,13 @@
                our Stomp implementation checks for presence of the <literal>content-length</literal> header to decide how to map a Stomp message
                to a JMS Message or a Core message.
              </para>
-             <para>If the Stomp message has a <literal>content-length</literal> header, it will be mapped to a JMS <emphasis>TextMessage</emphasis>
+             <para>If the Stomp message does <emphasis>not</emphasis> have a <literal>content-length</literal> header, it will be mapped to a JMS <emphasis>TextMessage</emphasis>
                or a Core message with a <emphasis>single nullable SimpleString in the body buffer</emphasis>.</para>
-             <para>Alternatively, if the Stomp message does <emphasis>not</emphasis> have a <literal>content-length</literal> header, 
+             <para>Alternatively, if the Stomp message <emphasis>has</emphasis> a <literal>content-length</literal> header, 
                it will be mapped to a JMS <emphasis>BytesMessage</emphasis>
                or a Core message with a <emphasis>byte[] in the body buffer</emphasis>.</para>
              <para>The same logic applies when mapping a JMS message or a Core message to Stomp. A Stomp client can check the presence
-                of the <literal>content-length</literal> header to determine the type of the message body (UTF-8 String or bytes).</para>
+                of the <literal>content-length</literal> header to determine the type of the message body (String or bytes).</para>
           </section>
         </section>
         

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java	2010-06-24 03:39:11 UTC (rev 9358)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java	2010-06-24 12:40:30 UTC (rev 9359)
@@ -76,9 +76,9 @@
    }
    
    // https://jira.jboss.org/browse/HORNETQ-382
-   public void _testReconnectWithPaging() throws Exception
+   public void testReconnectWithPaging() throws Exception
    {
-      final byte[] content = new byte[2048];
+      final byte[] content = new byte[2048]; // 2 kiB
       for (int i=0; i < content.length; ++i) {
           content[i] = (byte) i;
       }
@@ -108,7 +108,7 @@
       final long retryInterval = 50;
       final double retryIntervalMultiplier = 1d;
       final int reconnectAttempts = -1;
-      final int confirmationWindowSize = 1024;
+      final int confirmationWindowSize = 1024; // 1 kiB
 
       Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
 
@@ -139,8 +139,8 @@
       
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setRedeliveryDelay(0);
-      addressSettings.setMaxSizeBytes(1048576);
-      addressSettings.setPageSizeBytes(104857);
+      addressSettings.setMaxSizeBytes(10485760); // 1 MiB
+      addressSettings.setPageSizeBytes(1048576); // 100 kiB
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
       
       server0.getConfiguration().getAddressesSettings().put("#", addressSettings);
@@ -159,6 +159,7 @@
       ClientSession session0 = csf0.createSession(false, true, true);
 
       ClientSessionFactory csf1 = HornetQClient.createClientSessionFactory(server1tc);
+      //csf1.setAckBatchSize(20480); // 20 kiB
       ClientSession session1 = csf1.createSession(false, true, true);
 
       ClientProducer prod0 = session0.createProducer(testAddress);
@@ -185,7 +186,7 @@
       };
       t.start();
       
-      final int numMessages = 500;
+      final int numMessages = 5000;
 
       SimpleString propKey = new SimpleString("propkey");
 
@@ -194,6 +195,7 @@
          ClientMessage message = session0.createMessage(false);
          message.putIntProperty(propKey, i);
          message.getBodyBuffer().writeBytes(content);
+         //message.setPriority((byte)3);
          prod0.send(message);
          System.out.println(">>>> " + i);
       }

Added: trunk/tests/src/org/hornetq/tests/integration/management/ManagementWithStompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ManagementWithStompTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ManagementWithStompTest.java	2010-06-24 12:40:30 UTC (rev 9359)
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.management;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.management.QueueControl;
+import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A ManagementWithStompTest
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class ManagementWithStompTest extends ManagementTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   protected HornetQServer server;
+   
+   protected ClientSession session;
+
+   private Socket stompSocket;
+
+   private ByteArrayOutputStream inputBuffer;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+
+   public void testGetManagementAttributeFromStomp() throws Exception
+   {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SUBSCRIBE\n" + "destination:" + queue + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      // retrieve the address of the queue
+      frame = "\nSEND\n" + "destination:" + ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS + "\n" +
+            "reply-to:" + address + "\n" +
+      		"_HQ_ResourceName:" + ResourceNames.CORE_QUEUE + queue + "\n" +
+      		"_HQ_Attribute: Address\n\n" +
+      		Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(10000);
+      System.out.println(frame);
+      assertTrue(frame.contains("_HQ_OperationSucceeded:true"));
+      // the address will be returned in the message body in a JSON array
+      Assert.assertTrue(frame.contains("[\"" + address + "\"]"));
+      
+      frame = "UNSUBSCRIBE\n" + "destination:" + queue + "\n" +
+         "receipt: 123\n\n" +
+         Stomp.NULL;
+      sendFrame(frame);
+      waitForReceipt();
+      
+      String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
+      sendFrame(disconnectFrame);
+
+      session.deleteQueue(queue);
+   }
+
+   public void testInvokeOperationFromStomp() throws Exception
+   {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SUBSCRIBE\n" + "destination:" + queue + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      // count number of message with filter "color = 'blue'"
+      frame = "\nSEND\n" + "destination:" + ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS + "\n" +
+            "reply-to:" + address + "\n" +
+            "_HQ_ResourceName:" + ResourceNames.CORE_QUEUE + queue + "\n" +
+            "_HQ_OperationName: countMessages\n\n" +
+            "[\"color = 'blue'\"]" +
+            Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(10000);
+      System.out.println(frame);
+      assertTrue(frame.contains("_HQ_OperationSucceeded:true"));
+      // there is no such messages => 0 returned in a JSON array
+      assertTrue(frame.contains("[0]"));
+      
+      frame = "UNSUBSCRIBE\n" + "destination:" + queue + "\n" +
+         "receipt: 123\n\n" +
+         Stomp.NULL;
+      sendFrame(frame);
+      waitForReceipt();
+      
+      String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
+      sendFrame(disconnectFrame);
+
+      session.deleteQueue(queue);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      conf.setJMXManagementEnabled(true);
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+      conf.getAcceptorConfigurations().add(stompTransport);
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+      server.start();
+
+      ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      sf.setBlockOnNonDurableSend(true);
+      sf.setBlockOnNonDurableSend(true);
+      session = sf.createSession(false, true, false);
+      session.start();
+
+      stompSocket = new Socket("127.0.0.1", TransportConstants.DEFAULT_STOMP_PORT);
+      inputBuffer = new ByteArrayOutputStream();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      session.close();
+
+      server.stop();
+
+      session = null;
+
+      server = null;
+
+      super.tearDown();
+   }
+
+   protected QueueControl createManagementControl(final SimpleString address, final SimpleString queue) throws Exception
+   {
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, mbeanServer);
+
+      return queueControl;
+   }
+
+   // Private -------------------------------------------------------
+
+   public void sendFrame(String data) throws Exception
+   {
+      byte[] bytes = data.getBytes("UTF-8");
+      OutputStream outputStream = stompSocket.getOutputStream();
+      for (int i = 0; i < bytes.length; i++)
+      {
+         outputStream.write(bytes[i]);
+      }
+      outputStream.flush();
+   }
+
+   public String receiveFrame(long timeOut) throws Exception
+   {
+      stompSocket.setSoTimeout((int)timeOut);
+      InputStream is = stompSocket.getInputStream();
+      int c = 0;
+      for (;;)
+      {
+         c = is.read();
+         if (c < 0)
+         {
+            throw new IOException("socket closed.");
+         }
+         else if (c == 0)
+         {
+            c = is.read();
+            if (c != '\n')
+            {
+               byte[] ba = inputBuffer.toByteArray();
+               System.out.println(new String(ba, "UTF-8"));
+            }
+            Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+            byte[] ba = inputBuffer.toByteArray();
+            inputBuffer.reset();
+            return new String(ba, "UTF-8");
+         }
+         else
+         {
+            inputBuffer.write(c);
+         }
+      }
+   }
+   
+   protected void waitForReceipt() throws Exception
+   {
+      String frame = receiveFrame(50000);
+      assertNotNull(frame);
+      assertTrue(frame.indexOf("RECEIPT") > -1);
+   }
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list