[hornetq-commits] JBoss hornetq SVN: r11832 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11 and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Dec 5 09:19:44 EST 2011
Author: gaohoward
Date: 2011-12-05 09:19:44 -0500 (Mon, 05 Dec 2011)
New Revision: 11832
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/ExtraStompTest.java
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java
Log:
HORNETQ-726, HORNETQ-742
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-12-05 14:19:17 UTC (rev 11831)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java 2011-12-05 14:19:44 UTC (rev 11832)
@@ -316,9 +316,12 @@
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer
.writerIndex() : serverMessage.getEndOfBodyPosition();
+
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE
+ + DataConstants.SIZE_INT);
+
int size = bodyPos - buffer.readerIndex();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE
- + DataConstants.SIZE_INT);
+
byte[] data = new byte[size];
if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH)
@@ -341,7 +344,10 @@
}
frame.setByteBody(data);
- serverMessage.getBodyBuffer().resetReaderIndex();
+ // Reset indexes so they are in order when
+ // StompSession.sendMessage is called again with the same
+ // ServerMessage instance
+ buffer.setIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT, bodyPos);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
Added: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/ExtraStompTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/ExtraStompTest.java (rev 0)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/ExtraStompTest.java 2011-12-05 14:19:44 UTC (rev 11832)
@@ -0,0 +1,296 @@
+package org.hornetq.tests.integration.stomp.v11;
+
+import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
+import org.hornetq.tests.integration.stomp.util.StompClientConnection;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
+
+/*
+ * Some Stomp tests against server with persistence enabled are put here.
+ */
+public class ExtraStompTest extends StompTestBase2
+{
+
+ protected void setUp() throws Exception
+ {
+ persistenceEnabled = true;
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testSendAndReceive10() throws Exception
+ {
+ StompClientConnection connV11 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+
+ try
+ {
+ String msg1 = "Hello World 1!";
+ String msg2 = "Hello World 2!";
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-length", String.valueOf(msg1.getBytes("UTF-8").length));
+ frame.addHeader("persistent", "true");
+ frame.setBody(msg1);
+
+ connV11.sendFrame(frame);
+
+ ClientStompFrame frame2 = connV11.createFrame("SEND");
+ frame2.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame2.addHeader("content-length", String.valueOf(msg2.getBytes("UTF-8").length));
+ frame2.addHeader("persistent", "true");
+ frame2.setBody(msg2);
+
+ connV11.sendFrame(frame2);
+
+ ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ connV11.sendFrame(subFrame);
+
+ frame = connV11.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("a-sub", frame.getHeader("subscription"));
+
+ assertNotNull(frame.getHeader("message-id"));
+
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+
+ assertEquals(msg1, frame.getBody());
+
+ frame = connV11.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("a-sub", frame.getHeader("subscription"));
+
+ assertNotNull(frame.getHeader("message-id"));
+
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+
+ assertEquals(msg2, frame.getBody());
+
+ //unsub
+ ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ connV11.sendFrame(unsubFrame);
+
+ }
+ finally
+ {
+ connV11.disconnect();
+ }
+ }
+
+ public void testSendAndReceive11() throws Exception
+ {
+ StompClientConnection connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+
+ try
+ {
+ String msg1 = "Hello World 1!";
+ String msg2 = "Hello World 2!";
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-length", String.valueOf(msg1.getBytes("UTF-8").length));
+ frame.addHeader("persistent", "true");
+ frame.setBody(msg1);
+
+ connV11.sendFrame(frame);
+
+ ClientStompFrame frame2 = connV11.createFrame("SEND");
+ frame2.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame2.addHeader("content-length", String.valueOf(msg2.getBytes("UTF-8").length));
+ frame2.addHeader("persistent", "true");
+ frame2.setBody(msg2);
+
+ connV11.sendFrame(frame2);
+
+ ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ connV11.sendFrame(subFrame);
+
+ frame = connV11.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("a-sub", frame.getHeader("subscription"));
+
+ assertNotNull(frame.getHeader("message-id"));
+
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+
+ assertEquals(msg1, frame.getBody());
+
+ frame = connV11.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("a-sub", frame.getHeader("subscription"));
+
+ assertNotNull(frame.getHeader("message-id"));
+
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+
+ assertEquals(msg2, frame.getBody());
+
+ //unsub
+ ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ connV11.sendFrame(unsubFrame);
+
+ }
+ finally
+ {
+ connV11.disconnect();
+ }
+ }
+
+ public void testNoGarbageAfterPersistentMessageV10() throws Exception
+ {
+ StompClientConnection connV11 = StompClientConnectionFactory
+ .createClientConnection("1.0", hostname, port);
+ try
+ {
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ connV11.sendFrame(subFrame);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-length", "11");
+ frame.addHeader("persistent", "true");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-length", "11");
+ frame.addHeader("persistent", "true");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ frame = connV11.receiveFrame(10000);
+
+ System.out.println("received: " + frame);
+
+ assertEquals("Hello World", frame.getBody());
+
+ //if hornetq sends trailing garbage bytes, the second message
+ //will not be normal
+ frame = connV11.receiveFrame(10000);
+
+ System.out.println("received: " + frame);
+
+ assertEquals("Hello World", frame.getBody());
+
+ //unsub
+ ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ connV11.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ connV11.disconnect();
+ }
+ }
+
+ public void testNoGarbageOnPersistentRedeliveryV10() throws Exception
+ {
+ StompClientConnection connV11 = StompClientConnectionFactory
+ .createClientConnection("1.0", hostname, port);
+ try
+ {
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-length", "11");
+ frame.addHeader("persistent", "true");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-length", "11");
+ frame.addHeader("persistent", "true");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "client");
+
+ connV11.sendFrame(subFrame);
+
+ // receive but don't ack
+ frame = connV11.receiveFrame(10000);
+ frame = connV11.receiveFrame(10000);
+
+ System.out.println("received: " + frame);
+
+ //unsub
+ ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ connV11.sendFrame(unsubFrame);
+
+ subFrame = connV11.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ connV11.sendFrame(subFrame);
+
+ frame = connV11.receiveFrame(10000);
+ frame = connV11.receiveFrame(10000);
+
+ //second receive will get problem if trailing bytes
+ assertEquals("Hello World", frame.getBody());
+
+ System.out.println("received again: " + frame);
+
+ //unsub
+ unsubFrame = connV11.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ connV11.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ connV11.disconnect();
+ }
+ }
+
+}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java 2011-12-05 14:19:17 UTC (rev 11831)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestBase2.java 2011-12-05 14:19:44 UTC (rev 11832)
@@ -74,7 +74,7 @@
protected String defPass = "wombats";
-
+ protected boolean persistenceEnabled = false;
// Implementation methods
// -------------------------------------------------------------------------
@@ -101,7 +101,9 @@
{
Configuration config = createBasicConfig();
config.setSecurityEnabled(false);
- config.setPersistenceEnabled(false);
+ config.setPersistenceEnabled(persistenceEnabled);
+
+ System.out.println("-----------------server persist: " + persistenceEnabled);
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
@@ -113,7 +115,7 @@
JMSConfiguration jmsConfig = new JMSConfigurationImpl();
jmsConfig.getQueueConfigurations()
- .add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+ .add(new JMSQueueConfigurationImpl(getQueueName(), null, true, getQueueName()));
jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
server.setContext(new InVMContext());
More information about the hornetq-commits
mailing list