Author: timfox
Date: 2010-04-14 14:35:54 -0400 (Wed, 14 Apr 2010)
New Revision: 9118
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
fixed stomp race condition
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-04-14 17:12:52
UTC (rev 9117)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-04-14 18:35:54
UTC (rev 9118)
@@ -16,6 +16,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
@@ -48,12 +49,12 @@
private final OperationContext sessionContext;
- private final Map<Long, StompSubscription> subscriptions = new HashMap<Long,
StompSubscription>();
+ private final Map<Long, StompSubscription> subscriptions = new
ConcurrentHashMap<Long, StompSubscription>();
// key = message ID, value = consumer ID
- private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
+ private final Map<Long, Long> messagesToAck = new ConcurrentHashMap<Long,
Long>();
- private boolean noLocal = false;
+ private volatile boolean noLocal = false;
StompSession(final StompConnection connection, final StompProtocolManager manager,
OperationContext sessionContext)
{
@@ -90,19 +91,17 @@
}
HornetQBuffer buffer = serverMessage.getBodyBuffer();
buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
: serverMessage.getEndOfBodyPosition();
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
+ :
serverMessage.getEndOfBodyPosition();
int size = bodyPos - buffer.readerIndex();
byte[] data = new byte[size];
buffer.readBytes(data);
headers.put(Headers.CONTENT_LENGTH, data.length);
serverMessage.getBodyBuffer().resetReaderIndex();
-
+
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
- manager.send(connection, frame);
- int length = frame.getEncodedSize();
-
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -112,6 +111,11 @@
{
messagesToAck.put(serverMessage.getMessageID(), consumerID);
}
+
+ // Must send AFTER adding to messagesToAck - or could get acked from client
BEFORE it's been added!
+ manager.send(connection, frame);
+ int length = frame.getEncodedSize();
+
return length;
}
@@ -149,7 +153,7 @@
String subscriptionID,
String clientID,
String durableSubscriptionName,
- String destination,
+ String destination,
String selector,
String ack) throws Exception
{
@@ -159,7 +163,7 @@
// subscribes to a topic
if (durableSubscriptionName != null)
{
- if (clientID == null)
+ if (clientID == null)
{
throw new IllegalStateException("Cannot create a subscriber on the
durable subscription if the client-id of the connection is not set");
}
@@ -177,7 +181,8 @@
throw new IllegalStateException("Cannot create a subscriber on the
durable subscription since it already has a subscriber: " + queue);
}
}
- } else
+ }
+ else
{
queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(SimpleString.toSimpleString(destination), queue, null,
true, false);
@@ -239,7 +244,7 @@
{
return noLocal;
}
-
+
public void setNoLocal(boolean noLocal)
{
this.noLocal = noLocal;
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-04-14 17:12:52
UTC (rev 9117)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-04-14 18:35:54
UTC (rev 9118)
@@ -66,1407 +66,1211 @@
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.tests.util.UnitTestCase;
-public class StompTest extends UnitTestCase {
- private static final transient Logger log = Logger.getLogger(StompTest.class);
- private int port = 61613;
- private Socket stompSocket;
- private ByteArrayOutputStream inputBuffer;
- private ConnectionFactory connectionFactory;
- private Connection connection;
- private Session session;
- private Queue queue;
- private Topic topic;
- private JMSServerManager server;
+public class StompTest extends UnitTestCase
+{
+ private static final transient Logger log = Logger.getLogger(StompTest.class);
- public void _testSendManyMessages() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
-
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
-
- Assert.assertTrue(frame.startsWith("CONNECTED"));
- int count = 1000;
- final CountDownLatch latch = new CountDownLatch(count);
- consumer.setMessageListener(new MessageListener()
+ private int port = 61613;
+
+ private Socket stompSocket;
+
+ private ByteArrayOutputStream inputBuffer;
+
+ private ConnectionFactory connectionFactory;
+
+ private Connection connection;
+
+ private Session session;
+
+ private Queue queue;
+
+ private Topic topic;
+
+ private JMSServerManager server;
+
+ public void _testSendManyMessages() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+ int count = 1000;
+ final CountDownLatch latch = new CountDownLatch(count);
+ consumer.setMessageListener(new MessageListener()
{
-
+
public void onMessage(Message arg0)
{
System.out.println("<<< " + (1000 - latch.getCount()));
latch.countDown();
}
});
-
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n\n" +
- "Hello World" +
- Stomp.NULL;
- for (int i=1; i <= count; i++) {
- // Thread.sleep(1);
- System.out.println(">>> " + i);
- sendFrame(frame);
- }
-
- assertTrue(latch.await(60, TimeUnit.SECONDS));
-
- }
-
- public void testConnect() throws Exception {
- String connect_frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n" + "request-id: 1\n" + "\n" +
Stomp.NULL;
- sendFrame(connect_frame);
+ frame = "SEND\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ for (int i = 1; i <= count; i++)
+ {
+ // Thread.sleep(1);
+ System.out.println(">>> " + i);
+ sendFrame(frame);
+ }
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
- Assert.assertTrue(f.indexOf("response-id:1") >= 0);
- }
-
- public void testDisconnectAndError() throws Exception {
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
- String connectFrame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n" + "request-id: 1\n" + "\n" +
Stomp.NULL;
- sendFrame(connectFrame);
+ }
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
- Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-
- String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
- sendFrame(disconnectFrame);
-
- waitForFrameToTakeEffect();
-
- // sending a message will result in an error
- String frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n\n" +
- "Hello World" +
- Stomp.NULL;
- try {
- sendFrame(frame);
- Assert.fail("the socket must have been closed when the server handled the
DISCONNECT");
- } catch (IOException e)
- {
- }
+ public void testConnect() throws Exception
+ {
+
+ String connect_frame = "CONNECT\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "request-id: 1\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
}
+ public void testDisconnectAndError() throws Exception
+ {
- public void testSendMessage() throws Exception {
+ String connectFrame = "CONNECT\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "request-id: 1\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(connectFrame);
- MessageConsumer consumer = session.createConsumer(queue);
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
+ sendFrame(disconnectFrame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ waitForFrameToTakeEffect();
- frame =
- "\nSEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n\n" +
- "Hello World" +
- Stomp.NULL;
+ // sending a message will result in an error
+ String frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ try
+ {
+ sendFrame(frame);
+ Assert.fail("the socket must have been closed when the server handled the
DISCONNECT");
+ }
+ catch (IOException e)
+ {
+ }
+ }
- sendFrame(frame);
-
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
+ public void testSendMessage() throws Exception
+ {
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
- }
-
- public void testSendMessageWithReceipt() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = "\nSEND\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "receipt: 1234\n\n" +
- "Hello World" +
- Stomp.NULL;
+ sendFrame(frame);
- sendFrame(frame);
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("RECEIPT"));
- Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
+ public void testSendMessageWithReceipt() throws Exception
+ {
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "receipt: 1234\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("RECEIPT"));
+ Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
-
- public void testSendMessageWithContentLength() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
+ public void testSendMessageWithContentLength() throws Exception
+ {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ MessageConsumer consumer = session.createConsumer(queue);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- byte[] data = new byte[] {1, 0, 0, 4};
-
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "content-length:" + data.length + "\n\n";
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- baos.write(frame.getBytes("UTF-8"));
- baos.write(data);
- baos.write('\0');
- sendFrame(baos.toByteArray());
-
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- assertEquals(data.length, message.getBodyLength());
- assertEquals(data[0], message.readByte());
- assertEquals(data[1], message.readByte());
- assertEquals(data[2], message.readByte());
- assertEquals(data[3], message.readByte());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ byte[] data = new byte[] { 1, 0, 0, 4 };
+
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "content-length:" +
+ data.length +
+ "\n\n";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ baos.write(frame.getBytes("UTF-8"));
+ baos.write(data);
+ baos.write('\0');
+ sendFrame(baos.toByteArray());
+
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertEquals(data.length, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
}
- public void testJMSXGroupIdCanBeSet() throws Exception {
+ public void testJMSXGroupIdCanBeSet() throws Exception
+ {
- MessageConsumer consumer = session.createConsumer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "JMSXGroupID: TEST\n\n" +
- "Hello World" +
- Stomp.NULL;
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "JMSXGroupID: TEST\n\n" +
+ "Hello World" +
+ Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
- // differ from StompConnect
- Assert.assertEquals("TEST",
message.getStringProperty("JMSXGroupID"));
- }
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
+ // differ from StompConnect
+ Assert.assertEquals("TEST",
message.getStringProperty("JMSXGroupID"));
+ }
- public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception
+ {
- MessageConsumer consumer = session.createConsumer(queue, "foo =
'abc'");
+ MessageConsumer consumer = session.createConsumer(queue, "foo =
'abc'");
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SEND\n" +
- "foo:abc\n" +
- "bar:123\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n\n" +
- "Hello World" +
- Stomp.NULL;
+ frame = "SEND\n" + "foo:abc\n" +
+ "bar:123\n" +
+ "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
- Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
- Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
- }
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
+ Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
+ }
- public void testSendMessageWithStandardHeaders() throws Exception {
+ public void testSendMessageWithStandardHeaders() throws Exception
+ {
- MessageConsumer consumer = session.createConsumer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SEND\n" +
- "correlation-id:c123\n" +
- "persistent:true\n" +
- "priority:3\n" +
- "type:t345\n" +
- "JMSXGroupID:abc\n" +
- "foo:abc\n" +
- "bar:123\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n\n" +
- "Hello World" +
- Stomp.NULL;
+ frame = "SEND\n" + "correlation-id:c123\n" +
+ "persistent:true\n" +
+ "priority:3\n" +
+ "type:t345\n" +
+ "JMSXGroupID:abc\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
- Assert.assertEquals("JMSCorrelationID", "c123",
message.getJMSCorrelationID());
- Assert.assertEquals("getJMSType", "t345",
message.getJMSType());
- Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
- Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
- Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
- Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
+ Assert.assertEquals("JMSCorrelationID", "c123",
message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345",
message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
- Assert.assertEquals("JMSXGroupID", "abc",
message.getStringProperty("JMSXGroupID"));
- // FIXME do we support it?
- //Assert.assertEquals("GroupID", "abc",
amqMessage.getGroupID());
- }
+ Assert.assertEquals("JMSXGroupID", "abc",
message.getStringProperty("JMSXGroupID"));
+ // FIXME do we support it?
+ // Assert.assertEquals("GroupID", "abc",
amqMessage.getGroupID());
+ }
- public void testSubscribeWithAutoAck() throws Exception {
+ public void testSubscribeWithAutoAck() throws Exception
+ {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- sendMessage(getName());
+ sendMessage(getName());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
-
- // message should not be received as it was auto-acked
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- }
+ // message should not be received as it was auto-acked
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
- public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+ }
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- byte[] payload = new byte[]{1, 2, 3, 4, 5};
- sendMessage(payload, queue);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
+ sendMessage(payload, queue);
- Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)",
Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
- Assert.assertTrue(cl_matcher.find());
- Assert.assertEquals("5", cl_matcher.group(1));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertFalse(Pattern.compile("type:\\s*null",
Pattern.CASE_INSENSITIVE).matcher(frame).find());
- Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- }
+ Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)",
Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ Assert.assertEquals("5", cl_matcher.group(1));
- public void testSubscribeWithMessageSentWithProperties() throws Exception {
+ Assert.assertFalse(Pattern.compile("type:\\s*null",
Pattern.CASE_INSENSITIVE).matcher(frame).find());
+ Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ public void testSubscribeWithMessageSentWithProperties() throws Exception
+ {
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- MessageProducer producer = session.createProducer(queue);
- BytesMessage message = session.createBytesMessage();
- message.setStringProperty("S", "value");
- message.setBooleanProperty("n", false);
- message.setByteProperty("byte", (byte) 9);
- message.setDoubleProperty("d", 2.0);
- message.setFloatProperty("f", (float) 6.0);
- message.setIntProperty("i", 10);
- message.setLongProperty("l", 121);
- message.setShortProperty("s", (short) 12);
- message.writeBytes("Hello World".getBytes("UTF-8"));
- producer.send(message);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = receiveFrame(10000);
- Assert.assertNotNull(frame);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("S:") > 0);
- Assert.assertTrue(frame.indexOf("n:") > 0);
- Assert.assertTrue(frame.indexOf("byte:") > 0);
- Assert.assertTrue(frame.indexOf("d:") > 0);
- Assert.assertTrue(frame.indexOf("f:") > 0);
- Assert.assertTrue(frame.indexOf("i:") > 0);
- Assert.assertTrue(frame.indexOf("l:") > 0);
- Assert.assertTrue(frame.indexOf("s:") > 0);
- Assert.assertTrue(frame.indexOf("Hello World") > 0);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
-// System.out.println("out: "+frame);
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty("S", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte)9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float)6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ message.setShortProperty("s", (short)12);
+ message.writeBytes("Hello World".getBytes("UTF-8"));
+ producer.send(message);
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- }
-
- public void testSubscribeWithID() throws Exception {
+ frame = receiveFrame(10000);
+ Assert.assertNotNull(frame);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("S:") > 0);
+ Assert.assertTrue(frame.indexOf("n:") > 0);
+ Assert.assertTrue(frame.indexOf("byte:") > 0);
+ Assert.assertTrue(frame.indexOf("d:") > 0);
+ Assert.assertTrue(frame.indexOf("f:") > 0);
+ Assert.assertTrue(frame.indexOf("i:") > 0);
+ Assert.assertTrue(frame.indexOf("l:") > 0);
+ Assert.assertTrue(frame.indexOf("s:") > 0);
+ Assert.assertTrue(frame.indexOf("Hello World") > 0);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ // System.out.println("out: "+frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:auto\n" +
- "id: mysubid\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeWithID() throws Exception
+ {
- sendMessage(getName());
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf("subscription:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "ack:auto\n" +
+ "id: mysubid\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf("subscription:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
- public void testBodyWithUTF8() throws Exception {
+ public void testBodyWithUTF8() throws Exception
+ {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String text = "A" + "\u00ea" + "\u00f1" +
- "\u00fc" + "C";
- System.out.println(text);
- sendMessage(text);
+ String text = "A" + "\u00ea" + "\u00f1" +
"\u00fc" + "C";
+ System.out.println(text);
+ sendMessage(text);
- frame = receiveFrame(10000);
- System.out.println(frame);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(text) > 0);
+ frame = receiveFrame(10000);
+ System.out.println(frame);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(text) > 0);
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- public void testMessagesAreInOrder() throws Exception {
- int ctr = 10;
- String[] data = new String[ctr];
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testMessagesAreInOrder() throws Exception
+ {
+ int ctr = 10;
+ String[] data = new String[ctr];
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- for (int i = 0; i < ctr; ++i) {
- data[i] = getName() + i;
- sendMessage(data[i]);
- }
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- for (int i = 0; i < ctr; ++i) {
- frame = receiveFrame(1000);
- Assert.assertTrue("Message not in order", frame.indexOf(data[i])
>= 0);
- }
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
- // sleep a while before publishing another set of messages
- waitForFrameToTakeEffect();
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >=
0);
+ }
- for (int i = 0; i < ctr; ++i) {
- data[i] = getName() + ":second:" + i;
- sendMessage(data[i]);
- }
+ // sleep a while before publishing another set of messages
+ waitForFrameToTakeEffect();
- for (int i = 0; i < ctr; ++i) {
- frame = receiveFrame(1000);
- Assert.assertTrue("Message not in order", frame.indexOf(data[i])
>= 0);
- }
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- }
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >=
0);
+ }
- public void testSubscribeWithAutoAckAndSelector() throws Exception {
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeWithAutoAckAndSelector() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "selector: foo = 'zzz'\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- sendMessage("Ignored message", "foo", "1234");
- sendMessage("Real message", "foo", "zzz");
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "selector: foo = 'zzz'\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue("Should have received the real message but got: " +
frame, frame.indexOf("Real message") > 0);
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- }
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue("Should have received the real message but got: " +
frame, frame.indexOf("Real message") > 0);
- public void testSubscribeWithClientAck() throws Exception {
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeWithClientAck() throws Exception
+ {
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:client\n\n" +
- Stomp.NULL;
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- sendFrame(frame);
-
- sendMessage(getName());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Pattern cl = Pattern.compile("message-id:\\s*(\\S+)",
Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
- Assert.assertTrue(cl_matcher.find());
- String messageID = cl_matcher.group(1);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
- frame =
- "ACK\n" +
- "message-id: " + messageID + "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Pattern cl = Pattern.compile("message-id:\\s*(\\S+)",
Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ String messageID = cl_matcher.group(1);
- // message should not be received since message was acknowledged by the client
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
+ frame = "ACK\n" + "message-id: " + messageID + "\n\n"
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ // message should not be received since message was acknowledged by the client
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
}
-
- public void testRedeliveryWithClientAck() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testRedeliveryWithClientAck() throws Exception
+ {
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:client\n\n" +
- Stomp.NULL;
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- sendFrame(frame);
-
- sendMessage(getName());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- // message should be received since message was not acknowledged
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertTrue(message.getJMSRedelivered());
- }
-
- public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception {
- assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
- }
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws
Exception {
- assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
- }
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean
sendDisconnect) throws Exception {
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ public void
testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws
Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:client\n\n" +
- Stomp.NULL;
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean
sendDisconnect) throws Exception
+ {
- sendFrame(frame);
- sendMessage(getName());
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- log.info("Reconnecting!");
-
- if (sendDisconnect) {
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForFrameToTakeEffect();
- reconnect();
- }
- else {
- reconnect(1000);
- waitForFrameToTakeEffect();
- }
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ sendMessage(getName());
- // message should be received since message was not acknowledged
- frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ log.info("Reconnecting!");
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n\n" +
- Stomp.NULL;
+ if (sendDisconnect)
+ {
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForFrameToTakeEffect();
+ reconnect();
+ }
+ else
+ {
+ reconnect(1000);
+ waitForFrameToTakeEffect();
+ }
- sendFrame(frame);
+ // message should be received since message was not acknowledged
+ frame = "CONNECT\n" + "login: brianm\n" + "passcode:
wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForFrameToTakeEffect();
-
- // now lets make sure we don't see the message again
- reconnect();
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + Stomp.NULL;
- frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "receipt: 1234\n\n" +
- Stomp.NULL;
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForFrameToTakeEffect();
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ // now lets make sure we don't see the message again
+ reconnect();
- sendMessage("shouldBeNextMessage");
+ frame = "CONNECT\n" + "login: brianm\n" + "passcode:
wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- System.out.println(frame);
- Assert.assertTrue(frame.contains("shouldBeNextMessage"));
- }
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- public void testUnsubscribe() throws Exception {
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "receipt: 1234\n\n" +
+ Stomp.NULL;
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendMessage("shouldBeNextMessage");
- //send a message to our queue
- sendMessage("first message");
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ System.out.println(frame);
+ Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+ }
- //receive message from socket
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ public void testUnsubscribe() throws Exception
+ {
- //remove suscription
- frame =
- "UNSUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "receipt:567\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- //send a message to our queue
- sendMessage("second message");
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- try {
- frame = receiveFrame(1000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received since subscription was
removed");
- }
- catch (SocketTimeoutException e) {
+ // send a message to our queue
+ sendMessage("first message");
- }
- }
+ // receive message from socket
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- public void testUnsubscribeWithID() throws Exception {
+ // remove suscription
+ frame = "UNSUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "receipt:567\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ // send a message to our queue
+ sendMessage("second message");
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "id: mysubid\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ try
+ {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was
removed");
+ }
+ catch (SocketTimeoutException e)
+ {
- //send a message to our queue
- sendMessage("first message");
+ }
+ }
- //receive message from socket
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ public void testUnsubscribeWithID() throws Exception
+ {
- //remove suscription
- frame =
- "UNSUBSCRIBE\n" +
- "id:mysubid\n" +
- "receipt: 345\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- //send a message to our queue
- sendMessage("second message");
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "id: mysubid\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
- try {
- frame = receiveFrame(1000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received since subscription was
removed");
- }
- catch (SocketTimeoutException e) {
+ // send a message to our queue
+ sendMessage("first message");
- }
- }
+ // receive message from socket
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- public void testTransactionCommit() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
+ // remove suscription
+ frame = "UNSUBSCRIBE\n" + "id:mysubid\n" + "receipt:
345\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ // send a message to our queue
+ sendMessage("second message");
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
+ try
+ {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was
removed");
+ }
+ catch (SocketTimeoutException e)
+ {
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ }
+ }
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "transaction: tx1\n" +
- "receipt: 123\n" +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
-
- // check the message is not committed
- assertNull(consumer.receive(100));
-
- frame =
- "COMMIT\n" +
- "transaction: tx1\n" +
- "receipt:456\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ public void testTransactionCommit() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
- }
-
- public void testSuccessiveTransactionsWithSameID() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
- // first tx
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "receipt: 123\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "transaction: tx1\n" +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
+ // check the message is not committed
+ assertNull(consumer.receive(100));
- frame =
- "COMMIT\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "COMMIT\n" + "transaction: tx1\n" +
"receipt:456\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- Message message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+ }
- // 2nd tx with same tx ID
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSuccessiveTransactionsWithSameID() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "transaction: tx1\n" +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "COMMIT\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
- message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
-}
-
- public void testBeginSameTransactionTwice() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ // first tx
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
- // begin the tx a 2nd time
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
- f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("ERROR"));
+ // 2nd tx with same tx ID
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
+
+ message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
}
- public void testTransactionRollback() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
+ public void testBeginSameTransactionTwice() throws Exception
+ {
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ // begin the tx a 2nd time
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "transaction: tx1\n" +
- "\n" +
- "first message" +
- Stomp.NULL;
- sendFrame(frame);
+ f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("ERROR"));
- //rollback first message
- frame =
- "ABORT\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ }
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testTransactionRollback() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "transaction: tx1\n" +
- "\n" +
- "second message" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "COMMIT\n" +
- "transaction: tx1\n" +
- "receipt:789\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
- //only second msg should be received since first msg was rolled back
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("second message", readContent(message));
- }
-
- public void testSubscribeToTopic() throws Exception {
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "first message" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ // rollback first message
+ frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
- "receipt: 12\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
- sendMessage(getName(), topic);
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "second message" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ frame = "COMMIT\n" + "transaction: tx1\n" +
"receipt:789\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- frame =
- "UNSUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
- "receipt: 1234\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for UNSUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- sendMessage(getName(), topic);
+ // only second msg should be received since first msg was rolled back
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", readContent(message));
+ }
- try {
- frame = receiveFrame(1000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received since subscription was
removed");
+ public void testSubscribeToTopic() throws Exception
+ {
+
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 12\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ sendMessage(getName(), topic);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame = "UNSUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 1234\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for UNSUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ sendMessage(getName(), topic);
+
+ try
+ {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was
removed");
}
- catch (SocketTimeoutException e) {
+ catch (SocketTimeoutException e)
+ {
}
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- public void testDurableSubscriberWithReconnection() throws Exception {
- String connectFame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n" +
- "client-id: myclientid\n\n" +
- Stomp.NULL;
- sendFrame(connectFame);
+ public void testDurableSubscriberWithReconnection() throws Exception
+ {
- String frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String connectFame = "CONNECT\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "client-id: myclientid\n\n" +
+ Stomp.NULL;
+ sendFrame(connectFame);
- String subscribeFrame =
- "SUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
- "durable-subscriber-name: " + getName() + "\n"
+
- "\n\n" +
- Stomp.NULL;
- sendFrame(subscribeFrame);
- waitForFrameToTakeEffect();
+ String frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- String disconnectFrame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(disconnectFrame);
- waitForFrameToTakeEffect();
-
- // send the message when the durable subscriber is disconnected
- sendMessage(getName(), topic);
+ String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "durable-subscriber-name: " +
+ getName() +
+ "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(subscribeFrame);
+ waitForFrameToTakeEffect();
- reconnect(1000);
- sendFrame(connectFame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- sendFrame(subscribeFrame);
+ String disconnectFrame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(disconnectFrame);
+ waitForFrameToTakeEffect();
- // we must have received the message
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ // send the message when the durable subscriber is disconnected
+ sendMessage(getName(), topic);
- String unsubscribeFrame =
- "UNSUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
- "receipt: 1234\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(unsubscribeFrame);
- // wait for UNSUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- sendFrame(disconnectFrame);
+ reconnect(1000);
+ sendFrame(connectFame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ sendFrame(subscribeFrame);
+
+ // we must have received the message
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 1234\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(unsubscribeFrame);
+ // wait for UNSUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ sendFrame(disconnectFrame);
}
-
- public void testDurableSubscriber() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n" +
- "client-id: myclientid\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testDurableSubscriber() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String subscribeFrame =
- "SUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
- "receipt: 12\n" +
- "durable-subscriber-name: " + getName() + "\n"
+
- "\n\n" +
- Stomp.NULL;
- sendFrame(subscribeFrame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- // creating a subscriber with the same durable-subscriber-name must fail
- sendFrame(subscribeFrame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("ERROR"));
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 12\n" +
+ "durable-subscriber-name: " +
+ getName() +
+ "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(subscribeFrame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ // creating a subscriber with the same durable-subscriber-name must fail
+ sendFrame(subscribeFrame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("ERROR"));
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- public void testSubscribeToTopicWithNoLocal() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeToTopicWithNoLocal() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() +
"\n" +
- "receipt: 12\n" +
- "no-local: true\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- // send a message on the same connection => it should not be received
- frame = "SEND\n" +
- "destination:" + getTopicPrefix() + getTopicName() + "\n\n"
+
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
-
- try {
- frame = receiveFrame(2000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received since subscription is
noLocal");
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 12\n" +
+ "no-local: true\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ // send a message on the same connection => it should not be received
+ frame = "SEND\n" + "destination:" + getTopicPrefix() +
getTopicName() + "\n\n" + "Hello World" + Stomp.NULL;
+ sendFrame(frame);
+
+ try
+ {
+ frame = receiveFrame(2000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription is
noLocal");
}
- catch (SocketTimeoutException e) {
+ catch (SocketTimeoutException e)
+ {
}
-
+
// send message on another JMS connection => it should be received
sendMessage(getName(), topic);
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("destination:") > 0);
Assert.assertTrue(frame.indexOf(getName()) > 0);
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- public void testClientAckNotPartOfTransaction() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testClientAckNotPartOfTransaction() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n" +
- "ack:client\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- sendMessage(getName());
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "ack:client\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
- Assert.assertTrue(frame.indexOf("message-id:") > 0);
- Pattern cl = Pattern.compile("message-id:\\s*(\\S+)",
Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
- Assert.assertTrue(cl_matcher.find());
- String messageID = cl_matcher.group(1);
+ sendMessage(getName());
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+ Assert.assertTrue(frame.indexOf("message-id:") > 0);
+ Pattern cl = Pattern.compile("message-id:\\s*(\\S+)",
Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ String messageID = cl_matcher.group(1);
- frame =
- "ACK\n" +
- "message-id:" + messageID + "\n" +
- "transaction: tx1\n" +
- "\n" +
- "second message" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
- frame =
- "ABORT\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
-
- try {
- frame = receiveFrame(1000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received as the message was
acked even though the transaction has been aborted");
- }
- catch (SocketTimeoutException e) {
- }
-
- frame =
- "UNSUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() +
"\n\n" +
- Stomp.NULL;
- sendFrame(frame);
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "ACK\n" + "message-id:" + messageID + "\n" +
"transaction: tx1\n" + "\n" + "second message" +
Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" +
Stomp.NULL;
+ sendFrame(frame);
+
+ try
+ {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received as the message was acked
even though the transaction has been aborted");
+ }
+ catch (SocketTimeoutException e)
+ {
+ }
+
+ frame = "UNSUBSCRIBE\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- // Implementation methods
- //-------------------------------------------------------------------------
- protected void setUp() throws Exception {
- super.setUp();
-
- server = createServer();
- server.start();
- connectionFactory = createConnectionFactory();
- stompSocket = createSocket();
- inputBuffer = new ByteArrayOutputStream();
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(getQueueName());
- topic = session.createTopic(getTopicName());
- connection.start();
- }
+ server = createServer();
+ server.start();
+ connectionFactory = createConnectionFactory();
- /**
- * @return
- * @throws Exception
- */
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getQueueName());
+ topic = session.createTopic(getTopicName());
+ connection.start();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
private JMSServerManager createServer() throws Exception
{
Configuration config = new ConfigurationImpl();
@@ -1479,149 +1283,178 @@
TransportConfiguration stompTransport = new
TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
config.getAcceptorConfigurations().add(stompTransport);
config.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
- HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
-
- JMSConfiguration jmsConfig = new JMSConfigurationImpl();
- jmsConfig.getQueueConfigurations().add(new
JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
- jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(),
getTopicName()));
- server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
- server.setContext(null);
- return server;
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations()
+ .add(new JMSQueueConfigurationImpl(getQueueName(), null, false,
getQueueName()));
+ jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(),
getTopicName()));
+ server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setContext(null);
+ return server;
}
- protected void tearDown() throws Exception {
- connection.close();
- if (stompSocket != null) {
- stompSocket.close();
- }
- server.stop();
-
- super.tearDown();
- }
+ protected void tearDown() throws Exception
+ {
+ connection.close();
+ if (stompSocket != null)
+ {
+ stompSocket.close();
+ }
+ server.stop();
- protected void reconnect() throws Exception {
- reconnect(0);
- }
- protected void reconnect(long sleep) throws Exception {
- stompSocket.close();
+ super.tearDown();
+ }
- if (sleep > 0) {
- Thread.sleep(sleep);
- }
+ protected void reconnect() throws Exception
+ {
+ reconnect(0);
+ }
- stompSocket = createSocket();
- inputBuffer = new ByteArrayOutputStream();
- }
+ protected void reconnect(long sleep) throws Exception
+ {
+ stompSocket.close();
- protected ConnectionFactory createConnectionFactory() {
- return new HornetQConnectionFactory(new
TransportConfiguration(InVMConnectorFactory.class.getName()));
- }
+ if (sleep > 0)
+ {
+ Thread.sleep(sleep);
+ }
- protected Socket createSocket() throws IOException {
- return new Socket("127.0.0.1", port);
- }
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ }
- protected String getQueueName() {
- return "test";
- }
+ protected ConnectionFactory createConnectionFactory()
+ {
+ return new HornetQConnectionFactory(new
TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
- protected String getQueuePrefix() {
- return "jms.queue.";
+ protected Socket createSocket() throws IOException
+ {
+ return new Socket("127.0.0.1", port);
}
-
- protected String getTopicName() {
- return "testtopic";
+
+ protected String getQueueName()
+ {
+ return "test";
}
- protected String getTopicPrefix() {
- return "jms.topic.";
+ protected String getQueuePrefix()
+ {
+ return "jms.queue.";
}
- public void sendFrame(String data) throws Exception {
- byte[] bytes = data.getBytes("UTF-8");
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < bytes.length; i++) {
- outputStream.write(bytes[i]);
- }
- outputStream.flush();
- }
+ protected String getTopicName()
+ {
+ return "testtopic";
+ }
- public void sendFrame(byte[] data) throws Exception {
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < data.length; i++) {
- outputStream.write(data[i]);
- }
- outputStream.flush();
- }
-
- public String receiveFrame(long timeOut) throws Exception {
- stompSocket.setSoTimeout((int) timeOut);
- InputStream is = stompSocket.getInputStream();
- int c = 0;
- for (; ;) {
+ protected String getTopicPrefix()
+ {
+ return "jms.topic.";
+ }
+
+ public void sendFrame(String data) throws Exception
+ {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public void sendFrame(byte[] data) throws Exception
+ {
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < data.length; i++)
+ {
+ outputStream.write(data[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception
+ {
+ stompSocket.setSoTimeout((int)timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (;;)
+ {
+ c = is.read();
+ if (c < 0)
+ {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0)
+ {
c = is.read();
- if (c < 0) {
- throw new IOException("socket closed.");
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
}
- else if (c == 0) {
- c = is.read();
- if (c != '\n')
- {
- byte[] ba = inputBuffer.toByteArray();
- System.out.println(new String(ba, "UTF-8"));
- }
- Assert.assertEquals("Expecting stomp frame to terminate with
\0\n", c, '\n');
- byte[] ba = inputBuffer.toByteArray();
- inputBuffer.reset();
- return new String(ba, "UTF-8");
- }
- else {
- inputBuffer.write(c);
- }
- }
- }
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n",
c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else
+ {
+ inputBuffer.write(c);
+ }
+ }
+ }
- public void sendMessage(String msg) throws Exception {
- sendMessage(msg.getBytes("UTF-8"), "foo", "xyz",
queue);
+ public void sendMessage(String msg) throws Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), "foo", "xyz",
queue);
}
- public void sendMessage(String msg, Destination destination) throws Exception {
- sendMessage(msg.getBytes("UTF-8"), "foo", "xyz",
destination);
- }
+ public void sendMessage(String msg, Destination destination) throws Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), "foo", "xyz",
destination);
+ }
- public void sendMessage(byte[] data, Destination destination) throws Exception {
- sendMessage(data, "foo", "xyz", destination);
+ public void sendMessage(byte[] data, Destination destination) throws Exception
+ {
+ sendMessage(data, "foo", "xyz", destination);
}
-
- public void sendMessage(String msg, String propertyName, String propertyValue) throws
Exception {
- sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
- }
- public void sendMessage(byte[] data, String propertyName, String propertyValue,
Destination destination) throws Exception {
- MessageProducer producer = session.createProducer(destination);
- BytesMessage message = session.createBytesMessage();
- message.setStringProperty(propertyName, propertyValue);
- message.writeBytes(data);
- producer.send(message);
- }
-
- public String readContent(BytesMessage message) throws Exception
- {
- byte[] data = new byte[1024];
- int size = message.readBytes(data);
- return new String(data, 0, size, "UTF-8");
- }
-
- protected void waitForReceipt() throws Exception {
- String frame = receiveFrame(50000);
- assertNotNull(frame);
- assertTrue(frame.indexOf("RECEIPT") > -1);
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws
Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
}
-
- protected void waitForFrameToTakeEffect() throws InterruptedException {
- // bit of a dirty hack :)
- // another option would be to force some kind of receipt to be returned
- // from the frame
- Thread.sleep(2000);
- }
+
+ public void sendMessage(byte[] data, String propertyName, String propertyValue,
Destination destination) throws Exception
+ {
+ MessageProducer producer = session.createProducer(destination);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty(propertyName, propertyValue);
+ message.writeBytes(data);
+ producer.send(message);
+ }
+
+ public String readContent(BytesMessage message) throws Exception
+ {
+ byte[] data = new byte[1024];
+ int size = message.readBytes(data);
+ return new String(data, 0, size, "UTF-8");
+ }
+
+ protected void waitForReceipt() throws Exception
+ {
+ String frame = receiveFrame(50000);
+ assertNotNull(frame);
+ assertTrue(frame.indexOf("RECEIPT") > -1);
+ }
+
+ protected void waitForFrameToTakeEffect() throws InterruptedException
+ {
+ // bit of a dirty hack :)
+ // another option would be to force some kind of receipt to be returned
+ // from the frame
+ Thread.sleep(2000);
+ }
}