Author: jmesnil
Date: 2010-06-24 08:40:30 -0400 (Thu, 24 Jun 2010)
New Revision: 9359
Added:
trunk/tests/src/org/hornetq/tests/integration/management/ManagementWithStompTest.java
Modified:
trunk/docs/user-manual/en/interoperability.xml
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-343: Management operation results can not be read
from Stomp clients
* store management operation results in the message body buffer as a nullable SimpleString
(instead of a nullable String) so that it can be read by Stomp messages
* added ManagementWithStompTest
* fixed doc which contains erroneous explanation about Stomp/JMS/Core mapping
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2010-06-24 03:39:11 UTC (rev 9358)
+++ trunk/docs/user-manual/en/interoperability.xml 2010-06-24 12:40:30 UTC (rev 9359)
@@ -94,13 +94,13 @@
our Stomp implementation checks for presence of the
<literal>content-length</literal> header to decide how to map a Stomp message
to a JMS Message or a Core message.
</para>
- <para>If the Stomp message has a
<literal>content-length</literal> header, it will be mapped to a JMS
<emphasis>TextMessage</emphasis>
+ <para>If the Stomp message does <emphasis>not</emphasis>
have a <literal>content-length</literal> header, it will be mapped to a JMS
<emphasis>TextMessage</emphasis>
or a Core message with a <emphasis>single nullable SimpleString in
the body buffer</emphasis>.</para>
- <para>Alternatively, if the Stomp message does
<emphasis>not</emphasis> have a <literal>content-length</literal>
header,
+ <para>Alternatively, if the Stomp message
<emphasis>has</emphasis> a <literal>content-length</literal>
header,
it will be mapped to a JMS <emphasis>BytesMessage</emphasis>
or a Core message with a <emphasis>byte[] in the body
buffer</emphasis>.</para>
<para>The same logic applies when mapping a JMS message or a Core
message to Stomp. A Stomp client can check the presence
- of the <literal>content-length</literal> header to determine
the type of the message body (UTF-8 String or bytes).</para>
+ of the <literal>content-length</literal> header to determine
the type of the message body (String or bytes).</para>
</section>
</section>
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2010-06-24
03:39:11 UTC (rev 9358)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeWithPagingTest.java 2010-06-24
12:40:30 UTC (rev 9359)
@@ -76,9 +76,9 @@
}
//
https://jira.jboss.org/browse/HORNETQ-382
- public void _testReconnectWithPaging() throws Exception
+ public void testReconnectWithPaging() throws Exception
{
- final byte[] content = new byte[2048];
+ final byte[] content = new byte[2048]; // 2 kiB
for (int i=0; i < content.length; ++i) {
content[i] = (byte) i;
}
@@ -108,7 +108,7 @@
final long retryInterval = 50;
final double retryIntervalMultiplier = 1d;
final int reconnectAttempts = -1;
- final int confirmationWindowSize = 1024;
+ final int confirmationWindowSize = 1024; // 1 kiB
Pair<String, String> connectorPair = new Pair<String,
String>(server1tc.getName(), null);
@@ -139,8 +139,8 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setRedeliveryDelay(0);
- addressSettings.setMaxSizeBytes(1048576);
- addressSettings.setPageSizeBytes(104857);
+ addressSettings.setMaxSizeBytes(10485760); // 1 MiB
+ addressSettings.setPageSizeBytes(1048576); // 100 kiB
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server0.getConfiguration().getAddressesSettings().put("#",
addressSettings);
@@ -159,6 +159,7 @@
ClientSession session0 = csf0.createSession(false, true, true);
ClientSessionFactory csf1 = HornetQClient.createClientSessionFactory(server1tc);
+ //csf1.setAckBatchSize(20480); // 20 kiB
ClientSession session1 = csf1.createSession(false, true, true);
ClientProducer prod0 = session0.createProducer(testAddress);
@@ -185,7 +186,7 @@
};
t.start();
- final int numMessages = 500;
+ final int numMessages = 5000;
SimpleString propKey = new SimpleString("propkey");
@@ -194,6 +195,7 @@
ClientMessage message = session0.createMessage(false);
message.putIntProperty(propKey, i);
message.getBodyBuffer().writeBytes(content);
+ //message.setPriority((byte)3);
prod0.send(message);
System.out.println(">>>> " + i);
}
Added:
trunk/tests/src/org/hornetq/tests/integration/management/ManagementWithStompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ManagementWithStompTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/management/ManagementWithStompTest.java 2010-06-24
12:40:30 UTC (rev 9359)
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.management;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.management.QueueControl;
+import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.hornetq.core.remoting.impl.netty.TransportConstants;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A ManagementWithStompTest
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ */
+public class ManagementWithStompTest extends ManagementTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected HornetQServer server;
+
+ protected ClientSession session;
+
+ private Socket stompSocket;
+
+ private ByteArrayOutputStream inputBuffer;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ public void testGetManagementAttributeFromStomp() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" + queue +
"\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ // retrieve the address of the queue
+ frame = "\nSEND\n" + "destination:" +
ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS + "\n" +
+ "reply-to:" + address + "\n" +
+ "_HQ_ResourceName:" + ResourceNames.CORE_QUEUE + queue + "\n"
+
+ "_HQ_Attribute: Address\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ System.out.println(frame);
+ assertTrue(frame.contains("_HQ_OperationSucceeded:true"));
+ // the address will be returned in the message body in a JSON array
+ Assert.assertTrue(frame.contains("[\"" + address +
"\"]"));
+
+ frame = "UNSUBSCRIBE\n" + "destination:" + queue +
"\n" +
+ "receipt: 123\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
+
+ String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
+ sendFrame(disconnectFrame);
+
+ session.deleteQueue(queue);
+ }
+
+ public void testInvokeOperationFromStomp() throws Exception
+ {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+
+ session.createQueue(address, queue, null, false);
+
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" + queue +
"\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ // count number of message with filter "color = 'blue'"
+ frame = "\nSEND\n" + "destination:" +
ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS + "\n" +
+ "reply-to:" + address + "\n" +
+ "_HQ_ResourceName:" + ResourceNames.CORE_QUEUE + queue +
"\n" +
+ "_HQ_OperationName: countMessages\n\n" +
+ "[\"color = 'blue'\"]" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ System.out.println(frame);
+ assertTrue(frame.contains("_HQ_OperationSucceeded:true"));
+ // there is no such messages => 0 returned in a JSON array
+ assertTrue(frame.contains("[0]"));
+
+ frame = "UNSUBSCRIBE\n" + "destination:" + queue +
"\n" +
+ "receipt: 123\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
+
+ String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
+ sendFrame(disconnectFrame);
+
+ session.deleteQueue(queue);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
+ conf.setJMXManagementEnabled(true);
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+ params.put(TransportConstants.PORT_PROP_NAME,
TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new
TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ conf.getAcceptorConfigurations().add(stompTransport);
+ conf.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ server = HornetQServers.newHornetQServer(conf, mbeanServer, false);
+ server.start();
+
+ ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new
TransportConfiguration(InVMConnectorFactory.class.getName()));
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnNonDurableSend(true);
+ session = sf.createSession(false, true, false);
+ session.start();
+
+ stompSocket = new Socket("127.0.0.1",
TransportConstants.DEFAULT_STOMP_PORT);
+ inputBuffer = new ByteArrayOutputStream();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ session.close();
+
+ server.stop();
+
+ session = null;
+
+ server = null;
+
+ super.tearDown();
+ }
+
+ protected QueueControl createManagementControl(final SimpleString address, final
SimpleString queue) throws Exception
+ {
+ QueueControl queueControl = ManagementControlHelper.createQueueControl(address,
queue, mbeanServer);
+
+ return queueControl;
+ }
+
+ // Private -------------------------------------------------------
+
+ public void sendFrame(String data) throws Exception
+ {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception
+ {
+ stompSocket.setSoTimeout((int)timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (;;)
+ {
+ c = is.read();
+ if (c < 0)
+ {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0)
+ {
+ c = is.read();
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
+ }
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n",
c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else
+ {
+ inputBuffer.write(c);
+ }
+ }
+ }
+
+ protected void waitForReceipt() throws Exception
+ {
+ String frame = receiveFrame(50000);
+ assertNotNull(frame);
+ assertTrue(frame.indexOf("RECEIPT") > -1);
+ }
+
+ // Inner classes -------------------------------------------------
+
+}