Author: gaohoward
Date: 2011-12-07 22:05:11 -0500 (Wed, 07 Dec 2011)
New Revision: 11874
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java
Removed:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test name correction
Deleted:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-12-07
20:53:32 UTC (rev 11873)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-12-08
03:05:11 UTC (rev 11874)
@@ -1,2221 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.tests.integration.stomp.v11;
-
-import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import junit.framework.Assert;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
-import org.hornetq.tests.integration.stomp.util.StompClientConnection;
-import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
-import org.hornetq.tests.integration.stomp.util.StompClientConnectionV11;
-
-/*
- *
- */
-public class StompTestV11 extends StompTestBase2
-{
- private static final transient Logger log = Logger.getLogger(StompTestV11.class);
-
- private StompClientConnection connV11;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- }
-
- protected void tearDown() throws Exception
- {
- System.out.println("Connection 11 : " + connV11.isConnected());
- if (connV11.isConnected())
- {
- connV11.disconnect();
- }
- super.tearDown();
- }
-
- public void testConnection() throws Exception
- {
- StompClientConnection connection =
StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
-
- connection.connect(defUser, defPass);
-
- assertTrue(connection.isConnected());
-
- assertEquals("1.0", connection.getVersion());
-
- connection.disconnect();
-
- connection = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
-
- connection.connect(defUser, defPass);
-
- assertTrue(connection.isConnected());
-
- assertEquals("1.1", connection.getVersion());
-
- connection.disconnect();
-
- connection = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
-
- connection.connect();
-
- assertFalse(connection.isConnected());
-
- //new way of connection
- StompClientConnectionV11 conn = (StompClientConnectionV11)
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- conn.connect1(defUser, defPass);
-
- assertTrue(conn.isConnected());
-
- conn.disconnect();
- }
-
- public void testNegotiation() throws Exception
- {
- // case 1 accept-version absent. It is a 1.0 connect
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- ClientStompFrame reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- //reply headers: version, session, server
- assertEquals(null, reply.getHeader("version"));
-
- connV11.disconnect();
-
- // case 2 accept-version=1.0, result: 1.0
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.0");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- //reply headers: version, session, server
- assertEquals("1.0", reply.getHeader("version"));
-
- connV11.disconnect();
-
- // case 3 accept-version=1.1, result: 1.1
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.1");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- //reply headers: version, session, server
- assertEquals("1.1", reply.getHeader("version"));
-
- connV11.disconnect();
-
- // case 4 accept-version=1.0,1.1,1.2, result 1.1
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.0,1.1,1.2");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- //reply headers: version, session, server
- assertEquals("1.1", reply.getHeader("version"));
-
- connV11.disconnect();
-
- // case 5 accept-version=1.2, result error
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.2");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("ERROR", reply.getCommand());
-
- System.out.println("Got error frame " + reply);
-
- }
-
- public void testSendAndReceive() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World 1!");
-
- ClientStompFrame response = connV11.sendFrame(frame);
-
- assertNull(response);
-
- frame.addHeader("receipt", "1234");
- frame.setBody("Hello World 2!");
-
- response = connV11.sendFrame(frame);
-
- assertNotNull(response);
-
- assertEquals("RECEIPT", response.getCommand());
-
- assertEquals("1234", response.getHeader("receipt-id"));
-
- //subscribe
- StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- assertEquals("a-sub", frame.getHeader("subscription"));
-
- assertNotNull(frame.getHeader("message-id"));
-
- assertEquals(getQueuePrefix() + getQueueName(),
frame.getHeader("destination"));
-
- assertEquals("Hello World 1!", frame.getBody());
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
-
- newConn.disconnect();
- }
-
- public void testHeaderContentType() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.setBody("Hello World 1!");
-
- connV11.sendFrame(frame);
-
- //subscribe
- StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- assertEquals("application/xml",
frame.getHeader("content-type"));
-
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
-
- newConn.disconnect();
- }
-
- public void testHeaderContentLength() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- String body = "Hello World 1!";
- String cLen = String.valueOf(body.getBytes("UTF-8").length);
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.addHeader("content-length", cLen);
- frame.setBody(body + "extra");
-
- connV11.sendFrame(frame);
-
- //subscribe
- StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- assertEquals(cLen, frame.getHeader("content-length"));
-
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
-
- newConn.disconnect();
- }
-
- public void testHeaderEncoding() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- String body = "Hello World 1!";
- String cLen = String.valueOf(body.getBytes("UTF-8").length);
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.addHeader("content-length", cLen);
- String hKey = "special-header\\\\\\n\\:";
- String hVal = "\\:\\\\\\ngood";
- frame.addHeader(hKey, hVal);
-
- System.out.println("key: |" + hKey + "| val: |" + hVal);
-
- frame.setBody(body);
-
- connV11.sendFrame(frame);
-
- //subscribe
- StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- String value = frame.getHeader("special-header" + "\\" +
"\n" + ":");
-
- assertEquals(":" + "\\" + "\n" + "good",
value);
-
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
-
- newConn.disconnect();
- }
-
- public void testHeartBeat() throws Exception
- {
- //no heart beat at all if heat-beat absent
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- ClientStompFrame reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- Thread.sleep(5000);
-
- assertEquals(0, connV11.getFrameQueueSize());
-
- connV11.disconnect();
-
- //no heart beat for (0,0)
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "0,0");
- frame.addHeader("accept-version", "1.0,1.1");
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- assertEquals("0,0", reply.getHeader("heart-beat"));
-
- Thread.sleep(5000);
-
- assertEquals(0, connV11.getFrameQueueSize());
-
- connV11.disconnect();
-
- //heart-beat (1,0), should receive a min client ping accepted by server
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- assertEquals("0,500", reply.getHeader("heart-beat"));
-
- Thread.sleep(2000);
-
- //now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
- //send will fail
- try
- {
- connV11.sendFrame(frame);
- fail("connection should have been destroyed by now");
- }
- catch (IOException e)
- {
- //ignore
- }
-
- //heart-beat (1,0), start a ping, then send a message, should be ok.
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- assertEquals("0,500", reply.getHeader("heart-beat"));
-
- System.out.println("========== start pinger!");
-
- connV11.startPinger(500);
-
- Thread.sleep(2000);
-
- //now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
- //send will be ok
- connV11.sendFrame(frame);
-
- connV11.stopPinger();
-
- connV11.disconnect();
-
- }
-
- //server ping
- public void testHeartBeat2() throws Exception
- {
- //heart-beat (1,1)
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,1");
- frame.addHeader("accept-version", "1.0,1.1");
-
- ClientStompFrame reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
- assertEquals("500,500", reply.getHeader("heart-beat"));
-
- connV11.disconnect();
-
- //heart-beat (500,1000)
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- assertEquals("1000,500", reply.getHeader("heart-beat"));
-
- System.out.println("========== start pinger!");
-
- connV11.startPinger(500);
-
- Thread.sleep(10000);
-
- //now check the frame size
- int size = connV11.getServerPingNumber();
-
- System.out.println("ping received: " + size);
-
- assertTrue(size > 5);
-
- //now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
- //send will be ok
- connV11.sendFrame(frame);
-
- connV11.disconnect();
- }
-
- public void testSendWithHeartBeatsAndReceive() throws Exception
- {
- StompClientConnection newConn = null;
- try
- {
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- connV11.sendFrame(frame);
-
- connV11.startPinger(500);
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
-
- for (int i = 0; i < 10; i++)
- {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
- Thread.sleep(500);
- }
-
- // subscribe
- newConn = StompClientConnectionFactory.createClientConnection("1.1",
- hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- int cnt = 0;
-
- frame = newConn.receiveFrame();
-
- while (frame != null)
- {
- cnt++;
- Thread.sleep(500);
- frame = newConn.receiveFrame(5000);
- }
-
- assertEquals(10, cnt);
-
- // unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
- }
- finally
- {
- if (newConn != null)
- newConn.disconnect();
- connV11.disconnect();
- }
- }
-
- public void testSendAndReceiveWithHeartBeats() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
-
- for (int i = 0; i < 10; i++)
- {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
- Thread.sleep(500);
- }
-
- //subscribe
- StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- try
- {
- frame = newConn.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- newConn.sendFrame(frame);
-
- newConn.startPinger(500);
-
- Thread.sleep(500);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- int cnt = 0;
-
- frame = newConn.receiveFrame();
-
- while (frame != null)
- {
- cnt++;
- Thread.sleep(500);
- frame = newConn.receiveFrame(5000);
- }
-
- assertEquals(10, cnt);
-
- // unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
- }
- finally
- {
- newConn.disconnect();
- }
- }
-
- public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception
- {
- StompClientConnection newConn = null;
- try
- {
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- connV11.sendFrame(frame);
-
- connV11.startPinger(500);
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
-
- for (int i = 0; i < 10; i++)
- {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
- Thread.sleep(500);
- }
-
- // subscribe
- newConn = StompClientConnectionFactory.createClientConnection("1.1",
- hostname, port);
- frame = newConn.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- newConn.sendFrame(frame);
-
- newConn.startPinger(500);
-
- Thread.sleep(500);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- int cnt = 0;
-
- frame = newConn.receiveFrame();
-
- while (frame != null)
- {
- cnt++;
- Thread.sleep(500);
- frame = newConn.receiveFrame(5000);
- }
- assertEquals(10, cnt);
-
- // unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
- }
- finally
- {
- if (newConn != null)
- newConn.disconnect();
- connV11.disconnect();
- }
- }
-
- public void testNack() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- nack(connV11, "sub1", messageID);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //Nack makes the message be dropped.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testNackWithWrongSubId() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- nack(connV11, "sub2", messageID);
-
- ClientStompFrame error = connV11.receiveFrame();
-
- System.out.println("Receiver error: " + error);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should be still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testNackWithWrongMessageId() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- nack(connV11, "sub2", "someother");
-
- ClientStompFrame error = connV11.receiveFrame();
-
- System.out.println("Receiver error: " + error);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
-
- public void testAck() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ack(connV11, "sub1", messageID, null);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //Nack makes the message be dropped.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testAckWithWrongSubId() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ack(connV11, "sub2", messageID, null);
-
- ClientStompFrame error = connV11.receiveFrame();
-
- System.out.println("Receiver error: " + error);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should be still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testAckWithWrongMessageId() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ack(connV11, "sub2", "someother", null);
-
- ClientStompFrame error = connV11.receiveFrame();
-
- System.out.println("Receiver error: " + error);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testErrorWithReceipt() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
- //give it a wrong sub id
- ackFrame.addHeader("subscription", "sub2");
- ackFrame.addHeader("message-id", messageID);
- ackFrame.addHeader("receipt", "answer-me");
-
- ClientStompFrame error = connV11.sendFrame(ackFrame);
-
- System.out.println("Receiver error: " + error);
-
- assertEquals("ERROR", error.getCommand());
-
- assertEquals("answer-me", error.getHeader("receipt-id"));
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testErrorWithReceipt2() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
- //give it a wrong sub id
- ackFrame.addHeader("subscription", "sub1");
- ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) +
1));
- ackFrame.addHeader("receipt", "answer-me");
-
- ClientStompFrame error = connV11.sendFrame(ackFrame);
-
- System.out.println("Receiver error: " + error);
-
- assertEquals("ERROR", error.getCommand());
-
- assertEquals("answer-me", error.getHeader("receipt-id"));
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testAckModeClient() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- int num = 50;
- //send a bunch of messages
- for (int i = 0; i < num; i++)
- {
- this.sendMessage("client-ack" + i);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < num; i++)
- {
- frame = connV11.receiveFrame();
- assertNotNull(frame);
- }
-
- //ack the last
- this.ack(connV11, "sub1", frame);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //no messages can be received.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testAckModeClient2() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- int num = 50;
- //send a bunch of messages
- for (int i = 0; i < num; i++)
- {
- this.sendMessage("client-ack" + i);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < num; i++)
- {
- frame = connV11.receiveFrame();
- assertNotNull(frame);
-
- //ack the 49th
- if (i == num - 2)
- {
- this.ack(connV11, "sub1", frame);
- }
- }
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //no messages can be received.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testAckModeAuto() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "auto");
-
- int num = 50;
- //send a bunch of messages
- for (int i = 0; i < num; i++)
- {
- this.sendMessage("auto-ack" + i);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < num; i++)
- {
- frame = connV11.receiveFrame();
- assertNotNull(frame);
- }
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //no messages can be received.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testAckModeClientIndividual() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client-individual");
-
- int num = 50;
- //send a bunch of messages
- for (int i = 0; i < num; i++)
- {
- this.sendMessage("client-individual-ack" + i);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < num; i++)
- {
- frame = connV11.receiveFrame();
- assertNotNull(frame);
-
- System.out.println(i + " == received: " + frame);
- //ack on even numbers
- if (i%2 == 0)
- {
- this.ack(connV11, "sub1", frame);
- }
- }
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //no messages can be received.
- MessageConsumer consumer = session.createConsumer(queue);
-
- TextMessage message = null;
- for (int i = 0; i < num/2; i++)
- {
- message = (TextMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- System.out.println("Legal: " + message.getText());
- }
-
- message = (TextMessage) consumer.receive(1000);
-
- Assert.assertNull(message);
- }
-
- public void testTwoSubscribers() throws Exception
- {
- connV11.connect(defUser, defPass, "myclientid");
-
- this.subscribeTopic(connV11, "sub1", "auto", null);
-
- StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass, "myclientid2");
-
- this.subscribeTopic(newConn, "sub2", "auto", null);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getTopicPrefix() + getTopicName());
-
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- // receive message from socket
- frame = connV11.receiveFrame(1000);
-
- System.out.println("received frame : " + frame);
- assertEquals("Hello World", frame.getBody());
- assertEquals("sub1", frame.getHeader("subscription"));
-
- frame = newConn.receiveFrame(1000);
-
- System.out.println("received 2 frame : " + frame);
- assertEquals("Hello World", frame.getBody());
- assertEquals("sub2", frame.getHeader("subscription"));
-
- // remove suscription
- this.unsubscribe(connV11, "sub1", true);
- this.unsubscribe(newConn, "sub2", true);
-
- connV11.disconnect();
- newConn.disconnect();
- }
-
- public void testSendAndReceiveOnDifferentConnections() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- ClientStompFrame sendFrame = connV11.createFrame("SEND");
- sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- sendFrame.setBody("Hello World");
-
- connV11.sendFrame(sendFrame);
-
- StompClientConnection connV11_2 =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- connV11_2.connect(defUser, defPass);
-
- this.subscribe(connV11_2, "sub1", "auto");
-
- ClientStompFrame frame = connV11_2.receiveFrame(2000);
-
- assertEquals("MESSAGE", frame.getCommand());
- assertEquals("Hello World", frame.getBody());
-
- connV11.disconnect();
- connV11_2.disconnect();
- }
-
- //----------------Note: tests below are adapted from StompTest
-
- public void testBeginSameTransactionTwice() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- beginTransaction(connV11, "tx1");
-
- beginTransaction(connV11, "tx1");
-
- ClientStompFrame f = connV11.receiveFrame();
- Assert.assertTrue(f.getCommand().equals("ERROR"));
- }
-
- public void testBodyWithUTF8() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, getName(), "auto");
-
- String text = "A" + "\u00ea" + "\u00f1" +
"\u00fc" + "C";
- System.out.println(text);
- sendMessage(text);
-
- ClientStompFrame frame = connV11.receiveFrame();
- System.out.println(frame);
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
- Assert.assertTrue(frame.getBody().equals(text));
-
- connV11.disconnect();
- }
-
- public void testClientAckNotPartOfTransaction() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, getName(), "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
- Assert.assertTrue(frame.getBody().equals(getName()));
- Assert.assertNotNull(frame.getHeader("message-id"));
-
- String messageID = frame.getHeader("message-id");
-
- beginTransaction(connV11, "tx1");
-
- this.ack(connV11, getName(), messageID, "tx1");
-
- abortTransaction(connV11, "tx1");
-
- frame = connV11.receiveFrame();
-
- assertNull(frame);
-
- this.unsubscribe(connV11, getName());
-
- connV11.disconnect();
- }
-
- public void testDisconnectAndError() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, getName(), "client");
-
- ClientStompFrame frame = connV11.createFrame("DISCONNECT");
- frame.addHeader("receipt", "1");
-
- ClientStompFrame result = connV11.sendFrame(frame);
-
- if (result == null || (!"RECEIPT".equals(result.getCommand())) ||
(!"1".equals(result.getHeader("receipt-id"))))
- {
- fail("Disconnect failed! " + result);
- }
-
- // sending a message will result in an error
- ClientStompFrame sendFrame = connV11.createFrame("SEND");
- sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- sendFrame.setBody("Hello World");
-
- try
- {
- connV11.sendFrame(sendFrame);
- fail("connection should have been closed by server.");
- }
- catch (ClosedChannelException e)
- {
- //ok.
- }
-
- connV11.destroy();
- }
-
- public void testDurableSubscriber() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "client", getName());
-
- this.subscribe(connV11, "sub1", "client", getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("ERROR"));
-
- connV11.disconnect();
- }
-
- public void testDurableSubscriberWithReconnection() throws Exception
- {
- connV11.connect(defUser, defPass, "myclientid");
-
- this.subscribeTopic(connV11, "sub1", "auto", getName());
-
- ClientStompFrame frame = connV11.createFrame("DISCONNECT");
- frame.addHeader("receipt", "1");
-
- ClientStompFrame result = connV11.sendFrame(frame);
-
- if (result == null || (!"RECEIPT".equals(result.getCommand())) ||
(!"1".equals(result.getHeader("receipt-id"))))
- {
- fail("Disconnect failed! " + result);
- }
-
- // send the message when the durable subscriber is disconnected
- sendMessage(getName(), topic);
-
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- connV11.connect(defUser, defPass, "myclientid");
-
- this.subscribeTopic(connV11, "sub1", "auto", getName());
-
- // we must have received the message
- frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
- Assert.assertEquals(getName(), frame.getBody());
-
- this.unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
- }
-
- public void testJMSXGroupIdCanBeSet() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("JMSXGroupID", "TEST");
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
- // differ from StompConnect
- Assert.assertEquals("TEST",
message.getStringProperty("JMSXGroupID"));
- }
-
- public void testMessagesAreInOrder() throws Exception
- {
- int ctr = 10;
- String[] data = new String[ctr];
-
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- for (int i = 0; i < ctr; ++i)
- {
- data[i] = getName() + i;
- sendMessage(data[i]);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < ctr; ++i)
- {
- frame = connV11.receiveFrame();
- Assert.assertTrue("Message not in order",
frame.getBody().equals(data[i]));
- }
-
- for (int i = 0; i < ctr; ++i)
- {
- data[i] = getName() + ":second:" + i;
- sendMessage(data[i]);
- }
-
- for (int i = 0; i < ctr; ++i)
- {
- frame = connV11.receiveFrame();
- Assert.assertTrue("Message not in order",
frame.getBody().equals(data[i]));
- }
-
- connV11.disconnect();
- }
-
- public void testSubscribeWithAutoAckAndSelector() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto", null, "foo =
'zzz'");
-
- sendMessage("Ignored message", "foo", "1234");
- sendMessage("Real message", "foo", "zzz");
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue("Should have received the real message but got: " +
frame, frame.getBody().equals("Real message"));
-
- connV11.disconnect();
- }
-
- public void testRedeliveryWithClientAck() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "subId", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- assertTrue(frame.getCommand().equals("MESSAGE"));
-
- connV11.disconnect();
-
- // message should be received since message was not acknowledged
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertTrue(message.getJMSRedelivered());
- }
-
- public void testSendManyMessages() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- int count = 1000;
- final CountDownLatch latch = new CountDownLatch(count);
- consumer.setMessageListener(new MessageListener()
- {
- public void onMessage(Message arg0)
- {
- latch.countDown();
- }
- });
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- for (int i = 1; i <= count; i++)
- {
- connV11.sendFrame(frame);
- }
-
- assertTrue(latch.await(60, TimeUnit.SECONDS));
-
- connV11.disconnect();
- }
-
- public void testSendMessage() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
- // Assert default priority 4 is used when priority header is not set
- Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
-
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
- }
-
- public void testSendMessageWithContentLength() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- byte[] data = new byte[] { 1, 0, 0, 4 };
-
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody(new String(data, "UTF-8"));
-
- frame.addHeader("content-length", String.valueOf(data.length));
-
- connV11.sendFrame(frame);
-
- BytesMessage message = (BytesMessage)consumer.receive(10000);
- Assert.assertNotNull(message);
-
- assertEquals(data.length, message.getBodyLength());
- assertEquals(data[0], message.readByte());
- assertEquals(data[1], message.readByte());
- assertEquals(data[2], message.readByte());
- assertEquals(data[3], message.readByte());
- }
-
- public void testSendMessageWithCustomHeadersAndSelector() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue, "foo =
'abc'");
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("foo", "abc");
- frame.addHeader("bar", "123");
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
- Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
- Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
- }
-
- public void testSendMessageWithLeadingNewLine() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- connV11.sendWickedFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
-
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
-
- assertNull(consumer.receive(1000));
-
- connV11.disconnect();
- }
-
- public void testSendMessageWithReceipt() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("receipt", "1234");
- frame.setBody("Hello World");
-
- frame = connV11.sendFrame(frame);
-
- assertTrue(frame.getCommand().equals("RECEIPT"));
- assertEquals("1234", frame.getHeader("receipt-id"));
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
-
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
-
- connV11.disconnect();
- }
-
- public void testSendMessageWithStandardHeaders() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("correlation-id", "c123");
- frame.addHeader("persistent", "true");
- frame.addHeader("priority", "3");
- frame.addHeader("type", "t345");
- frame.addHeader("JMSXGroupID", "abc");
- frame.addHeader("foo", "abc");
- frame.addHeader("bar", "123");
-
- frame.setBody("Hello World");
-
- frame = connV11.sendFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
- Assert.assertEquals("JMSCorrelationID", "c123",
message.getJMSCorrelationID());
- Assert.assertEquals("getJMSType", "t345",
message.getJMSType());
- Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
- Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
- Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
- Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
-
- Assert.assertEquals("JMSXGroupID", "abc",
message.getStringProperty("JMSXGroupID"));
-
- connV11.disconnect();
- }
-
- public void testSubscribeToTopic() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribeTopic(connV11, "sub1", null, null, true);
-
- sendMessage(getName(), topic);
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix()
+ getTopicName()));
- Assert.assertTrue(frame.getBody().equals(getName()));
-
- this.unsubscribe(connV11, "sub1", true);
-
- sendMessage(getName(), topic);
-
- frame = connV11.receiveFrame(1000);
- assertNull(frame);
-
- connV11.disconnect();
- }
-
- public void testSubscribeToTopicWithNoLocal() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribeTopic(connV11, "sub1", null, null, true, true);
-
- // send a message on the same connection => it should not be received
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getTopicPrefix() + getTopicName());
-
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- frame = connV11.receiveFrame(2000);
-
- assertNull(frame);
-
- // send message on another JMS connection => it should be received
- sendMessage(getName(), topic);
-
- frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix()
+ getTopicName()));
- Assert.assertTrue(frame.getBody().equals(getName()));
-
- this.unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
- }
-
- public void testSubscribeWithAutoAck() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertEquals("MESSAGE", frame.getCommand());
- Assert.assertNotNull(frame.getHeader("destination"));
- Assert.assertEquals(getName(), frame.getBody());
-
- connV11.disconnect();
-
- // message should not be received as it was auto-acked
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
- sendMessage(payload, queue);
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- assertEquals("MESSAGE", frame.getCommand());
-
- System.out.println("Message: " + frame);
-
- assertEquals("5", frame.getHeader("content-length"));
-
- assertEquals(null, frame.getHeader("type"));
-
- assertEquals(frame.getBody(), new String(payload, "UTF-8"));
-
- connV11.disconnect();
- }
-
- public void testSubscribeWithClientAck() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- this.ack(connV11, "sub1", frame);
-
- connV11.disconnect();
-
- // message should not be received since message was acknowledged by the client
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws
Exception
- {
- assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
- }
-
- public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception
- {
- assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
- }
-
- public void testSubscribeWithID() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "mysubid", "auto");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getHeader("subscription") != null);
-
- connV11.disconnect();
- }
-
- public void testSubscribeWithMessageSentWithProperties() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- MessageProducer producer = session.createProducer(queue);
- BytesMessage message = session.createBytesMessage();
- message.setStringProperty("S", "value");
- message.setBooleanProperty("n", false);
- message.setByteProperty("byte", (byte)9);
- message.setDoubleProperty("d", 2.0);
- message.setFloatProperty("f", (float)6.0);
- message.setIntProperty("i", 10);
- message.setLongProperty("l", 121);
- message.setShortProperty("s", (short)12);
- message.writeBytes("Hello World".getBytes("UTF-8"));
- producer.send(message);
-
- ClientStompFrame frame = connV11.receiveFrame();
- Assert.assertNotNull(frame);
-
- Assert.assertTrue(frame.getHeader("S") != null);
- Assert.assertTrue(frame.getHeader("n") != null);
- Assert.assertTrue(frame.getHeader("byte") != null);
- Assert.assertTrue(frame.getHeader("d") != null);
- Assert.assertTrue(frame.getHeader("f") != null);
- Assert.assertTrue(frame.getHeader("i") != null);
- Assert.assertTrue(frame.getHeader("l") != null);
- Assert.assertTrue(frame.getHeader("s") != null);
- Assert.assertEquals("Hello World", frame.getBody());
-
- connV11.disconnect();
- }
-
- public void testSuccessiveTransactionsWithSameID() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- // first tx
- this.beginTransaction(connV11, "tx1");
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- this.commitTransaction(connV11, "tx1");
-
- Message message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
-
- // 2nd tx with same tx ID
- this.beginTransaction(connV11, "tx1");
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
-
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- this.commitTransaction(connV11, "tx1");
-
- message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
-
- connV11.disconnect();
- }
-
- public void testTransactionCommit() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- this.beginTransaction(connV11, "tx1");
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
- frame.addHeader("receipt", "123");
- frame.setBody("Hello World");
-
- frame = connV11.sendFrame(frame);
-
- assertEquals("123", frame.getHeader("receipt-id"));
-
- // check the message is not committed
- assertNull(consumer.receive(100));
-
- this.commitTransaction(connV11, "tx1", true);
-
- Message message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
-
- connV11.disconnect();
- }
-
- public void testTransactionRollback() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- this.beginTransaction(connV11, "tx1");
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
-
- frame.setBody("first message");
-
- connV11.sendFrame(frame);
-
- // rollback first message
- this.abortTransaction(connV11, "tx1");
-
- this.beginTransaction(connV11, "tx1");
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
-
- frame.setBody("second message");
-
- connV11.sendFrame(frame);
-
- this.commitTransaction(connV11, "tx1", true);
-
- // only second msg should be received since first msg was rolled back
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("second message", message.getText());
-
- connV11.disconnect();
- }
-
- public void testUnsubscribe() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- // send a message to our queue
- sendMessage("first message");
-
- // receive message from socket
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
-
- // remove suscription
- this.unsubscribe(connV11, "sub1", true);
-
- // send a message to our queue
- sendMessage("second message");
-
- frame = connV11.receiveFrame(1000);
- assertNull(frame);
-
- connV11.disconnect();
- }
-
- //-----------------private help methods
-
- private void abortTransaction(StompClientConnection conn, String txID) throws
IOException, InterruptedException
- {
- ClientStompFrame abortFrame = conn.createFrame("ABORT");
- abortFrame.addHeader("transaction", txID);
-
- conn.sendFrame(abortFrame);
- }
-
- private void beginTransaction(StompClientConnection conn, String txID) throws
IOException, InterruptedException
- {
- ClientStompFrame beginFrame = conn.createFrame("BEGIN");
- beginFrame.addHeader("transaction", txID);
-
- conn.sendFrame(beginFrame);
- }
-
- private void commitTransaction(StompClientConnection conn, String txID) throws
IOException, InterruptedException
- {
- commitTransaction(conn, txID, false);
- }
-
- private void commitTransaction(StompClientConnection conn, String txID, boolean
receipt) throws IOException, InterruptedException
- {
- ClientStompFrame beginFrame = conn.createFrame("COMMIT");
- beginFrame.addHeader("transaction", txID);
- if (receipt)
- {
- beginFrame.addHeader("receipt", "1234");
- }
- ClientStompFrame resp = conn.sendFrame(beginFrame);
- if (receipt)
- {
- assertEquals("1234", resp.getHeader("receipt-id"));
- }
- }
-
- private void ack(StompClientConnection conn, String subId,
- ClientStompFrame frame) throws IOException, InterruptedException
- {
- String messageID = frame.getHeader("message-id");
-
- ClientStompFrame ackFrame = conn.createFrame("ACK");
-
- ackFrame.addHeader("subscription", subId);
- ackFrame.addHeader("message-id", messageID);
-
- ClientStompFrame response = conn.sendFrame(ackFrame);
- if (response != null)
- {
- throw new IOException("failed to ack " + response);
- }
- }
-
- private void ack(StompClientConnection conn, String subId, String mid, String txID)
throws IOException, InterruptedException
- {
- ClientStompFrame ackFrame = conn.createFrame("ACK");
- ackFrame.addHeader("subscription", subId);
- ackFrame.addHeader("message-id", mid);
- if (txID != null)
- {
- ackFrame.addHeader("transaction", txID);
- }
-
- conn.sendFrame(ackFrame);
- }
-
- private void nack(StompClientConnection conn, String subId, String mid) throws
IOException, InterruptedException
- {
- ClientStompFrame ackFrame = conn.createFrame("NACK");
- ackFrame.addHeader("subscription", subId);
- ackFrame.addHeader("message-id", mid);
-
- conn.sendFrame(ackFrame);
- }
-
- private void subscribe(StompClientConnection conn, String subId, String ack) throws
IOException, InterruptedException
- {
- subscribe(conn, subId, ack, null, null);
- }
-
- private void subscribe(StompClientConnection conn, String subId,
- String ack, String durableId) throws IOException, InterruptedException
- {
- subscribe(conn, subId, ack, durableId, null);
- }
-
- private void subscribe(StompClientConnection conn, String subId,
- String ack, String durableId, boolean receipt) throws IOException,
InterruptedException
- {
- subscribe(conn, subId, ack, durableId, null, receipt);
- }
-
- private void subscribe(StompClientConnection conn, String subId, String ack,
- String durableId, String selector) throws IOException,
- InterruptedException
- {
- subscribe(conn, subId, ack, durableId, selector, false);
- }
-
- private void subscribe(StompClientConnection conn, String subId,
- String ack, String durableId, String selector, boolean receipt) throws
IOException, InterruptedException
- {
- ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", subId);
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- if (ack != null)
- {
- subFrame.addHeader("ack", ack);
- }
- if (durableId != null)
- {
- subFrame.addHeader("durable-subscriber-name", durableId);
- }
- if (selector != null)
- {
- subFrame.addHeader("selector", selector);
- }
- if (receipt)
- {
- subFrame.addHeader("receipt", "1234");
- }
-
- subFrame = conn.sendFrame(subFrame);
-
- if (receipt)
- {
- assertEquals("1234", subFrame.getHeader("receipt-id"));
- }
- }
-
- private void subscribeTopic(StompClientConnection conn, String subId,
- String ack, String durableId) throws IOException, InterruptedException
- {
- subscribeTopic(conn, subId, ack, durableId, false);
- }
-
- private void subscribeTopic(StompClientConnection conn, String subId,
- String ack, String durableId, boolean receipt) throws IOException,
InterruptedException
- {
- subscribeTopic(conn, subId, ack, durableId, receipt, false);
- }
-
- private void subscribeTopic(StompClientConnection conn, String subId,
- String ack, String durableId, boolean receipt, boolean noLocal) throws
IOException, InterruptedException
- {
- ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", subId);
- subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
- if (ack != null)
- {
- subFrame.addHeader("ack", ack);
- }
- if (durableId != null)
- {
- subFrame.addHeader("durable-subscriber-name", durableId);
- }
- if (receipt)
- {
- subFrame.addHeader("receipt", "1234");
- }
- if (noLocal)
- {
- subFrame.addHeader("no-local", "true");
- }
-
- ClientStompFrame frame = conn.sendFrame(subFrame);
-
- if (receipt)
- {
- assertTrue(frame.getHeader("receipt-id").equals("1234"));
- }
- }
-
- private void unsubscribe(StompClientConnection conn, String subId) throws IOException,
InterruptedException
- {
- ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
- subFrame.addHeader("id", subId);
-
- conn.sendFrame(subFrame);
- }
-
- private void unsubscribe(StompClientConnection conn, String subId,
- boolean receipt) throws IOException, InterruptedException
- {
- ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
- subFrame.addHeader("id", subId);
-
- if (receipt)
- {
- subFrame.addHeader("receipt", "4321");
- }
-
- ClientStompFrame f = conn.sendFrame(subFrame);
-
- if (receipt)
- {
- System.out.println("response: " + f);
- assertEquals("RECEIPT", f.getCommand());
- assertEquals("4321", f.getHeader("receipt-id"));
- }
- }
-
- protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean
sendDisconnect) throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertEquals("MESSAGE", frame.getCommand());
-
- log.info("Reconnecting!");
-
- if (sendDisconnect)
- {
- connV11.disconnect();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- }
- else
- {
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- }
-
- // message should be received since message was not acknowledged
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", null);
-
- frame = connV11.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
-
- connV11.disconnect();
-
- // now lets make sure we don't see the message again
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", null, null, true);
-
- sendMessage("shouldBeNextMessage");
-
- frame = connV11.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertEquals("shouldBeNextMessage", frame.getBody());
- }
-
-}
-
-
-
-
-
Copied:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java
(from rev 11873,
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java)
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java 2011-12-08
03:05:11 UTC (rev 11874)
@@ -0,0 +1,2221 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.v11;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
+import org.hornetq.tests.integration.stomp.util.StompClientConnection;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionV11;
+
+/*
+ *
+ */
+public class StompV11Test extends StompTestBase2
+{
+ private static final transient Logger log = Logger.getLogger(StompV11Test.class);
+
+ private StompClientConnection connV11;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ System.out.println("Connection 11 : " + connV11.isConnected());
+ if (connV11.isConnected())
+ {
+ connV11.disconnect();
+ }
+ super.tearDown();
+ }
+
+ public void testConnection() throws Exception
+ {
+ StompClientConnection connection =
StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+
+ connection.connect(defUser, defPass);
+
+ assertTrue(connection.isConnected());
+
+ assertEquals("1.0", connection.getVersion());
+
+ connection.disconnect();
+
+ connection = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+
+ connection.connect(defUser, defPass);
+
+ assertTrue(connection.isConnected());
+
+ assertEquals("1.1", connection.getVersion());
+
+ connection.disconnect();
+
+ connection = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+
+ connection.connect();
+
+ assertFalse(connection.isConnected());
+
+ //new way of connection
+ StompClientConnectionV11 conn = (StompClientConnectionV11)
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ conn.connect1(defUser, defPass);
+
+ assertTrue(conn.isConnected());
+
+ conn.disconnect();
+ }
+
+ public void testNegotiation() throws Exception
+ {
+ // case 1 accept-version absent. It is a 1.0 connect
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals(null, reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 2 accept-version=1.0, result: 1.0
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.0");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.0", reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 3 accept-version=1.1, result: 1.1
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.1");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.1", reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 4 accept-version=1.0,1.1,1.2, result 1.1
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.0,1.1,1.2");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.1", reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 5 accept-version=1.2, result error
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.2");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("ERROR", reply.getCommand());
+
+ System.out.println("Got error frame " + reply);
+
+ }
+
+ public void testSendAndReceive() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World 1!");
+
+ ClientStompFrame response = connV11.sendFrame(frame);
+
+ assertNull(response);
+
+ frame.addHeader("receipt", "1234");
+ frame.setBody("Hello World 2!");
+
+ response = connV11.sendFrame(frame);
+
+ assertNotNull(response);
+
+ assertEquals("RECEIPT", response.getCommand());
+
+ assertEquals("1234", response.getHeader("receipt-id"));
+
+ //subscribe
+ StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("a-sub", frame.getHeader("subscription"));
+
+ assertNotNull(frame.getHeader("message-id"));
+
+ assertEquals(getQueuePrefix() + getQueueName(),
frame.getHeader("destination"));
+
+ assertEquals("Hello World 1!", frame.getBody());
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+
+ newConn.disconnect();
+ }
+
+ public void testHeaderContentType() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.setBody("Hello World 1!");
+
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("application/xml",
frame.getHeader("content-type"));
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+ }
+
+ public void testHeaderContentLength() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ String body = "Hello World 1!";
+ String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.addHeader("content-length", cLen);
+ frame.setBody(body + "extra");
+
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals(cLen, frame.getHeader("content-length"));
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+ }
+
+ public void testHeaderEncoding() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ String body = "Hello World 1!";
+ String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.addHeader("content-length", cLen);
+ String hKey = "special-header\\\\\\n\\:";
+ String hVal = "\\:\\\\\\ngood";
+ frame.addHeader(hKey, hVal);
+
+ System.out.println("key: |" + hKey + "| val: |" + hVal);
+
+ frame.setBody(body);
+
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ String value = frame.getHeader("special-header" + "\\" +
"\n" + ":");
+
+ assertEquals(":" + "\\" + "\n" + "good",
value);
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+ }
+
+ public void testHeartBeat() throws Exception
+ {
+ //no heart beat at all if heat-beat absent
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ Thread.sleep(5000);
+
+ assertEquals(0, connV11.getFrameQueueSize());
+
+ connV11.disconnect();
+
+ //no heart beat for (0,0)
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "0,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,0", reply.getHeader("heart-beat"));
+
+ Thread.sleep(5000);
+
+ assertEquals(0, connV11.getFrameQueueSize());
+
+ connV11.disconnect();
+
+ //heart-beat (1,0), should receive a min client ping accepted by server
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,500", reply.getHeader("heart-beat"));
+
+ Thread.sleep(2000);
+
+ //now server side should be disconnected because we didn't send ping for 2 sec
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will fail
+ try
+ {
+ connV11.sendFrame(frame);
+ fail("connection should have been destroyed by now");
+ }
+ catch (IOException e)
+ {
+ //ignore
+ }
+
+ //heart-beat (1,0), start a ping, then send a message, should be ok.
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,500", reply.getHeader("heart-beat"));
+
+ System.out.println("========== start pinger!");
+
+ connV11.startPinger(500);
+
+ Thread.sleep(2000);
+
+ //now server side should be disconnected because we didn't send ping for 2 sec
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will be ok
+ connV11.sendFrame(frame);
+
+ connV11.stopPinger();
+
+ connV11.disconnect();
+
+ }
+
+ //server ping
+ public void testHeartBeat2() throws Exception
+ {
+ //heart-beat (1,1)
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,1");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+ assertEquals("500,500", reply.getHeader("heart-beat"));
+
+ connV11.disconnect();
+
+ //heart-beat (500,1000)
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("1000,500", reply.getHeader("heart-beat"));
+
+ System.out.println("========== start pinger!");
+
+ connV11.startPinger(500);
+
+ Thread.sleep(10000);
+
+ //now check the frame size
+ int size = connV11.getServerPingNumber();
+
+ System.out.println("ping received: " + size);
+
+ assertTrue(size > 5);
+
+ //now server side should be disconnected because we didn't send ping for 2 sec
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will be ok
+ connV11.sendFrame(frame);
+
+ connV11.disconnect();
+ }
+
+ public void testSendWithHeartBeatsAndReceive() throws Exception
+ {
+ StompClientConnection newConn = null;
+ try
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ connV11.sendFrame(frame);
+
+ connV11.startPinger(500);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ // subscribe
+ newConn = StompClientConnectionFactory.createClientConnection("1.1",
+ hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ if (newConn != null)
+ newConn.disconnect();
+ connV11.disconnect();
+ }
+ }
+
+ public void testSendAndReceiveWithHeartBeats() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ //subscribe
+ StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ try
+ {
+ frame = newConn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ newConn.sendFrame(frame);
+
+ newConn.startPinger(500);
+
+ Thread.sleep(500);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ newConn.disconnect();
+ }
+ }
+
+ public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception
+ {
+ StompClientConnection newConn = null;
+ try
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ connV11.sendFrame(frame);
+
+ connV11.startPinger(500);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ // subscribe
+ newConn = StompClientConnectionFactory.createClientConnection("1.1",
+ hostname, port);
+ frame = newConn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ newConn.sendFrame(frame);
+
+ newConn.startPinger(500);
+
+ Thread.sleep(500);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ if (newConn != null)
+ newConn.disconnect();
+ connV11.disconnect();
+ }
+ }
+
+ public void testNack() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ nack(connV11, "sub1", messageID);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //Nack makes the message be dropped.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testNackWithWrongSubId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ nack(connV11, "sub2", messageID);
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should be still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testNackWithWrongMessageId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ nack(connV11, "sub2", "someother");
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+
+ public void testAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ack(connV11, "sub1", messageID, null);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //Nack makes the message be dropped.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckWithWrongSubId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ack(connV11, "sub2", messageID, null);
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should be still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testAckWithWrongMessageId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ack(connV11, "sub2", "someother", null);
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testErrorWithReceipt() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ //give it a wrong sub id
+ ackFrame.addHeader("subscription", "sub2");
+ ackFrame.addHeader("message-id", messageID);
+ ackFrame.addHeader("receipt", "answer-me");
+
+ ClientStompFrame error = connV11.sendFrame(ackFrame);
+
+ System.out.println("Receiver error: " + error);
+
+ assertEquals("ERROR", error.getCommand());
+
+ assertEquals("answer-me", error.getHeader("receipt-id"));
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testErrorWithReceipt2() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ //give it a wrong sub id
+ ackFrame.addHeader("subscription", "sub1");
+ ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) +
1));
+ ackFrame.addHeader("receipt", "answer-me");
+
+ ClientStompFrame error = connV11.sendFrame(ackFrame);
+
+ System.out.println("Receiver error: " + error);
+
+ assertEquals("ERROR", error.getCommand());
+
+ assertEquals("answer-me", error.getHeader("receipt-id"));
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testAckModeClient() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+ }
+
+ //ack the last
+ this.ack(connV11, "sub1", frame);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeClient2() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+
+ //ack the 49th
+ if (i == num - 2)
+ {
+ this.ack(connV11, "sub1", frame);
+ }
+ }
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeAuto() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "auto");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("auto-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+ }
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeClientIndividual() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client-individual");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-individual-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+
+ System.out.println(i + " == received: " + frame);
+ //ack on even numbers
+ if (i%2 == 0)
+ {
+ this.ack(connV11, "sub1", frame);
+ }
+ }
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ TextMessage message = null;
+ for (int i = 0; i < num/2; i++)
+ {
+ message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ System.out.println("Legal: " + message.getText());
+ }
+
+ message = (TextMessage) consumer.receive(1000);
+
+ Assert.assertNull(message);
+ }
+
+ public void testTwoSubscribers() throws Exception
+ {
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", null);
+
+ StompClientConnection newConn =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass, "myclientid2");
+
+ this.subscribeTopic(newConn, "sub2", "auto", null);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ // receive message from socket
+ frame = connV11.receiveFrame(1000);
+
+ System.out.println("received frame : " + frame);
+ assertEquals("Hello World", frame.getBody());
+ assertEquals("sub1", frame.getHeader("subscription"));
+
+ frame = newConn.receiveFrame(1000);
+
+ System.out.println("received 2 frame : " + frame);
+ assertEquals("Hello World", frame.getBody());
+ assertEquals("sub2", frame.getHeader("subscription"));
+
+ // remove suscription
+ this.unsubscribe(connV11, "sub1", true);
+ this.unsubscribe(newConn, "sub2", true);
+
+ connV11.disconnect();
+ newConn.disconnect();
+ }
+
+ public void testSendAndReceiveOnDifferentConnections() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame sendFrame = connV11.createFrame("SEND");
+ sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ sendFrame.setBody("Hello World");
+
+ connV11.sendFrame(sendFrame);
+
+ StompClientConnection connV11_2 =
StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ connV11_2.connect(defUser, defPass);
+
+ this.subscribe(connV11_2, "sub1", "auto");
+
+ ClientStompFrame frame = connV11_2.receiveFrame(2000);
+
+ assertEquals("MESSAGE", frame.getCommand());
+ assertEquals("Hello World", frame.getBody());
+
+ connV11.disconnect();
+ connV11_2.disconnect();
+ }
+
+ //----------------Note: tests below are adapted from StompTest
+
+ public void testBeginSameTransactionTwice() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ beginTransaction(connV11, "tx1");
+
+ beginTransaction(connV11, "tx1");
+
+ ClientStompFrame f = connV11.receiveFrame();
+ Assert.assertTrue(f.getCommand().equals("ERROR"));
+ }
+
+ public void testBodyWithUTF8() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "auto");
+
+ String text = "A" + "\u00ea" + "\u00f1" +
"\u00fc" + "C";
+ System.out.println(text);
+ sendMessage(text);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ System.out.println(frame);
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getBody().equals(text));
+
+ connV11.disconnect();
+ }
+
+ public void testClientAckNotPartOfTransaction() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+ Assert.assertNotNull(frame.getHeader("message-id"));
+
+ String messageID = frame.getHeader("message-id");
+
+ beginTransaction(connV11, "tx1");
+
+ this.ack(connV11, getName(), messageID, "tx1");
+
+ abortTransaction(connV11, "tx1");
+
+ frame = connV11.receiveFrame();
+
+ assertNull(frame);
+
+ this.unsubscribe(connV11, getName());
+
+ connV11.disconnect();
+ }
+
+ public void testDisconnectAndError() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "client");
+
+ ClientStompFrame frame = connV11.createFrame("DISCONNECT");
+ frame.addHeader("receipt", "1");
+
+ ClientStompFrame result = connV11.sendFrame(frame);
+
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) ||
(!"1".equals(result.getHeader("receipt-id"))))
+ {
+ fail("Disconnect failed! " + result);
+ }
+
+ // sending a message will result in an error
+ ClientStompFrame sendFrame = connV11.createFrame("SEND");
+ sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ sendFrame.setBody("Hello World");
+
+ try
+ {
+ connV11.sendFrame(sendFrame);
+ fail("connection should have been closed by server.");
+ }
+ catch (ClosedChannelException e)
+ {
+ //ok.
+ }
+
+ connV11.destroy();
+ }
+
+ public void testDurableSubscriber() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client", getName());
+
+ this.subscribe(connV11, "sub1", "client", getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("ERROR"));
+
+ connV11.disconnect();
+ }
+
+ public void testDurableSubscriberWithReconnection() throws Exception
+ {
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+ ClientStompFrame frame = connV11.createFrame("DISCONNECT");
+ frame.addHeader("receipt", "1");
+
+ ClientStompFrame result = connV11.sendFrame(frame);
+
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) ||
(!"1".equals(result.getHeader("receipt-id"))))
+ {
+ fail("Disconnect failed! " + result);
+ }
+
+ // send the message when the durable subscriber is disconnected
+ sendMessage(getName(), topic);
+
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+ // we must have received the message
+ frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ this.unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+ }
+
+ public void testJMSXGroupIdCanBeSet() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("JMSXGroupID", "TEST");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ // differ from StompConnect
+ Assert.assertEquals("TEST",
message.getStringProperty("JMSXGroupID"));
+ }
+
+ public void testMessagesAreInOrder() throws Exception
+ {
+ int ctr = 10;
+ String[] data = new String[ctr];
+
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = connV11.receiveFrame();
+ Assert.assertTrue("Message not in order",
frame.getBody().equals(data[i]));
+ }
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = connV11.receiveFrame();
+ Assert.assertTrue("Message not in order",
frame.getBody().equals(data[i]));
+ }
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithAutoAckAndSelector() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto", null, "foo =
'zzz'");
+
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue("Should have received the real message but got: " +
frame, frame.getBody().equals("Real message"));
+
+ connV11.disconnect();
+ }
+
+ public void testRedeliveryWithClientAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "subId", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ connV11.disconnect();
+
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
+
+ public void testSendManyMessages() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ int count = 1000;
+ final CountDownLatch latch = new CountDownLatch(count);
+ consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message arg0)
+ {
+ latch.countDown();
+ }
+ });
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ for (int i = 1; i <= count; i++)
+ {
+ connV11.sendFrame(frame);
+ }
+
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+
+ connV11.disconnect();
+ }
+
+ public void testSendMessage() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ // Assert default priority 4 is used when priority header is not set
+ Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void testSendMessageWithContentLength() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ byte[] data = new byte[] { 1, 0, 0, 4 };
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody(new String(data, "UTF-8"));
+
+ frame.addHeader("content-length", String.valueOf(data.length));
+
+ connV11.sendFrame(frame);
+
+ BytesMessage message = (BytesMessage)consumer.receive(10000);
+ Assert.assertNotNull(message);
+
+ assertEquals(data.length, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
+ }
+
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue, "foo =
'abc'");
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("foo", "abc");
+ frame.addHeader("bar", "123");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
+ }
+
+ public void testSendMessageWithLeadingNewLine() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendWickedFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+
+ assertNull(consumer.receive(1000));
+
+ connV11.disconnect();
+ }
+
+ public void testSendMessageWithReceipt() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("receipt", "1234");
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ assertTrue(frame.getCommand().equals("RECEIPT"));
+ assertEquals("1234", frame.getHeader("receipt-id"));
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+
+ connV11.disconnect();
+ }
+
+ public void testSendMessageWithStandardHeaders() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("correlation-id", "c123");
+ frame.addHeader("persistent", "true");
+ frame.addHeader("priority", "3");
+ frame.addHeader("type", "t345");
+ frame.addHeader("JMSXGroupID", "abc");
+ frame.addHeader("foo", "abc");
+ frame.addHeader("bar", "123");
+
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("JMSCorrelationID", "c123",
message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345",
message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
+
+ Assert.assertEquals("JMSXGroupID", "abc",
message.getStringProperty("JMSXGroupID"));
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeToTopic() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribeTopic(connV11, "sub1", null, null, true);
+
+ sendMessage(getName(), topic);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix()
+ getTopicName()));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+
+ this.unsubscribe(connV11, "sub1", true);
+
+ sendMessage(getName(), topic);
+
+ frame = connV11.receiveFrame(1000);
+ assertNull(frame);
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeToTopicWithNoLocal() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribeTopic(connV11, "sub1", null, null, true, true);
+
+ // send a message on the same connection => it should not be received
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ frame = connV11.receiveFrame(2000);
+
+ assertNull(frame);
+
+ // send message on another JMS connection => it should be received
+ sendMessage(getName(), topic);
+
+ frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix()
+ getTopicName()));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+
+ this.unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithAutoAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertEquals("MESSAGE", frame.getCommand());
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ connV11.disconnect();
+
+ // message should not be received as it was auto-acked
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
+ sendMessage(payload, queue);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ System.out.println("Message: " + frame);
+
+ assertEquals("5", frame.getHeader("content-length"));
+
+ assertEquals(null, frame.getHeader("type"));
+
+ assertEquals(frame.getBody(), new String(payload, "UTF-8"));
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithClientAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ this.ack(connV11, "sub1", frame);
+
+ connV11.disconnect();
+
+ // message should not be received since message was acknowledged by the client
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws
Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
+
+ public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
+
+ public void testSubscribeWithID() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "mysubid", "auto");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getHeader("subscription") != null);
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithMessageSentWithProperties() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty("S", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte)9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float)6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ message.setShortProperty("s", (short)12);
+ message.writeBytes("Hello World".getBytes("UTF-8"));
+ producer.send(message);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ Assert.assertNotNull(frame);
+
+ Assert.assertTrue(frame.getHeader("S") != null);
+ Assert.assertTrue(frame.getHeader("n") != null);
+ Assert.assertTrue(frame.getHeader("byte") != null);
+ Assert.assertTrue(frame.getHeader("d") != null);
+ Assert.assertTrue(frame.getHeader("f") != null);
+ Assert.assertTrue(frame.getHeader("i") != null);
+ Assert.assertTrue(frame.getHeader("l") != null);
+ Assert.assertTrue(frame.getHeader("s") != null);
+ Assert.assertEquals("Hello World", frame.getBody());
+
+ connV11.disconnect();
+ }
+
+ public void testSuccessiveTransactionsWithSameID() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ // first tx
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1");
+
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ // 2nd tx with same tx ID
+ this.beginTransaction(connV11, "tx1");
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1");
+
+ message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ connV11.disconnect();
+ }
+
+ public void testTransactionCommit() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+ frame.addHeader("receipt", "123");
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ assertEquals("123", frame.getHeader("receipt-id"));
+
+ // check the message is not committed
+ assertNull(consumer.receive(100));
+
+ this.commitTransaction(connV11, "tx1", true);
+
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ connV11.disconnect();
+ }
+
+ public void testTransactionRollback() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("first message");
+
+ connV11.sendFrame(frame);
+
+ // rollback first message
+ this.abortTransaction(connV11, "tx1");
+
+ this.beginTransaction(connV11, "tx1");
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("second message");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1", true);
+
+ // only second msg should be received since first msg was rolled back
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", message.getText());
+
+ connV11.disconnect();
+ }
+
+ public void testUnsubscribe() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ // send a message to our queue
+ sendMessage("first message");
+
+ // receive message from socket
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ // remove suscription
+ this.unsubscribe(connV11, "sub1", true);
+
+ // send a message to our queue
+ sendMessage("second message");
+
+ frame = connV11.receiveFrame(1000);
+ assertNull(frame);
+
+ connV11.disconnect();
+ }
+
+ //-----------------private help methods
+
+ private void abortTransaction(StompClientConnection conn, String txID) throws
IOException, InterruptedException
+ {
+ ClientStompFrame abortFrame = conn.createFrame("ABORT");
+ abortFrame.addHeader("transaction", txID);
+
+ conn.sendFrame(abortFrame);
+ }
+
+ private void beginTransaction(StompClientConnection conn, String txID) throws
IOException, InterruptedException
+ {
+ ClientStompFrame beginFrame = conn.createFrame("BEGIN");
+ beginFrame.addHeader("transaction", txID);
+
+ conn.sendFrame(beginFrame);
+ }
+
+ private void commitTransaction(StompClientConnection conn, String txID) throws
IOException, InterruptedException
+ {
+ commitTransaction(conn, txID, false);
+ }
+
+ private void commitTransaction(StompClientConnection conn, String txID, boolean
receipt) throws IOException, InterruptedException
+ {
+ ClientStompFrame beginFrame = conn.createFrame("COMMIT");
+ beginFrame.addHeader("transaction", txID);
+ if (receipt)
+ {
+ beginFrame.addHeader("receipt", "1234");
+ }
+ ClientStompFrame resp = conn.sendFrame(beginFrame);
+ if (receipt)
+ {
+ assertEquals("1234", resp.getHeader("receipt-id"));
+ }
+ }
+
+ private void ack(StompClientConnection conn, String subId,
+ ClientStompFrame frame) throws IOException, InterruptedException
+ {
+ String messageID = frame.getHeader("message-id");
+
+ ClientStompFrame ackFrame = conn.createFrame("ACK");
+
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", messageID);
+
+ ClientStompFrame response = conn.sendFrame(ackFrame);
+ if (response != null)
+ {
+ throw new IOException("failed to ack " + response);
+ }
+ }
+
+ private void ack(StompClientConnection conn, String subId, String mid, String txID)
throws IOException, InterruptedException
+ {
+ ClientStompFrame ackFrame = conn.createFrame("ACK");
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", mid);
+ if (txID != null)
+ {
+ ackFrame.addHeader("transaction", txID);
+ }
+
+ conn.sendFrame(ackFrame);
+ }
+
+ private void nack(StompClientConnection conn, String subId, String mid) throws
IOException, InterruptedException
+ {
+ ClientStompFrame ackFrame = conn.createFrame("NACK");
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", mid);
+
+ conn.sendFrame(ackFrame);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId, String ack) throws
IOException, InterruptedException
+ {
+ subscribe(conn, subId, ack, null, null);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId) throws IOException, InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, null);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt) throws IOException,
InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, null, receipt);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId, String ack,
+ String durableId, String selector) throws IOException,
+ InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, selector, false);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId, String selector, boolean receipt) throws
IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", subId);
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ if (ack != null)
+ {
+ subFrame.addHeader("ack", ack);
+ }
+ if (durableId != null)
+ {
+ subFrame.addHeader("durable-subscriber-name", durableId);
+ }
+ if (selector != null)
+ {
+ subFrame.addHeader("selector", selector);
+ }
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "1234");
+ }
+
+ subFrame = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ assertEquals("1234", subFrame.getHeader("receipt-id"));
+ }
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId) throws IOException, InterruptedException
+ {
+ subscribeTopic(conn, subId, ack, durableId, false);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt) throws IOException,
InterruptedException
+ {
+ subscribeTopic(conn, subId, ack, durableId, receipt, false);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt, boolean noLocal) throws
IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", subId);
+ subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
+ if (ack != null)
+ {
+ subFrame.addHeader("ack", ack);
+ }
+ if (durableId != null)
+ {
+ subFrame.addHeader("durable-subscriber-name", durableId);
+ }
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "1234");
+ }
+ if (noLocal)
+ {
+ subFrame.addHeader("no-local", "true");
+ }
+
+ ClientStompFrame frame = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ assertTrue(frame.getHeader("receipt-id").equals("1234"));
+ }
+ }
+
+ private void unsubscribe(StompClientConnection conn, String subId) throws IOException,
InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
+ subFrame.addHeader("id", subId);
+
+ conn.sendFrame(subFrame);
+ }
+
+ private void unsubscribe(StompClientConnection conn, String subId,
+ boolean receipt) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
+ subFrame.addHeader("id", subId);
+
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "4321");
+ }
+
+ ClientStompFrame f = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ System.out.println("response: " + f);
+ assertEquals("RECEIPT", f.getCommand());
+ assertEquals("4321", f.getHeader("receipt-id"));
+ }
+ }
+
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean
sendDisconnect) throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertEquals("MESSAGE", frame.getCommand());
+
+ log.info("Reconnecting!");
+
+ if (sendDisconnect)
+ {
+ connV11.disconnect();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ }
+ else
+ {
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ }
+
+ // message should be received since message was not acknowledged
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", null);
+
+ frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ connV11.disconnect();
+
+ // now lets make sure we don't see the message again
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1",
hostname, port);
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", null, null, true);
+
+ sendMessage("shouldBeNextMessage");
+
+ frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertEquals("shouldBeNextMessage", frame.getBody());
+ }
+
+}
+
+
+
+
+