JBoss hornetq SVN: r9121 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-04-15 05:45:07 -0400 (Thu, 15 Apr 2010)
New Revision: 9121
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
fix Stomp decoder
* make sure the buffer is readable before attempting to read a byte from it
* increased receive timeout in StompTest.testSendMessageWithContentLength()
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java 2010-04-15 09:27:44 UTC (rev 9120)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java 2010-04-15 09:45:07 UTC (rev 9121)
@@ -124,6 +124,10 @@
data = new byte[length];
buffer.readBytes(data);
+ if (!buffer.readable())
+ {
+ return null;
+ }
if (buffer.readByte() != 0)
{
throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " +
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-04-15 09:27:44 UTC (rev 9120)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-04-15 09:45:07 UTC (rev 9121)
@@ -64,6 +64,7 @@
import org.hornetq.jms.server.config.impl.TopicConfigurationImpl;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.unit.util.InVMContext;
import org.hornetq.tests.util.UnitTestCase;
public class StompTest extends UnitTestCase
@@ -260,7 +261,7 @@
baos.write('\0');
sendFrame(baos.toByteArray());
- BytesMessage message = (BytesMessage)consumer.receive(1000);
+ BytesMessage message = (BytesMessage)consumer.receive(10000);
Assert.assertNotNull(message);
assertEquals(data.length, message.getBodyLength());
assertEquals(data[0], message.readByte());
@@ -1290,7 +1291,7 @@
.add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
- server.setContext(null);
+ server.setContext(new InVMContext());
return server;
}
14 years, 1 month
JBoss hornetq SVN: r9120 - trunk/src/main/org/hornetq/ra/inflow.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-04-15 05:27:44 -0400 (Thu, 15 Apr 2010)
New Revision: 9120
Modified:
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-362 - fixed the ra to cope with non transactional mdb's
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-04-15 09:10:04 UTC (rev 9119)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-04-15 09:27:44 UTC (rev 9120)
@@ -246,8 +246,8 @@
endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
beforeDelivery = true;
msg.doBeforeReceive();
+ ((MessageListener)endpoint).onMessage(msg);
message.acknowledge();
- ((MessageListener)endpoint).onMessage(msg);
endpoint.afterDelivery();
if (useLocalTx)
{
@@ -269,7 +269,7 @@
HornetQMessageHandler.log.warn("Unable to call after delivery");
}
}
- if (useLocalTx)
+ if (useLocalTx || !activation.isDeliveryTransacted())
{
try
{
14 years, 1 month
JBoss hornetq SVN: r9119 - in trunk: tests/src/org/hornetq/tests/integration/jms/bridge and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-04-15 05:10:04 -0400 (Thu, 15 Apr 2010)
New Revision: 9119
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-287: cannot stop JMSBridge which is handling startup failure
* flag JMSBridge stopping attribute as volatile
* make sure the failure handler is stopped before JMSBridge.stop() returns
* fix test to check for the failure handler thread existence before/after calling JMSBrige.stop()
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-14 18:35:54 UTC (rev 9118)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-15 09:10:04 UTC (rev 9119)
@@ -63,6 +63,8 @@
*/
public class JMSBridgeImpl implements HornetQComponent, JMSBridge
{
+ public static final String FAILURE_HANDLER_THREAD_NAME = "jmsbridge-failurehandler-thread";
+
private static final Logger log;
private static boolean trace;
@@ -108,7 +110,7 @@
private boolean started;
- private boolean stopping = false;
+ private volatile boolean stopping = false;
private final LinkedList<Message> messages;
@@ -160,6 +162,8 @@
private ObjectName objectName;
+ private Thread startupFailureThread;
+
private static final int FORWARD_MODE_XA = 0;
private static final int FORWARD_MODE_LOCALTX = 1;
@@ -392,12 +396,6 @@
{
stopping = true;
- if (!started)
- {
- JMSBridgeImpl.log.warn("Attempt to stop, but is already stopped");
- return;
- }
-
if (JMSBridgeImpl.trace)
{
JMSBridgeImpl.log.trace("Stopping " + this);
@@ -417,6 +415,12 @@
{
sourceReceiver.interrupt();
}
+
+ if (startupFailureThread != null)
+ {
+ startupFailureThread.interrupt();
+ }
+
}
// This must be outside sync block
@@ -451,6 +455,22 @@
}
}
+ // This must be outside sync block
+ if (startupFailureThread != null)
+ {
+ if (JMSBridgeImpl.trace)
+ {
+ JMSBridgeImpl.log.trace("Waiting for failure thread to finish");
+ }
+
+ startupFailureThread.join();
+
+ if (JMSBridgeImpl.trace)
+ {
+ JMSBridgeImpl.log.trace("Failure thread has finished");
+ }
+ }
+
if (tx != null)
{
// Terminate any transaction
@@ -1617,10 +1637,10 @@
private void handleFailureOnStartup()
{
- handleFailure(new StartupFailureHandler());
+ startupFailureThread = handleFailure(new StartupFailureHandler());
}
- private void handleFailure(final Runnable failureHandler)
+ private Thread handleFailure(final Runnable failureHandler)
{
failed = true;
@@ -1628,9 +1648,11 @@
// In the case of onMessage we can't close the connection from inside the onMessage method
// since it will block waiting for onMessage to complete. In the case of start we want to return
// from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
- Thread t = new Thread(failureHandler, "jmsbridge-failurehandler-thread");
+ Thread t = new Thread(failureHandler, FAILURE_HANDLER_THREAD_NAME);
t.start();
+
+ return t;
}
private void addMessageIDInHeader(final Message msg) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-04-14 18:35:54 UTC (rev 9118)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-04-15 09:10:04 UTC (rev 9119)
@@ -12,7 +12,11 @@
*/
package org.hornetq.tests.integration.jms.bridge;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.util.Map;
import junit.framework.Assert;
@@ -148,8 +152,6 @@
{
jmsServer1.stop();
- long failureRetryInterval = 500;
-
JMSBridgeImpl bridge = new JMSBridgeImpl(cff0,
cff1,
sourceQueueFactory,
@@ -173,13 +175,12 @@
Assert.assertFalse(bridge.isStarted());
Assert.assertTrue(bridge.isFailed());
- int numThreads = ManagementFactory.getThreadMXBean().getThreadCount();
+ assertTrue(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
bridge.stop();
- Thread.sleep(failureRetryInterval * 2);
- // the JMS Brigde failure handler thread must have been stopped at most 1 failureRetryInterval ms after the bridge is stopped
- assertEquals(numThreads - 1, ManagementFactory.getThreadMXBean().getThreadCount());
+ assertFalse(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
+
Assert.assertFalse(bridge.isStarted());
// we restart and setup the server for the test's tearDown checks
@@ -189,6 +190,20 @@
}
+ private boolean threadExists(String threadName)
+ {
+ long[] threadIds = ManagementFactory.getThreadMXBean().getAllThreadIds();
+ for (long id : threadIds)
+ {
+ ThreadInfo threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(id);
+ if (threadInfo.getThreadName().equals(threadName))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
/*
* Send some messages
* Crash the destination server
14 years, 1 month
JBoss hornetq SVN: r9118 - in trunk: tests/src/org/hornetq/tests/integration/stomp and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-14 14:35:54 -0400 (Wed, 14 Apr 2010)
New Revision: 9118
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
fixed stomp race condition
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-04-14 17:12:52 UTC (rev 9117)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-04-14 18:35:54 UTC (rev 9118)
@@ -16,6 +16,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
@@ -48,12 +49,12 @@
private final OperationContext sessionContext;
- private final Map<Long, StompSubscription> subscriptions = new HashMap<Long, StompSubscription>();
+ private final Map<Long, StompSubscription> subscriptions = new ConcurrentHashMap<Long, StompSubscription>();
// key = message ID, value = consumer ID
- private final Map<Long, Long> messagesToAck = new HashMap<Long, Long>();
+ private final Map<Long, Long> messagesToAck = new ConcurrentHashMap<Long, Long>();
- private boolean noLocal = false;
+ private volatile boolean noLocal = false;
StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext)
{
@@ -90,19 +91,17 @@
}
HornetQBuffer buffer = serverMessage.getBodyBuffer();
buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
- int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
+ : serverMessage.getEndOfBodyPosition();
int size = bodyPos - buffer.readerIndex();
byte[] data = new byte[size];
buffer.readBytes(data);
headers.put(Headers.CONTENT_LENGTH, data.length);
serverMessage.getBodyBuffer().resetReaderIndex();
-
+
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
- manager.send(connection, frame);
- int length = frame.getEncodedSize();
-
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -112,6 +111,11 @@
{
messagesToAck.put(serverMessage.getMessageID(), consumerID);
}
+
+ // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added!
+ manager.send(connection, frame);
+ int length = frame.getEncodedSize();
+
return length;
}
@@ -149,7 +153,7 @@
String subscriptionID,
String clientID,
String durableSubscriptionName,
- String destination,
+ String destination,
String selector,
String ack) throws Exception
{
@@ -159,7 +163,7 @@
// subscribes to a topic
if (durableSubscriptionName != null)
{
- if (clientID == null)
+ if (clientID == null)
{
throw new IllegalStateException("Cannot create a subscriber on the durable subscription if the client-id of the connection is not set");
}
@@ -177,7 +181,8 @@
throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has a subscriber: " + queue);
}
}
- } else
+ }
+ else
{
queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
session.createQueue(SimpleString.toSimpleString(destination), queue, null, true, false);
@@ -239,7 +244,7 @@
{
return noLocal;
}
-
+
public void setNoLocal(boolean noLocal)
{
this.noLocal = noLocal;
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-04-14 17:12:52 UTC (rev 9117)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-04-14 18:35:54 UTC (rev 9118)
@@ -66,1407 +66,1211 @@
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.tests.util.UnitTestCase;
-public class StompTest extends UnitTestCase {
- private static final transient Logger log = Logger.getLogger(StompTest.class);
- private int port = 61613;
- private Socket stompSocket;
- private ByteArrayOutputStream inputBuffer;
- private ConnectionFactory connectionFactory;
- private Connection connection;
- private Session session;
- private Queue queue;
- private Topic topic;
- private JMSServerManager server;
+public class StompTest extends UnitTestCase
+{
+ private static final transient Logger log = Logger.getLogger(StompTest.class);
- public void _testSendManyMessages() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
-
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
-
- Assert.assertTrue(frame.startsWith("CONNECTED"));
- int count = 1000;
- final CountDownLatch latch = new CountDownLatch(count);
- consumer.setMessageListener(new MessageListener()
+ private int port = 61613;
+
+ private Socket stompSocket;
+
+ private ByteArrayOutputStream inputBuffer;
+
+ private ConnectionFactory connectionFactory;
+
+ private Connection connection;
+
+ private Session session;
+
+ private Queue queue;
+
+ private Topic topic;
+
+ private JMSServerManager server;
+
+ public void _testSendManyMessages() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+ int count = 1000;
+ final CountDownLatch latch = new CountDownLatch(count);
+ consumer.setMessageListener(new MessageListener()
{
-
+
public void onMessage(Message arg0)
{
System.out.println("<<< " + (1000 - latch.getCount()));
latch.countDown();
}
});
-
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
- "Hello World" +
- Stomp.NULL;
- for (int i=1; i <= count; i++) {
- // Thread.sleep(1);
- System.out.println(">>> " + i);
- sendFrame(frame);
- }
-
- assertTrue(latch.await(60, TimeUnit.SECONDS));
-
- }
-
- public void testConnect() throws Exception {
- String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
- sendFrame(connect_frame);
+ frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ for (int i = 1; i <= count; i++)
+ {
+ // Thread.sleep(1);
+ System.out.println(">>> " + i);
+ sendFrame(frame);
+ }
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
- Assert.assertTrue(f.indexOf("response-id:1") >= 0);
- }
-
- public void testDisconnectAndError() throws Exception {
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
- String connectFrame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
- sendFrame(connectFrame);
+ }
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
- Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-
- String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
- sendFrame(disconnectFrame);
-
- waitForFrameToTakeEffect();
-
- // sending a message will result in an error
- String frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
- "Hello World" +
- Stomp.NULL;
- try {
- sendFrame(frame);
- Assert.fail("the socket must have been closed when the server handled the DISCONNECT");
- } catch (IOException e)
- {
- }
+ public void testConnect() throws Exception
+ {
+
+ String connect_frame = "CONNECT\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "request-id: 1\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
}
+ public void testDisconnectAndError() throws Exception
+ {
- public void testSendMessage() throws Exception {
+ String connectFrame = "CONNECT\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "request-id: 1\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(connectFrame);
- MessageConsumer consumer = session.createConsumer(queue);
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
+ sendFrame(disconnectFrame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ waitForFrameToTakeEffect();
- frame =
- "\nSEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
- "Hello World" +
- Stomp.NULL;
+ // sending a message will result in an error
+ String frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ try
+ {
+ sendFrame(frame);
+ Assert.fail("the socket must have been closed when the server handled the DISCONNECT");
+ }
+ catch (IOException e)
+ {
+ }
+ }
- sendFrame(frame);
-
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
+ public void testSendMessage() throws Exception
+ {
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
- }
-
- public void testSendMessageWithReceipt() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
- MessageConsumer consumer = session.createConsumer(queue);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = "\nSEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "receipt: 1234\n\n" +
- "Hello World" +
- Stomp.NULL;
+ sendFrame(frame);
- sendFrame(frame);
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("RECEIPT"));
- Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
+ public void testSendMessageWithReceipt() throws Exception
+ {
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "receipt: 1234\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("RECEIPT"));
+ Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
-
- public void testSendMessageWithContentLength() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
+ public void testSendMessageWithContentLength() throws Exception
+ {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ MessageConsumer consumer = session.createConsumer(queue);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- byte[] data = new byte[] {1, 0, 0, 4};
-
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "content-length:" + data.length + "\n\n";
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- baos.write(frame.getBytes("UTF-8"));
- baos.write(data);
- baos.write('\0');
- sendFrame(baos.toByteArray());
-
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- assertEquals(data.length, message.getBodyLength());
- assertEquals(data[0], message.readByte());
- assertEquals(data[1], message.readByte());
- assertEquals(data[2], message.readByte());
- assertEquals(data[3], message.readByte());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ byte[] data = new byte[] { 1, 0, 0, 4 };
+
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "content-length:" +
+ data.length +
+ "\n\n";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ baos.write(frame.getBytes("UTF-8"));
+ baos.write(data);
+ baos.write('\0');
+ sendFrame(baos.toByteArray());
+
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertEquals(data.length, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
}
- public void testJMSXGroupIdCanBeSet() throws Exception {
+ public void testJMSXGroupIdCanBeSet() throws Exception
+ {
- MessageConsumer consumer = session.createConsumer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "JMSXGroupID: TEST\n\n" +
- "Hello World" +
- Stomp.NULL;
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "JMSXGroupID: TEST\n\n" +
+ "Hello World" +
+ Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
- // differ from StompConnect
- Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
- }
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
+ // differ from StompConnect
+ Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
+ }
- public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception
+ {
- MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
+ MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SEND\n" +
- "foo:abc\n" +
- "bar:123\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
- "Hello World" +
- Stomp.NULL;
+ frame = "SEND\n" + "foo:abc\n" +
+ "bar:123\n" +
+ "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
- Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
- Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
- }
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+ }
- public void testSendMessageWithStandardHeaders() throws Exception {
+ public void testSendMessageWithStandardHeaders() throws Exception
+ {
- MessageConsumer consumer = session.createConsumer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SEND\n" +
- "correlation-id:c123\n" +
- "persistent:true\n" +
- "priority:3\n" +
- "type:t345\n" +
- "JMSXGroupID:abc\n" +
- "foo:abc\n" +
- "bar:123\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
- "Hello World" +
- Stomp.NULL;
+ frame = "SEND\n" + "correlation-id:c123\n" +
+ "persistent:true\n" +
+ "priority:3\n" +
+ "type:t345\n" +
+ "JMSXGroupID:abc\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
- Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
- Assert.assertEquals("getJMSType", "t345", message.getJMSType());
- Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
- Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
- Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
- Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
+ Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
- Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
- // FIXME do we support it?
- //Assert.assertEquals("GroupID", "abc", amqMessage.getGroupID());
- }
+ Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+ // FIXME do we support it?
+ // Assert.assertEquals("GroupID", "abc", amqMessage.getGroupID());
+ }
- public void testSubscribeWithAutoAck() throws Exception {
+ public void testSubscribeWithAutoAck() throws Exception
+ {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- sendMessage(getName());
+ sendMessage(getName());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
-
- // message should not be received as it was auto-acked
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- }
+ // message should not be received as it was auto-acked
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
- public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+ }
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- byte[] payload = new byte[]{1, 2, 3, 4, 5};
- sendMessage(payload, queue);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
+ sendMessage(payload, queue);
- Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
- Assert.assertTrue(cl_matcher.find());
- Assert.assertEquals("5", cl_matcher.group(1));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
- Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- }
+ Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ Assert.assertEquals("5", cl_matcher.group(1));
- public void testSubscribeWithMessageSentWithProperties() throws Exception {
+ Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
+ Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ public void testSubscribeWithMessageSentWithProperties() throws Exception
+ {
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- MessageProducer producer = session.createProducer(queue);
- BytesMessage message = session.createBytesMessage();
- message.setStringProperty("S", "value");
- message.setBooleanProperty("n", false);
- message.setByteProperty("byte", (byte) 9);
- message.setDoubleProperty("d", 2.0);
- message.setFloatProperty("f", (float) 6.0);
- message.setIntProperty("i", 10);
- message.setLongProperty("l", 121);
- message.setShortProperty("s", (short) 12);
- message.writeBytes("Hello World".getBytes("UTF-8"));
- producer.send(message);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = receiveFrame(10000);
- Assert.assertNotNull(frame);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("S:") > 0);
- Assert.assertTrue(frame.indexOf("n:") > 0);
- Assert.assertTrue(frame.indexOf("byte:") > 0);
- Assert.assertTrue(frame.indexOf("d:") > 0);
- Assert.assertTrue(frame.indexOf("f:") > 0);
- Assert.assertTrue(frame.indexOf("i:") > 0);
- Assert.assertTrue(frame.indexOf("l:") > 0);
- Assert.assertTrue(frame.indexOf("s:") > 0);
- Assert.assertTrue(frame.indexOf("Hello World") > 0);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
-// System.out.println("out: "+frame);
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty("S", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte)9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float)6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ message.setShortProperty("s", (short)12);
+ message.writeBytes("Hello World".getBytes("UTF-8"));
+ producer.send(message);
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- }
-
- public void testSubscribeWithID() throws Exception {
+ frame = receiveFrame(10000);
+ Assert.assertNotNull(frame);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("S:") > 0);
+ Assert.assertTrue(frame.indexOf("n:") > 0);
+ Assert.assertTrue(frame.indexOf("byte:") > 0);
+ Assert.assertTrue(frame.indexOf("d:") > 0);
+ Assert.assertTrue(frame.indexOf("f:") > 0);
+ Assert.assertTrue(frame.indexOf("i:") > 0);
+ Assert.assertTrue(frame.indexOf("l:") > 0);
+ Assert.assertTrue(frame.indexOf("s:") > 0);
+ Assert.assertTrue(frame.indexOf("Hello World") > 0);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ // System.out.println("out: "+frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:auto\n" +
- "id: mysubid\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeWithID() throws Exception
+ {
- sendMessage(getName());
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf("subscription:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "ack:auto\n" +
+ "id: mysubid\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf("subscription:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
- public void testBodyWithUTF8() throws Exception {
+ public void testBodyWithUTF8() throws Exception
+ {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String text = "A" + "\u00ea" + "\u00f1" +
- "\u00fc" + "C";
- System.out.println(text);
- sendMessage(text);
+ String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
+ System.out.println(text);
+ sendMessage(text);
- frame = receiveFrame(10000);
- System.out.println(frame);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(text) > 0);
+ frame = receiveFrame(10000);
+ System.out.println(frame);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(text) > 0);
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- public void testMessagesAreInOrder() throws Exception {
- int ctr = 10;
- String[] data = new String[ctr];
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testMessagesAreInOrder() throws Exception
+ {
+ int ctr = 10;
+ String[] data = new String[ctr];
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- for (int i = 0; i < ctr; ++i) {
- data[i] = getName() + i;
- sendMessage(data[i]);
- }
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- for (int i = 0; i < ctr; ++i) {
- frame = receiveFrame(1000);
- Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
- }
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
- // sleep a while before publishing another set of messages
- waitForFrameToTakeEffect();
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ }
- for (int i = 0; i < ctr; ++i) {
- data[i] = getName() + ":second:" + i;
- sendMessage(data[i]);
- }
+ // sleep a while before publishing another set of messages
+ waitForFrameToTakeEffect();
- for (int i = 0; i < ctr; ++i) {
- frame = receiveFrame(1000);
- Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
- }
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- }
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ }
- public void testSubscribeWithAutoAckAndSelector() throws Exception {
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeWithAutoAckAndSelector() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "selector: foo = 'zzz'\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- sendMessage("Ignored message", "foo", "1234");
- sendMessage("Real message", "foo", "zzz");
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "selector: foo = 'zzz'\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- }
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
- public void testSubscribeWithClientAck() throws Exception {
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeWithClientAck() throws Exception
+ {
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:client\n\n" +
- Stomp.NULL;
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- sendFrame(frame);
-
- sendMessage(getName());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
- Assert.assertTrue(cl_matcher.find());
- String messageID = cl_matcher.group(1);
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
- frame =
- "ACK\n" +
- "message-id: " + messageID + "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ String messageID = cl_matcher.group(1);
- // message should not be received since message was acknowledged by the client
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
+ frame = "ACK\n" + "message-id: " + messageID + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ // message should not be received since message was acknowledged by the client
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
}
-
- public void testRedeliveryWithClientAck() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testRedeliveryWithClientAck() throws Exception
+ {
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:client\n\n" +
- Stomp.NULL;
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- sendFrame(frame);
-
- sendMessage(getName());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- // message should be received since message was not acknowledged
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertTrue(message.getJMSRedelivered());
- }
-
- public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
- assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
- }
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
- assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
- }
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:client\n\n" +
- Stomp.NULL;
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception
+ {
- sendFrame(frame);
- sendMessage(getName());
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- log.info("Reconnecting!");
-
- if (sendDisconnect) {
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForFrameToTakeEffect();
- reconnect();
- }
- else {
- reconnect(1000);
- waitForFrameToTakeEffect();
- }
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ sendMessage(getName());
- // message should be received since message was not acknowledged
- frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ log.info("Reconnecting!");
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
- Stomp.NULL;
+ if (sendDisconnect)
+ {
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForFrameToTakeEffect();
+ reconnect();
+ }
+ else
+ {
+ reconnect(1000);
+ waitForFrameToTakeEffect();
+ }
- sendFrame(frame);
+ // message should be received since message was not acknowledged
+ frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForFrameToTakeEffect();
-
- // now lets make sure we don't see the message again
- reconnect();
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL;
- frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "receipt: 1234\n\n" +
- Stomp.NULL;
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForFrameToTakeEffect();
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ // now lets make sure we don't see the message again
+ reconnect();
- sendMessage("shouldBeNextMessage");
+ frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- System.out.println(frame);
- Assert.assertTrue(frame.contains("shouldBeNextMessage"));
- }
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- public void testUnsubscribe() throws Exception {
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "receipt: 1234\n\n" +
+ Stomp.NULL;
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendMessage("shouldBeNextMessage");
- //send a message to our queue
- sendMessage("first message");
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ System.out.println(frame);
+ Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+ }
- //receive message from socket
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ public void testUnsubscribe() throws Exception
+ {
- //remove suscription
- frame =
- "UNSUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "receipt:567\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- //send a message to our queue
- sendMessage("second message");
+ frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
- try {
- frame = receiveFrame(1000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received since subscription was removed");
- }
- catch (SocketTimeoutException e) {
+ // send a message to our queue
+ sendMessage("first message");
- }
- }
+ // receive message from socket
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- public void testUnsubscribeWithID() throws Exception {
+ // remove suscription
+ frame = "UNSUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "receipt:567\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ // send a message to our queue
+ sendMessage("second message");
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "id: mysubid\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ try
+ {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was removed");
+ }
+ catch (SocketTimeoutException e)
+ {
- //send a message to our queue
- sendMessage("first message");
+ }
+ }
- //receive message from socket
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ public void testUnsubscribeWithID() throws Exception
+ {
- //remove suscription
- frame =
- "UNSUBSCRIBE\n" +
- "id:mysubid\n" +
- "receipt: 345\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- //send a message to our queue
- sendMessage("second message");
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "id: mysubid\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
- try {
- frame = receiveFrame(1000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received since subscription was removed");
- }
- catch (SocketTimeoutException e) {
+ // send a message to our queue
+ sendMessage("first message");
- }
- }
+ // receive message from socket
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
- public void testTransactionCommit() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
+ // remove suscription
+ frame = "UNSUBSCRIBE\n" + "id:mysubid\n" + "receipt: 345\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ // send a message to our queue
+ sendMessage("second message");
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
+ try
+ {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was removed");
+ }
+ catch (SocketTimeoutException e)
+ {
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ }
+ }
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "transaction: tx1\n" +
- "receipt: 123\n" +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
-
- // check the message is not committed
- assertNull(consumer.receive(100));
-
- frame =
- "COMMIT\n" +
- "transaction: tx1\n" +
- "receipt:456\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ public void testTransactionCommit() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
- }
-
- public void testSuccessiveTransactionsWithSameID() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- // first tx
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "receipt: 123\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "transaction: tx1\n" +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
+ // check the message is not committed
+ assertNull(consumer.receive(100));
- frame =
- "COMMIT\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:456\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- Message message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+ }
- // 2nd tx with same tx ID
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSuccessiveTransactionsWithSameID() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "transaction: tx1\n" +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "COMMIT\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
- message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
-}
-
- public void testBeginSameTransactionTwice() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ // first tx
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- // begin the tx a 2nd time
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
- f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("ERROR"));
+ // 2nd tx with same tx ID
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
}
- public void testTransactionRollback() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
+ public void testBeginSameTransactionTwice() throws Exception
+ {
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ // begin the tx a 2nd time
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "transaction: tx1\n" +
- "\n" +
- "first message" +
- Stomp.NULL;
- sendFrame(frame);
+ f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("ERROR"));
- //rollback first message
- frame =
- "ABORT\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ }
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testTransactionRollback() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
- frame =
- "SEND\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "transaction: tx1\n" +
- "\n" +
- "second message" +
- Stomp.NULL;
- sendFrame(frame);
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "COMMIT\n" +
- "transaction: tx1\n" +
- "receipt:789\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
- //only second msg should be received since first msg was rolled back
- BytesMessage message = (BytesMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("second message", readContent(message));
- }
-
- public void testSubscribeToTopic() throws Exception {
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "first message" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ // rollback first message
+ frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() + "\n" +
- "receipt: 12\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- sendMessage(getName(), topic);
+ frame = "SEND\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "second message" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:789\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ waitForReceipt();
- frame =
- "UNSUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() + "\n" +
- "receipt: 1234\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for UNSUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- sendMessage(getName(), topic);
+ // only second msg should be received since first msg was rolled back
+ BytesMessage message = (BytesMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", readContent(message));
+ }
- try {
- frame = receiveFrame(1000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received since subscription was removed");
+ public void testSubscribeToTopic() throws Exception
+ {
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 12\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ sendMessage(getName(), topic);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame = "UNSUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 1234\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for UNSUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ sendMessage(getName(), topic);
+
+ try
+ {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was removed");
}
- catch (SocketTimeoutException e) {
+ catch (SocketTimeoutException e)
+ {
}
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- public void testDurableSubscriberWithReconnection() throws Exception {
- String connectFame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n" +
- "client-id: myclientid\n\n" +
- Stomp.NULL;
- sendFrame(connectFame);
+ public void testDurableSubscriberWithReconnection() throws Exception
+ {
- String frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String connectFame = "CONNECT\n" + "login: brianm\n" +
+ "passcode: wombats\n" +
+ "client-id: myclientid\n\n" +
+ Stomp.NULL;
+ sendFrame(connectFame);
- String subscribeFrame =
- "SUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() + "\n" +
- "durable-subscriber-name: " + getName() + "\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(subscribeFrame);
- waitForFrameToTakeEffect();
+ String frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- String disconnectFrame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(disconnectFrame);
- waitForFrameToTakeEffect();
-
- // send the message when the durable subscriber is disconnected
- sendMessage(getName(), topic);
+ String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "durable-subscriber-name: " +
+ getName() +
+ "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(subscribeFrame);
+ waitForFrameToTakeEffect();
- reconnect(1000);
- sendFrame(connectFame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- sendFrame(subscribeFrame);
+ String disconnectFrame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(disconnectFrame);
+ waitForFrameToTakeEffect();
- // we must have received the message
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ // send the message when the durable subscriber is disconnected
+ sendMessage(getName(), topic);
- String unsubscribeFrame =
- "UNSUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() + "\n" +
- "receipt: 1234\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(unsubscribeFrame);
- // wait for UNSUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- sendFrame(disconnectFrame);
+ reconnect(1000);
+ sendFrame(connectFame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ sendFrame(subscribeFrame);
+
+ // we must have received the message
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 1234\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(unsubscribeFrame);
+ // wait for UNSUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ sendFrame(disconnectFrame);
}
-
- public void testDurableSubscriber() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n" +
- "client-id: myclientid\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testDurableSubscriber() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
+ sendFrame(frame);
- String subscribeFrame =
- "SUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() + "\n" +
- "receipt: 12\n" +
- "durable-subscriber-name: " + getName() + "\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(subscribeFrame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- // creating a subscriber with the same durable-subscriber-name must fail
- sendFrame(subscribeFrame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("ERROR"));
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 12\n" +
+ "durable-subscriber-name: " +
+ getName() +
+ "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(subscribeFrame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ // creating a subscriber with the same durable-subscriber-name must fail
+ sendFrame(subscribeFrame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("ERROR"));
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- public void testSubscribeToTopicWithNoLocal() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testSubscribeToTopicWithNoLocal() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getTopicPrefix() + getTopicName() + "\n" +
- "receipt: 12\n" +
- "no-local: true\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- // send a message on the same connection => it should not be received
- frame = "SEND\n" +
- "destination:" + getTopicPrefix() + getTopicName() + "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
-
- try {
- frame = receiveFrame(2000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received since subscription is noLocal");
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getTopicPrefix() +
+ getTopicName() +
+ "\n" +
+ "receipt: 12\n" +
+ "no-local: true\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ // send a message on the same connection => it should not be received
+ frame = "SEND\n" + "destination:" + getTopicPrefix() + getTopicName() + "\n\n" + "Hello World" + Stomp.NULL;
+ sendFrame(frame);
+
+ try
+ {
+ frame = receiveFrame(2000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription is noLocal");
}
- catch (SocketTimeoutException e) {
+ catch (SocketTimeoutException e)
+ {
}
-
+
// send message on another JMS connection => it should be received
sendMessage(getName(), topic);
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
Assert.assertTrue(frame.indexOf("destination:") > 0);
Assert.assertTrue(frame.indexOf(getName()) > 0);
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- public void testClientAckNotPartOfTransaction() throws Exception {
- String frame =
- "CONNECT\n" +
- "login: brianm\n" +
- "passcode: wombats\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ public void testClientAckNotPartOfTransaction() throws Exception
+ {
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:client\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
- sendMessage(getName());
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ getQueueName() +
+ "\n" +
+ "ack:client\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
- Assert.assertTrue(frame.indexOf("message-id:") > 0);
- Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
- Assert.assertTrue(cl_matcher.find());
- String messageID = cl_matcher.group(1);
+ sendMessage(getName());
- frame =
- "BEGIN\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+ Assert.assertTrue(frame.indexOf("message-id:") > 0);
+ Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ String messageID = cl_matcher.group(1);
- frame =
- "ACK\n" +
- "message-id:" + messageID + "\n" +
- "transaction: tx1\n" +
- "\n" +
- "second message" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
- frame =
- "ABORT\n" +
- "transaction: tx1\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
-
- try {
- frame = receiveFrame(1000);
- log.info("Received frame: " + frame);
- Assert.fail("No message should have been received as the message was acked even though the transaction has been aborted");
- }
- catch (SocketTimeoutException e) {
- }
-
- frame =
- "UNSUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
-
- frame =
- "DISCONNECT\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ frame = "ACK\n" + "message-id:" + messageID + "\n" + "transaction: tx1\n" + "\n" + "second message" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ try
+ {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received as the message was acked even though the transaction has been aborted");
+ }
+ catch (SocketTimeoutException e)
+ {
+ }
+
+ frame = "UNSUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
}
-
- // Implementation methods
- //-------------------------------------------------------------------------
- protected void setUp() throws Exception {
- super.setUp();
-
- server = createServer();
- server.start();
- connectionFactory = createConnectionFactory();
- stompSocket = createSocket();
- inputBuffer = new ByteArrayOutputStream();
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
- connection = connectionFactory.createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(getQueueName());
- topic = session.createTopic(getTopicName());
- connection.start();
- }
+ server = createServer();
+ server.start();
+ connectionFactory = createConnectionFactory();
- /**
- * @return
- * @throws Exception
- */
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getQueueName());
+ topic = session.createTopic(getTopicName());
+ connection.start();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
private JMSServerManager createServer() throws Exception
{
Configuration config = new ConfigurationImpl();
@@ -1479,149 +1283,178 @@
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
config.getAcceptorConfigurations().add(stompTransport);
config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
-
- JMSConfiguration jmsConfig = new JMSConfigurationImpl();
- jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
- jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
- server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
- server.setContext(null);
- return server;
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations()
+ .add(new JMSQueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+ jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl(getTopicName(), getTopicName()));
+ server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setContext(null);
+ return server;
}
- protected void tearDown() throws Exception {
- connection.close();
- if (stompSocket != null) {
- stompSocket.close();
- }
- server.stop();
-
- super.tearDown();
- }
+ protected void tearDown() throws Exception
+ {
+ connection.close();
+ if (stompSocket != null)
+ {
+ stompSocket.close();
+ }
+ server.stop();
- protected void reconnect() throws Exception {
- reconnect(0);
- }
- protected void reconnect(long sleep) throws Exception {
- stompSocket.close();
+ super.tearDown();
+ }
- if (sleep > 0) {
- Thread.sleep(sleep);
- }
+ protected void reconnect() throws Exception
+ {
+ reconnect(0);
+ }
- stompSocket = createSocket();
- inputBuffer = new ByteArrayOutputStream();
- }
+ protected void reconnect(long sleep) throws Exception
+ {
+ stompSocket.close();
- protected ConnectionFactory createConnectionFactory() {
- return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
- }
+ if (sleep > 0)
+ {
+ Thread.sleep(sleep);
+ }
- protected Socket createSocket() throws IOException {
- return new Socket("127.0.0.1", port);
- }
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ }
- protected String getQueueName() {
- return "test";
- }
+ protected ConnectionFactory createConnectionFactory()
+ {
+ return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
- protected String getQueuePrefix() {
- return "jms.queue.";
+ protected Socket createSocket() throws IOException
+ {
+ return new Socket("127.0.0.1", port);
}
-
- protected String getTopicName() {
- return "testtopic";
+
+ protected String getQueueName()
+ {
+ return "test";
}
- protected String getTopicPrefix() {
- return "jms.topic.";
+ protected String getQueuePrefix()
+ {
+ return "jms.queue.";
}
- public void sendFrame(String data) throws Exception {
- byte[] bytes = data.getBytes("UTF-8");
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < bytes.length; i++) {
- outputStream.write(bytes[i]);
- }
- outputStream.flush();
- }
+ protected String getTopicName()
+ {
+ return "testtopic";
+ }
- public void sendFrame(byte[] data) throws Exception {
- OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < data.length; i++) {
- outputStream.write(data[i]);
- }
- outputStream.flush();
- }
-
- public String receiveFrame(long timeOut) throws Exception {
- stompSocket.setSoTimeout((int) timeOut);
- InputStream is = stompSocket.getInputStream();
- int c = 0;
- for (; ;) {
+ protected String getTopicPrefix()
+ {
+ return "jms.topic.";
+ }
+
+ public void sendFrame(String data) throws Exception
+ {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public void sendFrame(byte[] data) throws Exception
+ {
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < data.length; i++)
+ {
+ outputStream.write(data[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception
+ {
+ stompSocket.setSoTimeout((int)timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (;;)
+ {
+ c = is.read();
+ if (c < 0)
+ {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0)
+ {
c = is.read();
- if (c < 0) {
- throw new IOException("socket closed.");
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
}
- else if (c == 0) {
- c = is.read();
- if (c != '\n')
- {
- byte[] ba = inputBuffer.toByteArray();
- System.out.println(new String(ba, "UTF-8"));
- }
- Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
- byte[] ba = inputBuffer.toByteArray();
- inputBuffer.reset();
- return new String(ba, "UTF-8");
- }
- else {
- inputBuffer.write(c);
- }
- }
- }
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else
+ {
+ inputBuffer.write(c);
+ }
+ }
+ }
- public void sendMessage(String msg) throws Exception {
- sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", queue);
+ public void sendMessage(String msg) throws Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", queue);
}
- public void sendMessage(String msg, Destination destination) throws Exception {
- sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", destination);
- }
+ public void sendMessage(String msg, Destination destination) throws Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", destination);
+ }
- public void sendMessage(byte[] data, Destination destination) throws Exception {
- sendMessage(data, "foo", "xyz", destination);
+ public void sendMessage(byte[] data, Destination destination) throws Exception
+ {
+ sendMessage(data, "foo", "xyz", destination);
}
-
- public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception {
- sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
- }
- public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception {
- MessageProducer producer = session.createProducer(destination);
- BytesMessage message = session.createBytesMessage();
- message.setStringProperty(propertyName, propertyValue);
- message.writeBytes(data);
- producer.send(message);
- }
-
- public String readContent(BytesMessage message) throws Exception
- {
- byte[] data = new byte[1024];
- int size = message.readBytes(data);
- return new String(data, 0, size, "UTF-8");
- }
-
- protected void waitForReceipt() throws Exception {
- String frame = receiveFrame(50000);
- assertNotNull(frame);
- assertTrue(frame.indexOf("RECEIPT") > -1);
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception
+ {
+ sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
}
-
- protected void waitForFrameToTakeEffect() throws InterruptedException {
- // bit of a dirty hack :)
- // another option would be to force some kind of receipt to be returned
- // from the frame
- Thread.sleep(2000);
- }
+
+ public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception
+ {
+ MessageProducer producer = session.createProducer(destination);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty(propertyName, propertyValue);
+ message.writeBytes(data);
+ producer.send(message);
+ }
+
+ public String readContent(BytesMessage message) throws Exception
+ {
+ byte[] data = new byte[1024];
+ int size = message.readBytes(data);
+ return new String(data, 0, size, "UTF-8");
+ }
+
+ protected void waitForReceipt() throws Exception
+ {
+ String frame = receiveFrame(50000);
+ assertNotNull(frame);
+ assertTrue(frame.indexOf("RECEIPT") > -1);
+ }
+
+ protected void waitForFrameToTakeEffect() throws InterruptedException
+ {
+ // bit of a dirty hack :)
+ // another option would be to force some kind of receipt to be returned
+ // from the frame
+ Thread.sleep(2000);
+ }
}
14 years, 1 month
JBoss hornetq SVN: r9117 - trunk/src/main/org/hornetq/core/server/management/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-14 13:12:52 -0400 (Wed, 14 Apr 2010)
New Revision: 9117
Modified:
trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-317
Modified: trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2010-04-14 14:52:57 UTC (rev 9116)
+++ trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2010-04-14 17:12:52 UTC (rev 9117)
@@ -683,8 +683,10 @@
}
}
- // start sending notification *messages* only when the *remoting service* if started
- if (messagingServer == null || !messagingServer.getRemotingService().isStarted())
+ // start sending notification *messages* only when server has initialised
+ // Note at backup initialisation we don't want to send notifications either
+ // https://jira.jboss.org/jira/browse/HORNETQ-317
+ if (messagingServer == null || !messagingServer.isInitialised())
{
return;
}
14 years, 1 month
JBoss hornetq SVN: r9116 - branches/HnetQ_323_cn/docs/user-manual/zh.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2010-04-14 10:52:57 -0400 (Wed, 14 Apr 2010)
New Revision: 9116
Modified:
branches/HnetQ_323_cn/docs/user-manual/zh/management.xml
Log:
save
Modified: branches/HnetQ_323_cn/docs/user-manual/zh/management.xml
===================================================================
--- branches/HnetQ_323_cn/docs/user-manual/zh/management.xml 2010-04-14 14:29:22 UTC (rev 9115)
+++ branches/HnetQ_323_cn/docs/user-manual/zh/management.xml 2010-04-14 14:52:57 UTC (rev 9116)
@@ -402,64 +402,53 @@
</section>
</section>
<section id="management.jmx">
- <title>Using Management Via JMX</title>
- <para>HornetQ can be managed using <ulink
+ <title>使用JMX</title>
+ <para>HornetQ提供了<ulink
url="http://java.sun.com/javase/technologies/core/mntr-mgmt/javamanagement/"
- >JMX</ulink>. </para>
- <para>The management API is exposed by HornetQ using MBeans interfaces. HornetQ registers its
- resources with the domain <literal>org.hornetq</literal>.</para>
- <para>For example, the <literal>ObjectName</literal> to manage a JMS Queue <literal
- >exampleQueue</literal> is:</para>
+ >JMX</ulink>。</para>
+ <para>HornetQ通过MBean的接口暴露其JMX管理操作。它将自己的资源注册到<literal>org.hornetq</literal>域。</para>
+ <para>比如,用来管理一个名为<literal>exampleQueue</literal>JMS队列的<literal>ObjectName</literal>是:</para>
<programlisting>
org.hornetq:module=JMS,type=Queue,name="exampleQueue"
</programlisting>
- <para>and the MBean is:</para>
+ <para>MBean为:</para>
<programlisting>
org.hornetq.api.jms.management.JMSQueueControl
</programlisting>
- <para>The MBean's <literal>ObjectName</literal> are built using the helper class <literal
- >org.hornetq.api.core.management.ObjectNameBuilder</literal>. You can also use <literal
- >jconsole</literal> to find the <literal>ObjectName</literal> of the MBeans you want to
- manage. </para>
- <para>Managing HornetQ using JMX is identical to management of any Java Applications using
- JMX. It can be done by reflection or by creating proxies of the MBeans.</para>
+ <para>MBean的<literal>ObjectName</literal>用 <literal
+ >org.hornetq.api.core.management.ObjectNameBuilder</literal>来产生出来的。你也可以使用<literal
+ >jconsole</literal>来查找你想要的MBean的<literal>ObjectName</literal>。</para>
+ <para>使用JMX来管理HornetQ与用JMX管理其它Java应用程序没有什么不同。你可以使用反射或者创建MBean代理的方法。</para>
<section id="management.jmx.configuration">
- <title>Configuring JMX</title>
- <para>By default, JMX is enabled to manage HornetQ. It can be disabled by setting <literal
- >jmx-management-enabled</literal> to <literal>false</literal> in <literal
- >hornetq-configuration.xml</literal>:</para>
+ <title>配置JMX</title>
+ <para>默认情况下HornetQ的JMX是打开的。将<literal
+ >hornetq-configuration.xml</literal>文件中的<literal
+ >jmx-management-enabled</literal>设置为<literal>false</literal>就可以关闭JMX:</para>
<programlisting>
<!-- false to disable JMX management for HornetQ -->
<jmx-management-enabled>false</jmx-management-enabled>
</programlisting>
- <para>If JMX is enabled, HornetQ can be managed locally using <literal>jconsole</literal>.
- Remote connections to JMX are not enabled by default for security reasons. Please refer
- to <ulink url="http://java.sun.com/j2se/1.5.0/docs/guide/management/agent.html#remote"
- >Java Management guide</ulink> to configure the server for remote management (system
- properties must be set in <literal>run.sh</literal> or <literal>run.bat</literal>
- scripts).</para>
- <para>By default, HornetQ server uses the JMX domain "org.hornetq". To manage several
- HornetQ servers from the <emphasis>same</emphasis> MBeanServer, the JMX domain can be
- configured for each individual HornetQ server by setting <literal>jmx-domain</literal>
- in <literal>hornetq-configuration.xml</literal>: </para>
+ <para>如果JMX功能是打开的,则使用<literal>jconsole</literal>可以管理本地的HornetQ。
+ 出于安全考虑,默认情况下JMX远程连接是关闭的。参见<ulink url="http://java.sun.com/j2se/1.5.0/docs/guide/management/agent.html#remote"
+ >Java管理指南</ulink>来配置服务器的远程管理(系统变量必须在<literal>run.sh</literal>或<literal>run.bat</literal>中定义)。</para>
+ <para>HornetQ默认使用JMX域名"org.hornetq"。如果要用一个MBeanServer管理多个HornetQ服务器,可以将每个HornetQ
+ 服务器配置成不同的JMX域。方法就是在<literal>hornetq-configuration.xml</literal>文件中设置<literal>jmx-domain</literal>:</para>
<programlisting>
<!-- use a specific JMX domain for HornetQ MBeans -->
<jmx-domain>my.org.hornetq</jmx-domain>
</programlisting>
<section>
- <title>MBeanServer configuration</title>
- <para>When HornetQ is run in standalone, it uses the Java Virtual Machine's <literal
- >Platform MBeanServer</literal> to register its MBeans. This is configured in
- JBoss Microcontainer Beans file (see <xref
- linkend="server.microcontainer.configuration"/>):</para>
+ <title>MBeanServer的配置</title>
+ <para>HornetQ在独立运行时使用Java虚拟机的<literal
+ >Platform MBeanServer</literal>来注册其MBean。这在JBoss Microcontainer(微容器)的bean
+ 文件中进行配置(参见<xref linkend="server.microcontainer.configuration"/>):</para>
<programlisting><!-- MBeanServer -->
<bean name="MBeanServer" class="javax.management.MBeanServer">
<constructor factoryClass="java.lang.management.ManagementFactory"
factoryMethod="getPlatformMBeanServer" />
</bean>
</programlisting>
- <para>When it is integrated in JBoss AS 5+, it uses the Application Server's own MBean
- Server so that it can be managed using AS 5's jmx-console:</para>
+ <para>当与AS 5+集成运行时,它使用应用服务器自己的MBean服务,这样就可以使用它的jmx-console:</para>
<programlisting><!-- MBeanServer -->
<bean name="MBeanServer" class="javax.management.MBeanServer">
<constructor factoryClass="org.jboss.mx.util.MBeanServerLocator"
@@ -469,63 +458,53 @@
</section>
</section>
<section>
- <title>Example</title>
- <para>See <xref linkend="examples.jmx"/> for an example which shows how to use a remote
- connection to JMX and MBean proxies to manage HornetQ.</para>
+ <title>例子</title>
+ <para>参见<xref linkend="examples.jmx"/>,这个例子展示了如何使用远程JMX连接或MBean代理来管理HornetQ。</para>
</section>
</section>
<section>
- <title>Using Management Via Core API</title>
- <para>The core management API in HornetQ is called by sending Core messages to a special
- address, the <emphasis>management address</emphasis>.</para>
- <para><emphasis>Management messages</emphasis> are regular Core messages with well-known
- properties that the server needs to understand to interact with the management API:</para>
+ <title>使用核心接口</title>
+ <para>核心管理接口的调用实际上是向一个特殊的地址发送核心消息。这个特殊地址称为<emphasis>管理地址</emphasis>。</para>
+ <para><emphasis>管理消息</emphasis>是一些定义了一些固定属性的普通核心消息。服务器通过这些属性来解释管理操作:</para>
<itemizedlist>
<listitem>
- <para>The name of the managed resource</para>
+ <para>管理资源的名称</para>
</listitem>
<listitem>
- <para>The name of the management operation</para>
+ <para>管理操作的名称</para>
</listitem>
<listitem>
- <para>The parameters of the management operation</para>
+ <para>管理操作的参数</para>
</listitem>
</itemizedlist>
- <para>When such a management message is sent to the management address, HornetQ server will
- handle it, extract the information, invoke the operation on the managed resources and send
- a <emphasis>management reply</emphasis> to the management message's reply-to address
- (specified by <literal>ClientMessageImpl.REPLYTO_HEADER_NAME</literal>). </para>
- <para>A <literal>ClientConsumer</literal> can be used to consume the management reply and
- retrieve the result of the operation (if any) stored in the reply's body. For portability,
- results are returned as a <ulink url="http://json.org">JSON</ulink> String rather than Java
- Serialization (the <literal>org.hornetq.api.core.management.ManagementHelper</literal> can
- be used to convert the JSON string to Java objects).</para>
- <para>These steps can be simplified to make it easier to invoke management operations using
- Core messages:</para>
+ <para>当一个管理消息发送到管理地址时,HornetQ服务器将从中提取出相应的信息,再调用相应的管理资源的方法,之后向
+ 该管理消息的回答地址(reply-to address,由<literal>ClientMessageImpl.REPLYTO_HEADER_NAME
+ </literal>定义)发送一个<emphasis>管理回答</emphasis>。</para>
+ <para>一个<literal>ClientConsumer</literal>用来接收管理回答并提取出其中的操作的結果(如果有的话)。
+ 考虑到可移植性,返回的結果采用的是格式的字符串,而没有采用Java的序列化技术
+ (<literal>org.hornetq.api.core.management.ManagementHelper</literal>可以用来将JSON字符串
+ 转换成Java对象)。</para>
+ <para>使用以下步骤可以简化使用核心消息调用管理操作:</para>
<orderedlist>
<listitem>
- <para>Create a <literal>ClientRequestor</literal> to send messages to the management
- address and receive replies</para>
+ <para>创建一个<literal>ClientRequestor</literal>对象,用来发送管理消息并接收回答。</para>
</listitem>
<listitem>
- <para>Create a <literal>ClientMessage</literal></para>
+ <para>创建一个<literal>ClientMessage</literal>。</para>
</listitem>
<listitem>
- <para>Use the helper class <literal
- >org.hornetq.api.core.management.ManagementHelper</literal> to fill the message
- with the management properties</para>
+ <para>使用<literal
+ >org.hornetq.api.core.management.ManagementHelper</literal>类来帮助设置消息的管理参数。</para>
</listitem>
<listitem>
- <para>Send the message using the <literal>ClientRequestor</literal></para>
+ <para>通过<literal>ClientRequestor</literal>将消息发送</para>
</listitem>
<listitem>
- <para>Use the helper class <literal
- >org.hornetq.api.core.management.ManagementHelper</literal> to retrieve the
- operation result from the management reply</para>
+ <para>使用 <literal
+ >org.hornetq.api.core.management.ManagementHelper</literal>类从管理操作結果中提取返回值。</para>
</listitem>
</orderedlist>
- <para>For example, to find out the number of messages in the core queue <literal
- >exampleQueue</literal>:</para>
+ <para>例如,要得到核心队列<literal>exampleQueue</literal>中消息的数量:</para>
<programlisting>
ClientSession session = ...
ClientRequestor requestor = new ClientRequestor(session, "jms.queue.hornetq.management");
@@ -535,25 +514,22 @@
int count = (Integer) ManagementHelper.getResult(reply);
System.out.println("There are " + count + " messages in exampleQueue");
</programlisting>
- <para>Management operation name and parameters must conform to the Java interfaces defined in
- the <literal>management</literal> packages.</para>
- <para>Names of the resources are built using the helper class <literal
- >org.hornetq.api.core.management.ResourceNames</literal> and are straightforward
- (<literal>core.queue.exampleQueue</literal> for the Core Queue <literal
- >exampleQueue</literal>, <literal>jms.topic.exampleTopic</literal> for the JMS Topic
- <literal>exampleTopic</literal>, etc.).</para>
+ <para>管理操作名及其参数必须和<literal>management</literal>包中定义的Java接口一致。</para>
+ <para>资源的名称是用<literal>org.hornetq.api.core.management.ResourceNames</literal>类来生成的,
+ 命名都非常直观(如核心队列<literal>exampleQueue</literal>的名称为<literal>core.queue.exampleQueue</literal>,
+ JMS Topic <literal>exampleTopic</literal>的名称为<literal>jms.topic.exampleTopic</literal>,等等)。
+</para>
<section id="management.core.configuration">
- <title>Configuring Core Management</title>
- <para>The management address to send management messages is configured in <literal
- >hornetq-configuration.xml</literal>:</para>
+ <title>配置核心管理</title>
+ <para>管理地址的配置在文件<literal
+ >hornetq-configuration.xml</literal>中:</para>
<programlisting>
<management-address>jms.queue.hornetq.management</management-address>
</programlisting>
- <para>By default, the address is <literal>jms.queue.hornetq.management</literal> (it is
- prepended by "jms.queue" so that JMS clients can also send management messages).</para>
- <para>The management address requires a <emphasis>special</emphasis> user permission
- <literal>manage</literal> to be able to receive and handle management messages. This
- is also configured in hornetq-configuration.xml:</para>
+ <para>它的默认地址是<literal>jms.queue.hornetq.management</literal> (地址前缀加上
+ “jms.queue”是为了方便JMS客户端也可以向它发送管理消息。</para>
+ <para>管理地址需要一个<emphasis>特殊</emphasis>的用户权限
+ <literal>manage</literal>来接收并处理管理消息。这个权限也在hornetq-configuration.xml文件中配置:</para>
<programlisting>
<!-- users with the admin role will be allowed to manage -->
<!-- HornetQ using management messages -->
@@ -564,41 +540,32 @@
</section>
</section>
<section id="management.jms">
- <title>Using Management Via JMS</title>
- <para>Using JMS messages to manage HornetQ is very similar to using core API.</para>
- <para>An important difference is that JMS requires a JMS queue to send the messages to
- (instead of an address for the core API).</para>
- <para>The <emphasis>management queue</emphasis> is a special queue and needs to be
- instantiated directly by the client:</para>
+ <title>使用JMS进行管理</title>
+ <para>使用JMS管理HornetQ与使用核心API管理HornetQ十分相似。</para>
+ <para>其中一个重要的不同是JMS需要一个JMS队列来发送消息(而核心接口使用的是一个地址)。</para>
+ <para><emphasis>管理队列</emphasis>是一个特殊的队列,它需要客户端直接实例化:</para>
<programlisting>
Queue managementQueue = HornetQJMSClient.createQueue("hornetq.management");
</programlisting>
- <para>All the other steps are the same than for the Core API but they use JMS API
- instead:</para>
+ <para>其余步骤完全和使用核心接口一样,只是相应的对象不同:</para>
<orderedlist>
<listitem>
- <para>create a <literal>QueueRequestor</literal> to send messages to the management
- address and receive replies</para>
+ <para>创建一个<literal>QueueRequestor</literal>来向管理地址发送管理消息并接收回答。</para>
</listitem>
<listitem>
- <para>create a <literal>Message</literal></para>
+ <para>创建一个<literal>消息</literal></para>
</listitem>
<listitem>
- <para>use the helper class <literal
- >org.hornetq.api.jms.management.JMSManagementHelper</literal> to fill the message
- with the management properties</para>
+ <para>使用 <literal>org.hornetq.api.jms.management.JMSManagementHelper</literal>类向消息中设置管理参数。</para>
</listitem>
<listitem>
- <para>send the message using the <literal>QueueRequestor</literal></para>
+ <para>再使用<literal>QueueRequestor</literal>发送消息。</para>
</listitem>
<listitem>
- <para>use the helper class <literal
- >org.hornetq.api.jms.management.JMSManagementHelper</literal> to retrieve the
- operation result from the management reply</para>
+ <para>使用<literal>org.hornetq.api.jms.management.JMSManagementHelper</literal>来从回答中提取返回結果。</para>
</listitem>
</orderedlist>
- <para>For example, to know the number of messages in the JMS queue <literal
- >exampleQueue</literal>:</para>
+ <para>例如,要得到一个JMS队列<literal>exampleQueue</literal>中有多少消息:</para>
<programlisting>
Queue managementQueue = HornetQJMSClient.createQueue("hornetq.management");
@@ -612,14 +579,12 @@
System.out.println("There are " + count + " messages in exampleQueue");
</programlisting>
<section>
- <title>Configuring JMS Management</title>
- <para>Whether JMS or the core API is used for management, the configuration steps are the
- same (see <xref linkend="management.core.configuration"/>).</para>
+ <title>配置JMS管理</title>
+ <para>JMS管理的配置与核心接口管理的配置步骤是一样的(参见<xref linkend="management.core.configuration"/>)。</para>
</section>
<section>
- <title>Example</title>
- <para>See <xref linkend="examples.management"/> for an example which shows how to use JMS
- messages to manage HornetQ server.</para>
+ <title>例子</title>
+ <para>参见<xref linkend="examples.management"/>,它展示了如何使用JMS消息来管理HornetQ。</para>
</section>
</section>
14 years, 1 month
JBoss hornetq SVN: r9115 - trunk/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-14 10:29:22 -0400 (Wed, 14 Apr 2010)
New Revision: 9115
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-359
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2010-04-14 13:03:49 UTC (rev 9114)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2010-04-14 14:29:22 UTC (rev 9115)
@@ -595,7 +595,44 @@
verifyReceiveAll(QueueImpl.REDISTRIBUTOR_BATCH_SIZE * 2, 1);
}
+
+ /*
+ * Start one node with no consumers and send some messages
+ * Start another node add a consumer and verify all messages are redistribute
+ * https://jira.jboss.org/jira/browse/HORNETQ-359
+ */
+ public void testRedistributionWhenNewNodeIsAddedWithConsumer() throws Exception
+ {
+ setupCluster(false);
+ startServers(0);
+
+ setupSessionFactory(0, isNetty());
+
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+
+ send(0, "queues.testaddress", 20, false, null);
+
+ //Now bring up node 1
+
+ startServers(1);
+
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+
+ addConsumer(0, 1, "queue0", null);
+
+ verifyReceiveAll(20, 0);
+ verifyNotReceive(0);
+ }
+
protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
{
setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2);
14 years, 1 month
JBoss hornetq SVN: r9114 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-14 09:03:49 -0400 (Wed, 14 Apr 2010)
New Revision: 9114
Modified:
trunk/docs/user-manual/en/clusters.xml
Log:
https://jira.jboss.org/jira/browse/HORNETQ-342
Modified: trunk/docs/user-manual/en/clusters.xml
===================================================================
--- trunk/docs/user-manual/en/clusters.xml 2010-04-14 12:28:10 UTC (rev 9113)
+++ trunk/docs/user-manual/en/clusters.xml 2010-04-14 13:03:49 UTC (rev 9114)
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
-
<!-- ============================================================================= -->
<!-- Copyright © 2009 Red Hat, Inc. and others. -->
<!-- -->
@@ -17,26 +16,24 @@
<!-- and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent -->
<!-- permitted by applicable law. -->
<!-- ============================================================================= -->
-
<chapter id="clusters">
- <title>Clusters</title>
+ <title>Clusters</title>
<section>
<title>Clusters Overview</title>
- <para>HornetQ clusters allow groups of HornetQ servers to be grouped
- together in order to share message processing load. Each active node in the cluster is
- an active HornetQ server which manages its own messages and handles its own
- connections. A server must be configured to be clustered, you will need to set the
- <literal>clustered</literal> element in the <literal>hornetq-configuration.xml</literal>
- configuration file to <literal>true</literal>, this is <literal>false</literal> by
- default.</para>
+ <para>HornetQ clusters allow groups of HornetQ servers to be grouped together in order to
+ share message processing load. Each active node in the cluster is an active HornetQ
+ server which manages its own messages and handles its own connections. A server must be
+ configured to be clustered, you will need to set the <literal>clustered</literal>
+ element in the <literal>hornetq-configuration.xml</literal> configuration file to
+ <literal>true</literal>, this is <literal>false</literal> by default.</para>
<para>The cluster is formed by each node declaring <emphasis>cluster connections</emphasis>
- to other nodes in the core configuration file <literal>hornetq-configuration.xml</literal>.
- When a node forms a cluster connection to another node, internally it creates a <emphasis>core
- bridge</emphasis> (as described in <xref
- linkend="core-bridges" />) connection between it and the other node,
- this is done transparently behind the scenes - you don't have to declare an explicit
- bridge for each node. These cluster connections allow messages to flow between the nodes
- of the cluster to balance load.</para>
+ to other nodes in the core configuration file <literal
+ >hornetq-configuration.xml</literal>. When a node forms a cluster connection to
+ another node, internally it creates a <emphasis>core bridge</emphasis> (as described in
+ <xref linkend="core-bridges"/>) connection between it and the other node, this is
+ done transparently behind the scenes - you don't have to declare an explicit bridge for
+ each node. These cluster connections allow messages to flow between the nodes of the
+ cluster to balance load.</para>
<para>Nodes can be connected together to form a cluster in many different topologies, we
will discuss a couple of the more common topologies later in this chapter.</para>
<para>We'll also discuss client side load balancing, where we can balance client connections
@@ -74,19 +71,21 @@
<para>A broadcast group is the means by which a server broadcasts connectors over the
network. A connector defines a way in which a client (or other server) can make
connections to the server. For more information on what a connector is, please see
- <xref linkend="configuring-transports" />.</para>
+ <xref linkend="configuring-transports"/>.</para>
<para>The broadcast group takes a set of connector pairs, each connector pair contains
connection settings for a live and (optional) backup server and broadcasts them on
the network. It also defines the UDP address and port settings. </para>
<para>Broadcast groups are defined in the server configuration file <literal
- >hornetq-configuration.xml</literal>. There can be many broadcast groups per HornetQ
- server. All broadcast groups must be defined in a <literal
+ >hornetq-configuration.xml</literal>. There can be many broadcast groups per
+ HornetQ server. All broadcast groups must be defined in a <literal
>broadcast-groups</literal> element.</para>
<para>Let's take a look at an example broadcast group from <literal
>hornetq-configuration.xml</literal>:</para>
<programlisting><broadcast-groups>
- <broadcast-group name="my-broadcast-group">
- <local-bind-port>54321</local-bind-port>
+ <broadcast-group name="my-broadcast-group"></programlisting>
+ <programlisting>
+ <local-bind-address>172.16.9.3</local-bind-address>
+ <local-bind-port>5432</local-bind-port>
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<broadcast-period>1000</broadcast-period>
@@ -103,18 +102,19 @@
have a unique name. </para>
</listitem>
<listitem>
- <para><literal>local-bind-address</literal>. This is the local bind
- address that the datagram socket is bound to. If you have multiple network
- interfaces on your server, you would specify which one you wish to use for
- broadcasts by setting this property. If this property is not specified then
- the socket will be bound to the wildcard address, an IP address chosen by
- the kernel.</para>
+ <para><literal>local-bind-address</literal>. This is the local bind address that
+ the datagram socket is bound to. If you have multiple network interfaces on
+ your server, you would specify which one you wish to use for broadcasts by
+ setting this property. If this property is not specified then the socket
+ will be bound to the wildcard address, an IP address chosen by the
+ kernel.</para>
</listitem>
<listitem>
<para><literal>local-bind-port</literal>. If you want to specify a local port to
which the datagram socket is bound you can specify it here. Normally you
would just use the default value of <literal>-1</literal> which signifies
- that an anonymous port should be used.</para>
+ that an anonymous port should be used. This parameter is alawys specified in conjunction with
+ <literal>local-bind-address</literal>.</para>
</listitem>
<listitem>
<para><literal>group-address</literal>. This is the multicast address to which
@@ -135,11 +135,12 @@
<listitem>
<para><literal>connector-ref</literal>. This specifies the connector and
optional backup connector that will be broadcasted (see <xref
- linkend="configuring-transports" /> for more information on
- connectors). The connector to be broadcasted is specified by the <literal
+ linkend="configuring-transports"/> for more information on connectors).
+ The connector to be broadcasted is specified by the <literal
>connector-name</literal> attribute, and the backup connector to be
- broadcasted is specified by the <literal>backup-connector</literal> attribute.
- The <literal>backup-connector</literal> attribute is optional.</para>
+ broadcasted is specified by the <literal>backup-connector</literal>
+ attribute. The <literal>backup-connector</literal> attribute is
+ optional.</para>
</listitem>
</itemizedlist>
</section>
@@ -168,12 +169,13 @@
<section>
<title>Defining Discovery Groups on the Server</title>
<para>For cluster connections, discovery groups are defined in the server side
- configuration file <literal>hornetq-configuration.xml</literal>. All discovery groups
- must be defined inside a <literal>discovery-groups</literal> element. There can be
- many discovery groups defined by HornetQ server. Let's look at an
+ configuration file <literal>hornetq-configuration.xml</literal>. All discovery
+ groups must be defined inside a <literal>discovery-groups</literal> element. There
+ can be many discovery groups defined by HornetQ server. Let's look at an
example:</para>
<programlisting><discovery-groups>
<discovery-group name="my-discovery-group">
+ <local-bind-address>172.16.9.7</local-bind-address>
<group-address>231.7.7.7</group-address>
<group-port>9876</group-port>
<refresh-timeout>10000</refresh-timeout>
@@ -186,6 +188,11 @@
name per server.</para>
</listitem>
<listitem>
+ <para><literal>local-bind-address</literal>. If you are running with multiple network interfaces on the same machine, you
+ may want to specify that the discovery group listens only only a specific interface. To do this you can specify the interface
+ address with this parameter. This parameter is optional.</para>
+ </listitem>
+ <listitem>
<para><literal>group-address</literal>. This is the multicast ip address of the
group to listen on. It should match the <literal>group-address</literal> in
the broadcast group that you wish to listen from. This parameter is
@@ -211,9 +218,9 @@
</section>
<section id="clusters-discovery.groups.clientside">
<title>Discovery Groups on the Client Side</title>
- <para>Let's discuss how to configure a HornetQ client to use discovery to
- discover a list of servers to which it can connect. The way to do this differs
- depending on whether you're using JMS or the core API.</para>
+ <para>Let's discuss how to configure a HornetQ client to use discovery to discover a
+ list of servers to which it can connect. The way to do this differs depending on
+ whether you're using JMS or the core API.</para>
<section>
<title>Configuring client discovery using JMS</title>
<para>If you're using JMS and you're also using the JMS Service on the server to
@@ -271,8 +278,7 @@
ClientSession session1 = factory.createClientSession(...); ClientSession
session2 = factory.createClientSession(...);
- </programlisting>
- </para>
+ </programlisting></para>
<para>The <literal>refresh-timeout</literal> can be set directly on the session
factory by using the setter method <literal>setDiscoveryRefreshTimeout() if you
want to change the default value.</literal></para>
@@ -288,12 +294,12 @@
</section>
<section>
<title>Server-Side Message Load Balancing</title>
- <para>If cluster connections are defined between nodes of a cluster, then HornetQ
- will load balance messages arriving at a particular node from a client.</para>
+ <para>If cluster connections are defined between nodes of a cluster, then HornetQ will load
+ balance messages arriving at a particular node from a client.</para>
<para>Let's take a simple example of a cluster of four nodes A, B, C, and D arranged in a
- <emphasis>symmetric cluster</emphasis> (described in <xref linkend="symmetric-cluster" />).
- We have a queue called
- <literal>OrderQueue</literal> deployed on each node of the cluster.</para>
+ <emphasis>symmetric cluster</emphasis> (described in <xref
+ linkend="symmetric-cluster"/>). We have a queue called <literal>OrderQueue</literal>
+ deployed on each node of the cluster.</para>
<para>We have client Ca connected to node A, sending orders to the server. We have also have
order processor clients Pa, Pb, Pc, and Pd connected to each of the nodes A, B, C, D. If
no cluster connection was defined on node A, then as order messages arrive on node A
@@ -307,20 +313,20 @@
<para>For example, messages arriving on node A might be distributed in the following order
between the nodes: B, D, C, A, B, D, C, A, B, D. The exact order depends on the order
the nodes started up, but the algorithm used is round robin.</para>
- <para>HornetQ cluster connections can be configured to always blindly load balance
- messages in a round robin fashion irrespective of whether there are any matching
- consumers on other nodes, but they can be a bit cleverer than that and also be
- configured to only distribute to other nodes if they have matching consumers. We'll look
- at both these cases in turn with some examples, but first we'll discuss configuring
- cluster connections in general.</para>
+ <para>HornetQ cluster connections can be configured to always blindly load balance messages
+ in a round robin fashion irrespective of whether there are any matching consumers on
+ other nodes, but they can be a bit cleverer than that and also be configured to only
+ distribute to other nodes if they have matching consumers. We'll look at both these
+ cases in turn with some examples, but first we'll discuss configuring cluster
+ connections in general.</para>
<section id="clusters.cluster-connections">
<title>Configuring Cluster Connections</title>
<para>Cluster connections group servers into clusters so that messages can be load
balanced between the nodes of the cluster. Let's take a look at a typical cluster
connection. Cluster connections are always defined in <literal
- >hornetq-configuration.xml</literal> inside a <literal>cluster-connection</literal>
- element. There can be zero or more cluster connections defined per HornetQ
- server.</para>
+ >hornetq-configuration.xml</literal> inside a <literal
+ >cluster-connection</literal> element. There can be zero or more cluster
+ connections defined per HornetQ server.</para>
<programlisting>
<cluster-connections>
<cluster-connection name="my-cluster">
@@ -364,7 +370,7 @@
the same way as a bridge does.</para>
<para>This parameter determines the interval in milliseconds between retry
attempts. It has the same meaning as the <literal>retry-interval</literal>
- on a bridge (as described in <xref linkend="core-bridges" />).</para>
+ on a bridge (as described in <xref linkend="core-bridges"/>).</para>
<para>This parameter is optional and its default value is <literal>500</literal>
milliseconds.</para>
</listitem>
@@ -394,11 +400,11 @@
<emphasis>not</emphasis> forward messages to other nodes if there are no
<emphasis>queues</emphasis> of the same name on the other nodes, even if
this parameter is set to <literal>true</literal>.</para>
- <para>If this is set to <literal>false</literal> then HornetQ will only
- forward messages to other nodes of the cluster if the address to which they
- are being forwarded has queues which have consumers, and if those consumers
- have message filters (selectors) at least one of those selectors must match
- the message.</para>
+ <para>If this is set to <literal>false</literal> then HornetQ will only forward
+ messages to other nodes of the cluster if the address to which they are
+ being forwarded has queues which have consumers, and if those consumers have
+ message filters (selectors) at least one of those selectors must match the
+ message.</para>
<para>This parameter is optional, the default value is <literal
>false</literal>.</para>
</listitem>
@@ -407,14 +413,14 @@
nodes to which it might load balance a message, those nodes do not have to
be directly connected to it via a cluster connection. HornetQ can be
configured to also load balance messages to nodes which might be connected
- to it only indirectly with other HornetQ servers as intermediates in
- a chain.</para>
- <para>This allows HornetQ to be configured in more complex topologies
- and still provide message load balancing. We'll discuss this more later in
- this chapter.</para>
+ to it only indirectly with other HornetQ servers as intermediates in a
+ chain.</para>
+ <para>This allows HornetQ to be configured in more complex topologies and still
+ provide message load balancing. We'll discuss this more later in this
+ chapter.</para>
<para>The default value for this parameter is <literal>1</literal>, which means
- messages are only load balanced to other HornetQ serves which are
- directly connected to this server. This parameter is optional.</para>
+ messages are only load balanced to other HornetQ serves which are directly
+ connected to this server. This parameter is optional.</para>
</listitem>
<listitem>
<para><literal>discovery-group-ref</literal>. This parameter determines which
@@ -425,28 +431,30 @@
</section>
<section id="clusters.clusteruser">
<title>Cluster User Credentials</title>
-
<para>When creating connections between nodes of a cluster to form a cluster connection,
- HornetQ uses a cluster user and cluster password which is defined in <literal>hornetq-configuration.xml</literal>:</para>
+ HornetQ uses a cluster user and cluster password which is defined in <literal
+ >hornetq-configuration.xml</literal>:</para>
<programlisting>
<cluster-user>HORNETQ.CLUSTER.ADMIN.USER</cluster-user>
<cluster-password>CHANGE ME!!</cluster-password>
</programlisting>
- <warning><para>It is imperative that these values are changed from their default, or remote clients will be able to make connections
- to the server using the default values. If they are not
- changed from the default, HornetQ will detect this and pester you with a warning on every
- start-up.</para></warning>
+ <warning>
+ <para>It is imperative that these values are changed from their default, or remote
+ clients will be able to make connections to the server using the default values.
+ If they are not changed from the default, HornetQ will detect this and pester
+ you with a warning on every start-up.</para>
+ </warning>
</section>
</section>
<section id="clusters.client.loadbalancing">
<title>Client-Side Load balancing</title>
- <para>With HornetQ client-side load balancing, subsequent
- sessions created using a single session factory can be connected to different nodes of the
- cluster. This allows sessions to spread smoothly across the nodes of a cluster and
- not be "clumped" on any particular node.</para>
+ <para>With HornetQ client-side load balancing, subsequent sessions created using a single
+ session factory can be connected to different nodes of the cluster. This allows sessions
+ to spread smoothly across the nodes of a cluster and not be "clumped" on any particular
+ node.</para>
<para>The load balancing policy to be used by the client factory is configurable. HornetQ
- provides two out-of-the-box load balancing policies and you can also implement
- your own and use that.</para>
+ provides two out-of-the-box load balancing policies and you can also implement your own
+ and use that.</para>
<para>The out-of-the-box policies are</para>
<itemizedlist>
<listitem>
@@ -541,8 +549,8 @@
<literal>hornetq-configuration.xml</literal> which will be used as a live
connector. The <literal>backup-connector-name</literal> is optional, and if
specified it also references a connector defined in <literal
- >hornetq-configuration.xml</literal>. For more information on connectors please
- see <xref linkend="configuring-transports" />.</para>
+ >hornetq-configuration.xml</literal>. For more information on connectors
+ please see <xref linkend="configuring-transports"/>.</para>
<para>The connection factory thus maintains a list of [connector, backup connector]
pairs, these pairs are then used by the client connection load balancing policy
on the client side when creating connections to the cluster.</para>
@@ -596,8 +604,8 @@
<para>In the above snippet we create a list of pairs of <literal
>TransportConfiguration</literal> objects. Each <literal
>TransportConfiguration</literal> object contains knowledge of how to make a
- connection to a specific server. For more information on this, please see
- <xref linkend="configuring-transports" />.</para>
+ connection to a specific server. For more information on this, please see <xref
+ linkend="configuring-transports"/>.</para>
<para>A <literal>ClientSessionFactoryImpl</literal> instance is then created passing
the list of servers in the constructor. Any sessions subsequently created by
this factory will create sessions according to the client connection load
@@ -629,12 +637,13 @@
>connector-name</literal> attribute references a connector defined in <literal
>hornetq-configuration.xml</literal> which will be used as a live connector. The
<literal>backup-connector-name</literal> is optional, and if specified it also
- references a connector defined in <literal>hornetq-configuration.xml</literal>. For more
- information on connectors please see <xref linkend="configuring-transports" />.</para>
+ references a connector defined in <literal>hornetq-configuration.xml</literal>. For
+ more information on connectors please see <xref linkend="configuring-transports"
+ />.</para>
<note>
- <para>Due to a limitation in HornetQ 2.0.0, failover is not supported for clusters
- defined using a static set of nodes. To support failover over cluster nodes, they
- must be configured to use a discovery group.</para>
+ <para>Due to a limitation in HornetQ 2.0.0, failover is not supported for clusters
+ defined using a static set of nodes. To support failover over cluster nodes,
+ they must be configured to use a discovery group.</para>
</note>
</section>
</section>
@@ -648,17 +657,17 @@
it doesn't solve: What happens if the consumers on a queue close after the messages have
been sent to the node? If there are no consumers on the queue the message won't get
consumed and we have a <emphasis>starvation</emphasis> situation.</para>
- <para>This is where message redistribution comes in. With message redistribution HornetQ
- can be configured to automatically <emphasis>redistribute</emphasis> messages
- from queues which have no consumers back to other nodes in the cluster which do have
- matching consumers.</para>
+ <para>This is where message redistribution comes in. With message redistribution HornetQ can
+ be configured to automatically <emphasis>redistribute</emphasis> messages from queues
+ which have no consumers back to other nodes in the cluster which do have matching
+ consumers.</para>
<para>Message redistribution can be configured to kick in immediately after the last
consumer on a queue is closed, or to wait a configurable delay after the last consumer
on a queue is closed before redistributing. By default message redistribution is
disabled.</para>
<para>Message redistribution can be configured on a per address basis, by specifying the
redistribution delay in the address settings, for more information on configuring
- address settings, please see <xref linkend="queue-attributes" />.</para>
+ address settings, please see <xref linkend="queue-attributes"/>.</para>
<para>Here's an address settings snippet from <literal>hornetq-configuration.xml</literal>
showing how message redistribution is enabled for a set of queues:</para>
<programlisting><address-settings>
@@ -672,8 +681,8 @@
to addresses that start with "jms.", so the above would enable instant (no delay)
redistribution for all JMS queues and topic subscriptions.</para>
<para>The attribute <literal>match</literal> can be an exact match or it can be a string
- that conforms to the HornetQ wildcard syntax (described in <xref linkend="wildcard-syntax"
- />).</para>
+ that conforms to the HornetQ wildcard syntax (described in <xref
+ linkend="wildcard-syntax"/>).</para>
<para>The element <literal>redistribution-delay</literal> defines the delay in milliseconds
after the last consumer is closed on a queue before redistributing messages from that
queue to other nodes of the cluster which do have matching consumers. A delay of zero
14 years, 1 month
JBoss hornetq SVN: r9113 - trunk.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-14 08:28:10 -0400 (Wed, 14 Apr 2010)
New Revision: 9113
Modified:
trunk/build-hornetq.xml
Log:
fixed test mask
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-04-14 12:26:06 UTC (rev 9112)
+++ trunk/build-hornetq.xml 2010-04-14 12:28:10 UTC (rev 9113)
@@ -161,7 +161,7 @@
<property name="thirdparty.dir" value="thirdparty"/>
<property name="test.build.dir" value="${test.dir}/build"/>
<property name="test.src.dir" value="${test.dir}/src"/>
- <property name="test-mask" value="DiscoveryTest"/>
+ <property name="test-mask" value="*Test"/>
<property name="test.classes.dir" value="${test.build.dir}/classes"/>
<property name="test.output.dir" value="${test.dir}/build"/>
<property name="test.reports.dir" value="${test.output.dir}/reports"/>
14 years, 1 month
JBoss hornetq SVN: r9112 - in trunk: tests/src/org/hornetq/tests/integration/discovery and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-04-14 08:26:06 -0400 (Wed, 14 Apr 2010)
New Revision: 9112
Modified:
trunk/build-hornetq.xml
trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-342
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-04-14 12:09:00 UTC (rev 9111)
+++ trunk/build-hornetq.xml 2010-04-14 12:26:06 UTC (rev 9112)
@@ -161,7 +161,7 @@
<property name="thirdparty.dir" value="thirdparty"/>
<property name="test.build.dir" value="${test.dir}/build"/>
<property name="test.src.dir" value="${test.dir}/src"/>
- <property name="test-mask" value="*Test"/>
+ <property name="test-mask" value="DiscoveryTest"/>
<property name="test.classes.dir" value="${test.build.dir}/classes"/>
<property name="test.output.dir" value="${test.dir}/build"/>
<property name="test.reports.dir" value="${test.output.dir}/reports"/>
Modified: trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2010-04-14 12:09:00 UTC (rev 9111)
+++ trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2010-04-14 12:26:06 UTC (rev 9112)
@@ -130,9 +130,11 @@
//We need to choose a real NIC on the local machine - note this will silently pass if the machine
//has no usable NIC!
- NetworkInterface ni = null;
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
- while (networkInterfaces.hasMoreElements())
+
+ InetAddress localAddress = null;
+
+ outer: while (networkInterfaces.hasMoreElements())
{
NetworkInterface networkInterface = networkInterfaces.nextElement();
if (networkInterface.isLoopback() || networkInterface.isVirtual() || !networkInterface.isUp() ||
@@ -141,21 +143,28 @@
continue;
}
- ni = networkInterface;
+ Enumeration<InetAddress> en = networkInterface.getInetAddresses();
- break;
-
+ while (en.hasMoreElements())
+ {
+ InetAddress ia = en.nextElement();
+
+ if (ia.getAddress().length == 4)
+ {
+ localAddress = ia;
+
+ break outer;
+ }
+ }
}
- if (ni == null)
+ if (localAddress == null)
{
- log.warn("Can't find NIC");
+ log.warn("Can't find address to use");
return;
}
- InetAddress localAddress = ni.getInetAddresses().nextElement();
-
log.info("Local address is " + localAddress);
BroadcastGroup bg = new BroadcastGroupImpl(nodeID,
14 years, 1 month