[hornetq-commits] JBoss hornetq SVN: r12192 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Feb 24 11:04:39 EST 2012
Author: borges
Date: 2012-02-24 11:04:38 -0500 (Fri, 24 Feb 2012)
New Revision: 12192
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java
Log:
Improve tearDown code.
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2012-02-24 16:04:09 UTC (rev 12191)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2012-02-24 16:04:38 UTC (rev 12192)
@@ -948,9 +948,9 @@
try
{
- assertAllClientConsumersAreClosed();
+ assertAllClientConsumersAreClosed();
assertAllClientProducersAreClosed();
- assertAllClientSessionsAreClosed();
+ assertAllClientSessionsAreClosed();
}
finally
{
@@ -1534,15 +1534,14 @@
{
if (component == null)
return;
- if (component.isStarted())
- try
- {
- component.stop();
- }
- catch (Exception e)
- {
- // no-op
- }
+ try
+ {
+ component.stop();
+ }
+ catch (Exception e)
+ {
+ // no-op
+ }
}
protected final ClientSessionFactory createSessionFactory(ServerLocator locator) throws Exception
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java 2012-02-24 16:04:09 UTC (rev 12191)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java 2012-02-24 16:04:38 UTC (rev 12192)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -62,7 +63,7 @@
{
cf = createSessionFactory(locator);
- ClientSession clientSession = cf.createSession(false, true, true);
+ ClientSession clientSession = addClientSession(cf.createSession(false, true, true));
final CountDownLatch latch = new CountDownLatch(1);
clientSession.addFailureListener(new SessionFailureListener()
{
@@ -79,15 +80,13 @@
// Make sure failure listener is called if server is stopped without session being closed first
server.stop();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ }
- // not really part of the test,
- // we still clean up resources left in the VM
- clientSession.close();
- }
-
public void testFailureListenerRemoved() throws Exception
{
cf = createSessionFactory(locator);
+ try
+ {
ClientSession clientSession = cf.createSession(false, true, true);
class MyFailureListener implements SessionFailureListener
{
@@ -110,6 +109,11 @@
clientSession.close();
server.stop();
Assert.assertFalse(listener.called);
+ }
+ finally
+ {
+ ((ClientSessionFactoryInternal)cf).causeExit();
+ }
}
// Closing a session if the underlying remoting connection is dead should cleanly
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2012-02-24 16:04:09 UTC (rev 12191)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java 2012-02-24 16:04:38 UTC (rev 12192)
@@ -24,7 +24,7 @@
import java.util.concurrent.TimeUnit;
/**
- *
+ *
* @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
*
*/
@@ -33,27 +33,27 @@
protected static final String CONNECT_COMMAND = "CONNECT";
protected static final String CONNECTED_COMMAND = "CONNECTED";
protected static final String DISCONNECT_COMMAND = "DISCONNECT";
-
+
protected static final String LOGIN_HEADER = "login";
protected static final String PASSCODE_HEADER = "passcode";
-
+
//ext
protected static final String CLIENT_ID_HEADER = "client-id";
-
-
+
+
protected String version;
protected String host;
protected int port;
protected String username;
protected String passcode;
protected StompFrameFactory factory;
- protected SocketChannel socketChannel;
+ protected final SocketChannel socketChannel;
protected ByteBuffer readBuffer;
-
+
protected List<Byte> receiveList;
-
+
protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<ClientStompFrame>();
-
+
protected boolean connected = false;
public AbstractStompClientConnection(String version, String host, int port) throws IOException
@@ -62,28 +62,27 @@
this.host = host;
this.port = port;
this.factory = StompFrameFactoryFactory.getFactory(version);
-
+ socketChannel = SocketChannel.open();
initSocket();
}
private void initSocket() throws IOException
{
- socketChannel = SocketChannel.open();
socketChannel.configureBlocking(true);
InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
socketChannel.connect(remoteAddr);
-
+
startReaderThread();
}
-
+
private void startReaderThread()
{
readBuffer = ByteBuffer.allocateDirect(10240);
receiveList = new ArrayList<Byte>(10240);
-
+
new ReaderThread().start();
}
-
+
public ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException
{
ClientStompFrame response = null;
@@ -92,12 +91,12 @@
{
socketChannel.write(buffer);
}
-
+
//now response
if (frame.needsReply())
{
response = receiveFrame();
-
+
//filter out server ping
while (response != null)
{
@@ -118,17 +117,17 @@
{
ClientStompFrame response = null;
ByteBuffer buffer = frame.toByteBufferWithExtra("\n");
-
+
while (buffer.remaining() > 0)
{
socketChannel.write(buffer);
}
-
+
//now response
if (frame.needsReply())
{
response = receiveFrame();
-
+
//filter out server ping
while (response != null)
{
@@ -154,7 +153,7 @@
{
return frameQueue.poll(timeout, TimeUnit.MILLISECONDS);
}
-
+
//put bytes to byte array.
private void receiveBytes(int n) throws UnsupportedEncodingException
{
@@ -174,7 +173,7 @@
frameBytes[j] = receiveList.get(j);
}
ClientStompFrame frame = factory.createFrame(new String(frameBytes, "UTF-8"));
-
+
if (validateFrame(frame))
{
frameQueue.offer(frame);
@@ -202,7 +201,7 @@
//clear readbuffer
readBuffer.rewind();
}
-
+
protected void incrementServerPing()
{
}
@@ -220,20 +219,21 @@
}
return true;
}
-
+
protected void close() throws IOException
{
socketChannel.close();
}
-
+
private class ReaderThread extends Thread
{
+ @Override
public void run()
{
try
{
int n = socketChannel.read(readBuffer);
-
+
while (n >= 0)
{
if (n > 0)
@@ -244,14 +244,14 @@
}
//peer closed
close();
-
- }
+
+ }
catch (IOException e)
{
try
{
close();
- }
+ }
catch (IOException e1)
{
//ignore
@@ -264,7 +264,7 @@
{
connect(null, null);
}
-
+
public void destroy()
{
try
@@ -279,22 +279,22 @@
this.connected = false;
}
}
-
+
public void connect(String username, String password) throws Exception
{
throw new RuntimeException("connect method not implemented!");
}
-
+
public boolean isConnected()
{
return connected;
}
-
+
public String getVersion()
{
return version;
}
-
+
public int getFrameQueueSize()
{
return this.frameQueue.size();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java 2012-02-24 16:04:09 UTC (rev 12191)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java 2012-02-24 16:04:38 UTC (rev 12192)
@@ -39,67 +39,75 @@
import org.hornetq.tests.integration.stomp.util.StompClientConnectionV11;
/*
- *
+ *
*/
public class StompV11Test extends StompTestBase2
{
private static final transient Logger log = Logger.getLogger(StompV11Test.class);
-
+
private StompClientConnection connV11;
-
+
+ @Override
protected void setUp() throws Exception
{
super.setUp();
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
}
-
+
+ @Override
protected void tearDown() throws Exception
{
- System.out.println("Connection 11 : " + connV11.isConnected());
- if (connV11.isConnected())
+ try
{
- connV11.disconnect();
+ log.debug("Connection 11 : " + connV11.isConnected());
+ if (connV11.isConnected())
+ {
+ connV11.disconnect();
+ }
}
- super.tearDown();
+ finally
+ {
+ super.tearDown();
+ }
}
-
+
public void testConnection() throws Exception
{
StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
-
+
connection.connect(defUser, defPass);
-
+
assertTrue(connection.isConnected());
-
+
assertEquals("1.0", connection.getVersion());
-
+
connection.disconnect();
connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-
+
connection.connect(defUser, defPass);
-
+
assertTrue(connection.isConnected());
-
+
assertEquals("1.1", connection.getVersion());
-
+
connection.disconnect();
-
+
connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-
+
connection.connect();
-
+
assertFalse(connection.isConnected());
-
+
//new way of connection
StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
conn.connect1(defUser, defPass);
-
+
assertTrue(conn.isConnected());
-
+
conn.disconnect();
}
-
+
public void testNegotiation() throws Exception
{
// case 1 accept-version absent. It is a 1.0 connect
@@ -107,11 +115,11 @@
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
-
+
ClientStompFrame reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
-
+
//reply headers: version, session, server
assertEquals(null, reply.getHeader("version"));
@@ -124,14 +132,14 @@
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
-
+
reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
-
+
//reply headers: version, session, server
assertEquals("1.0", reply.getHeader("version"));
-
+
connV11.disconnect();
// case 3 accept-version=1.1, result: 1.1
@@ -141,14 +149,14 @@
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
-
+
reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
-
+
//reply headers: version, session, server
assertEquals("1.1", reply.getHeader("version"));
-
+
connV11.disconnect();
// case 4 accept-version=1.0,1.1,1.2, result 1.1
@@ -158,14 +166,14 @@
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
-
+
reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
-
+
//reply headers: version, session, server
assertEquals("1.1", reply.getHeader("version"));
-
+
connV11.disconnect();
// case 5 accept-version=1.2, result error
@@ -175,15 +183,15 @@
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
-
+
reply = connV11.sendFrame(frame);
-
+
assertEquals("ERROR", reply.getCommand());
-
+
System.out.println("Got error frame " + reply);
-
+
}
-
+
public void testSendAndReceive() throws Exception
{
connV11.connect(defUser, defPass);
@@ -191,56 +199,56 @@
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("content-type", "text/plain");
frame.setBody("Hello World 1!");
-
+
ClientStompFrame response = connV11.sendFrame(frame);
-
+
assertNull(response);
-
+
frame.addHeader("receipt", "1234");
frame.setBody("Hello World 2!");
-
+
response = connV11.sendFrame(frame);
-
+
assertNotNull(response);
-
+
assertEquals("RECEIPT", response.getCommand());
-
+
assertEquals("1234", response.getHeader("receipt-id"));
-
+
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
-
+
ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", "a-sub");
subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", "auto");
-
+
newConn.sendFrame(subFrame);
-
+
frame = newConn.receiveFrame();
-
+
System.out.println("received " + frame);
-
+
assertEquals("MESSAGE", frame.getCommand());
-
+
assertEquals("a-sub", frame.getHeader("subscription"));
-
+
assertNotNull(frame.getHeader("message-id"));
-
+
assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
-
+
assertEquals("Hello World 1!", frame.getBody());
-
+
frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
+
+ System.out.println("received " + frame);
+
//unsub
ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub");
newConn.sendFrame(unsubFrame);
-
+
newConn.disconnect();
}
@@ -251,32 +259,32 @@
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("content-type", "application/xml");
frame.setBody("Hello World 1!");
-
+
connV11.sendFrame(frame);
-
+
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
-
+
ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", "a-sub");
subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", "auto");
-
+
newConn.sendFrame(subFrame);
-
+
frame = newConn.receiveFrame();
-
+
System.out.println("received " + frame);
-
+
assertEquals("MESSAGE", frame.getCommand());
-
+
assertEquals("application/xml", frame.getHeader("content-type"));
-
+
//unsub
ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub");
-
+
newConn.disconnect();
}
@@ -284,40 +292,40 @@
{
connV11.connect(defUser, defPass);
ClientStompFrame frame = connV11.createFrame("SEND");
-
+
String body = "Hello World 1!";
String cLen = String.valueOf(body.getBytes("UTF-8").length);
-
+
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("content-type", "application/xml");
frame.addHeader("content-length", cLen);
frame.setBody(body + "extra");
-
+
connV11.sendFrame(frame);
-
+
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
-
+
ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", "a-sub");
subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", "auto");
-
+
newConn.sendFrame(subFrame);
-
+
frame = newConn.receiveFrame();
-
+
System.out.println("received " + frame);
-
+
assertEquals("MESSAGE", frame.getCommand());
-
+
assertEquals(cLen, frame.getHeader("content-length"));
-
+
//unsub
ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub");
-
+
newConn.disconnect();
}
@@ -325,51 +333,51 @@
{
connV11.connect(defUser, defPass);
ClientStompFrame frame = connV11.createFrame("SEND");
-
+
String body = "Hello World 1!";
String cLen = String.valueOf(body.getBytes("UTF-8").length);
-
+
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("content-type", "application/xml");
frame.addHeader("content-length", cLen);
String hKey = "special-header\\\\\\n\\:";
String hVal = "\\:\\\\\\ngood";
frame.addHeader(hKey, hVal);
-
+
System.out.println("key: |" + hKey + "| val: |" + hVal);
-
+
frame.setBody(body);
-
+
connV11.sendFrame(frame);
-
+
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
-
+
ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
subFrame.addHeader("id", "a-sub");
subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
subFrame.addHeader("ack", "auto");
-
+
newConn.sendFrame(subFrame);
-
+
frame = newConn.receiveFrame();
-
+
System.out.println("received " + frame);
-
+
assertEquals("MESSAGE", frame.getCommand());
-
+
String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
-
+
assertEquals(":" + "\\" + "\n" + "good", value);
-
+
//unsub
ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
unsubFrame.addHeader("id", "a-sub");
-
+
newConn.disconnect();
}
-
+
public void testHeartBeat() throws Exception
{
//no heart beat at all if heart-beat absent
@@ -377,17 +385,17 @@
frame.addHeader("host", "127.0.0.1");
frame.addHeader("login", this.defUser);
frame.addHeader("passcode", this.defPass);
-
+
ClientStompFrame reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
-
+
Thread.sleep(5000);
-
+
assertEquals(0, connV11.getFrameQueueSize());
-
+
connV11.disconnect();
-
+
//no heart beat for (0,0)
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
frame = connV11.createFrame("CONNECT");
@@ -396,17 +404,17 @@
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "0,0");
frame.addHeader("accept-version", "1.0,1.1");
-
+
reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
-
+
assertEquals("0,0", reply.getHeader("heart-beat"));
-
+
Thread.sleep(5000);
-
+
assertEquals(0, connV11.getFrameQueueSize());
-
+
connV11.disconnect();
//heart-beat (1,0), should receive a min client ping accepted by server
@@ -417,15 +425,15 @@
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "1,0");
frame.addHeader("accept-version", "1.0,1.1");
-
+
reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
-
+
assertEquals("0,500", reply.getHeader("heart-beat"));
-
+
Thread.sleep(2000);
-
+
//now server side should be disconnected because we didn't send ping for 2 sec
frame = connV11.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
@@ -442,7 +450,7 @@
{
//ignore
}
-
+
//heart-beat (1,0), start a ping, then send a message, should be ok.
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
frame = connV11.createFrame("CONNECT");
@@ -451,19 +459,19 @@
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "1,0");
frame.addHeader("accept-version", "1.0,1.1");
-
+
reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
-
+
assertEquals("0,500", reply.getHeader("heart-beat"));
-
+
System.out.println("========== start pinger!");
-
+
connV11.startPinger(500);
-
+
Thread.sleep(2000);
-
+
//now server side should be disconnected because we didn't send ping for 2 sec
frame = connV11.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
@@ -472,13 +480,13 @@
//send will be ok
connV11.sendFrame(frame);
-
+
connV11.stopPinger();
-
+
connV11.disconnect();
}
-
+
//server ping
public void testHeartBeat2() throws Exception
{
@@ -489,14 +497,14 @@
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "1,1");
frame.addHeader("accept-version", "1.0,1.1");
-
+
ClientStompFrame reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
assertEquals("500,500", reply.getHeader("heart-beat"));
connV11.disconnect();
-
+
//heart-beat (500,1000)
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
frame = connV11.createFrame("CONNECT");
@@ -505,26 +513,26 @@
frame.addHeader("passcode", this.defPass);
frame.addHeader("heart-beat", "500,1000");
frame.addHeader("accept-version", "1.0,1.1");
-
+
reply = connV11.sendFrame(frame);
-
+
assertEquals("CONNECTED", reply.getCommand());
-
+
assertEquals("1000,500", reply.getHeader("heart-beat"));
-
+
System.out.println("========== start pinger!");
-
+
connV11.startPinger(500);
-
+
Thread.sleep(10000);
-
+
//now check the frame size
int size = connV11.getServerPingNumber();
-
+
System.out.println("ping received: " + size);
-
+
assertTrue(size > 5);
-
+
//now server side should be disconnected because we didn't send ping for 2 sec
frame = connV11.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
@@ -533,10 +541,10 @@
//send will be ok
connV11.sendFrame(frame);
-
+
connV11.disconnect();
}
-
+
public void testSendWithHeartBeatsAndReceive() throws Exception
{
StompClientConnection newConn = null;
@@ -601,21 +609,21 @@
connV11.disconnect();
}
}
-
+
public void testSendAndReceiveWithHeartBeats() throws Exception
{
connV11.connect(defUser, defPass);
ClientStompFrame frame = connV11.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("content-type", "text/plain");
-
+
for (int i = 0; i < 10; i++)
{
frame.setBody("Hello World " + i + "!");
connV11.sendFrame(frame);
Thread.sleep(500);
}
-
+
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
try
@@ -750,13 +758,13 @@
ClientStompFrame frame = connV11.receiveFrame();
String messageID = frame.getHeader("message-id");
-
+
System.out.println("Received message with id " + messageID);
-
+
nack(connV11, "sub1", messageID);
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
//Nack makes the message be dropped.
@@ -776,17 +784,17 @@
ClientStompFrame frame = connV11.receiveFrame();
String messageID = frame.getHeader("message-id");
-
+
System.out.println("Received message with id " + messageID);
-
+
nack(connV11, "sub2", messageID);
-
+
ClientStompFrame error = connV11.receiveFrame();
-
+
System.out.println("Receiver error: " + error);
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
//message should be still there
@@ -806,17 +814,17 @@
ClientStompFrame frame = connV11.receiveFrame();
String messageID = frame.getHeader("message-id");
-
+
System.out.println("Received message with id " + messageID);
-
+
nack(connV11, "sub2", "someother");
-
+
ClientStompFrame error = connV11.receiveFrame();
-
+
System.out.println("Receiver error: " + error);
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
//message should still be there
@@ -824,8 +832,8 @@
Message message = consumer.receive(1000);
Assert.assertNotNull(message);
}
-
-
+
+
public void testAck() throws Exception
{
connV11.connect(defUser, defPass);
@@ -837,13 +845,13 @@
ClientStompFrame frame = connV11.receiveFrame();
String messageID = frame.getHeader("message-id");
-
+
System.out.println("Received message with id " + messageID);
-
+
ack(connV11, "sub1", messageID, null);
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
//Nack makes the message be dropped.
@@ -863,17 +871,17 @@
ClientStompFrame frame = connV11.receiveFrame();
String messageID = frame.getHeader("message-id");
-
+
System.out.println("Received message with id " + messageID);
-
+
ack(connV11, "sub2", messageID, null);
-
+
ClientStompFrame error = connV11.receiveFrame();
-
+
System.out.println("Receiver error: " + error);
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
//message should be still there
@@ -893,17 +901,17 @@
ClientStompFrame frame = connV11.receiveFrame();
String messageID = frame.getHeader("message-id");
-
+
System.out.println("Received message with id " + messageID);
-
+
ack(connV11, "sub2", "someother", null);
-
+
ClientStompFrame error = connV11.receiveFrame();
-
+
System.out.println("Receiver error: " + error);
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
//message should still there
@@ -911,7 +919,7 @@
Message message = consumer.receive(1000);
Assert.assertNotNull(message);
}
-
+
public void testErrorWithReceipt() throws Exception
{
connV11.connect(defUser, defPass);
@@ -923,33 +931,33 @@
ClientStompFrame frame = connV11.receiveFrame();
String messageID = frame.getHeader("message-id");
-
+
System.out.println("Received message with id " + messageID);
-
+
ClientStompFrame ackFrame = connV11.createFrame("ACK");
//give it a wrong sub id
ackFrame.addHeader("subscription", "sub2");
ackFrame.addHeader("message-id", messageID);
ackFrame.addHeader("receipt", "answer-me");
-
+
ClientStompFrame error = connV11.sendFrame(ackFrame);
-
+
System.out.println("Receiver error: " + error);
-
+
assertEquals("ERROR", error.getCommand());
-
+
assertEquals("answer-me", error.getHeader("receipt-id"));
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
//message should still there
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
+ Assert.assertNotNull(message);
}
-
+
public void testErrorWithReceipt2() throws Exception
{
connV11.connect(defUser, defPass);
@@ -961,82 +969,82 @@
ClientStompFrame frame = connV11.receiveFrame();
String messageID = frame.getHeader("message-id");
-
+
System.out.println("Received message with id " + messageID);
-
+
ClientStompFrame ackFrame = connV11.createFrame("ACK");
//give it a wrong sub id
ackFrame.addHeader("subscription", "sub1");
ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1));
ackFrame.addHeader("receipt", "answer-me");
-
+
ClientStompFrame error = connV11.sendFrame(ackFrame);
-
+
System.out.println("Receiver error: " + error);
-
+
assertEquals("ERROR", error.getCommand());
-
+
assertEquals("answer-me", error.getHeader("receipt-id"));
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
//message should still there
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
+ Assert.assertNotNull(message);
}
-
+
public void testAckModeClient() throws Exception
{
connV11.connect(defUser, defPass);
subscribe(connV11, "sub1", "client");
-
+
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++)
{
this.sendMessage("client-ack" + i);
}
-
+
ClientStompFrame frame = null;
-
+
for (int i = 0; i < num; i++)
{
frame = connV11.receiveFrame();
assertNotNull(frame);
}
-
+
//ack the last
this.ack(connV11, "sub1", frame);
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
-
+
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
Assert.assertNull(message);
}
-
+
public void testAckModeClient2() throws Exception
{
connV11.connect(defUser, defPass);
subscribe(connV11, "sub1", "client");
-
+
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++)
{
this.sendMessage("client-ack" + i);
}
-
+
ClientStompFrame frame = null;
-
+
for (int i = 0; i < num; i++)
{
frame = connV11.receiveFrame();
@@ -1048,11 +1056,11 @@
this.ack(connV11, "sub1", frame);
}
}
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
-
+
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
@@ -1060,58 +1068,58 @@
message = consumer.receive(1000);
Assert.assertNull(message);
}
-
+
public void testAckModeAuto() throws Exception
{
connV11.connect(defUser, defPass);
subscribe(connV11, "sub1", "auto");
-
+
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++)
{
this.sendMessage("auto-ack" + i);
}
-
+
ClientStompFrame frame = null;
-
+
for (int i = 0; i < num; i++)
{
frame = connV11.receiveFrame();
assertNotNull(frame);
}
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
-
+
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
Assert.assertNull(message);
}
-
+
public void testAckModeClientIndividual() throws Exception
{
connV11.connect(defUser, defPass);
subscribe(connV11, "sub1", "client-individual");
-
+
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++)
{
this.sendMessage("client-individual-ack" + i);
}
-
+
ClientStompFrame frame = null;
-
+
for (int i = 0; i < num; i++)
{
frame = connV11.receiveFrame();
assertNotNull(frame);
-
+
System.out.println(i + " == received: " + frame);
//ack on even numbers
if (i%2 == 0)
@@ -1119,14 +1127,14 @@
this.ack(connV11, "sub1", frame);
}
}
-
+
unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
-
+
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
-
+
TextMessage message = null;
for (int i = 0; i < num/2; i++)
{
@@ -1134,85 +1142,85 @@
Assert.assertNotNull(message);
System.out.println("Legal: " + message.getText());
}
-
+
message = (TextMessage) consumer.receive(1000);
-
+
Assert.assertNull(message);
}
-
+
public void testTwoSubscribers() throws Exception
{
connV11.connect(defUser, defPass, "myclientid");
this.subscribeTopic(connV11, "sub1", "auto", null);
-
+
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass, "myclientid2");
-
+
this.subscribeTopic(newConn, "sub2", "auto", null);
ClientStompFrame frame = connV11.createFrame("SEND");
frame.addHeader("destination", getTopicPrefix() + getTopicName());
-
+
frame.setBody("Hello World");
-
+
connV11.sendFrame(frame);
-
+
// receive message from socket
frame = connV11.receiveFrame(1000);
-
+
System.out.println("received frame : " + frame);
assertEquals("Hello World", frame.getBody());
assertEquals("sub1", frame.getHeader("subscription"));
-
+
frame = newConn.receiveFrame(1000);
-
+
System.out.println("received 2 frame : " + frame);
assertEquals("Hello World", frame.getBody());
assertEquals("sub2", frame.getHeader("subscription"));
-
+
// remove subscription
this.unsubscribe(connV11, "sub1", true);
this.unsubscribe(newConn, "sub2", true);
-
+
connV11.disconnect();
newConn.disconnect();
}
-
+
public void testSendAndReceiveOnDifferentConnections() throws Exception
{
connV11.connect(defUser, defPass);
-
+
ClientStompFrame sendFrame = connV11.createFrame("SEND");
sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
sendFrame.setBody("Hello World");
connV11.sendFrame(sendFrame);
-
+
StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
connV11_2.connect(defUser, defPass);
-
+
this.subscribe(connV11_2, "sub1", "auto");
-
+
ClientStompFrame frame = connV11_2.receiveFrame(2000);
-
+
assertEquals("MESSAGE", frame.getCommand());
assertEquals("Hello World", frame.getBody());
-
+
connV11.disconnect();
connV11_2.disconnect();
}
//----------------Note: tests below are adapted from StompTest
-
+
public void testBeginSameTransactionTwice() throws Exception
{
connV11.connect(defUser, defPass);
beginTransaction(connV11, "tx1");
-
+
beginTransaction(connV11, "tx1");
-
+
ClientStompFrame f = connV11.receiveFrame();
Assert.assertTrue(f.getCommand().equals("ERROR"));
}
@@ -1232,10 +1240,10 @@
Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
Assert.assertNotNull(frame.getHeader("destination"));
Assert.assertTrue(frame.getBody().equals(text));
-
+
connV11.disconnect();
}
-
+
public void testClientAckNotPartOfTransaction() throws Exception
{
connV11.connect(defUser, defPass);
@@ -1260,7 +1268,7 @@
abortTransaction(connV11, "tx1");
frame = connV11.receiveFrame();
-
+
assertNull(frame);
this.unsubscribe(connV11, getName());
@@ -1276,9 +1284,9 @@
ClientStompFrame frame = connV11.createFrame("DISCONNECT");
frame.addHeader("receipt", "1");
-
+
ClientStompFrame result = connV11.sendFrame(frame);
-
+
if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
{
fail("Disconnect failed! " + result);
@@ -1288,7 +1296,7 @@
ClientStompFrame sendFrame = connV11.createFrame("SEND");
sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
sendFrame.setBody("Hello World");
-
+
try
{
connV11.sendFrame(sendFrame);
@@ -1298,8 +1306,10 @@
{
//ok.
}
-
- connV11.destroy();
+ finally
+ {
+ connV11.destroy();
+ }
}
public void testDurableSubscriber() throws Exception
@@ -1324,9 +1334,9 @@
ClientStompFrame frame = connV11.createFrame("DISCONNECT");
frame.addHeader("receipt", "1");
-
+
ClientStompFrame result = connV11.sendFrame(frame);
-
+
if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
{
fail("Disconnect failed! " + result);
@@ -1349,7 +1359,7 @@
Assert.assertEquals(getName(), frame.getBody());
this.unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
}
@@ -1363,7 +1373,7 @@
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("JMSXGroupID", "TEST");
frame.setBody("Hello World");
-
+
connV11.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(1000);
@@ -1379,7 +1389,7 @@
String[] data = new String[ctr];
connV11.connect(defUser, defPass);
-
+
this.subscribe(connV11, "sub1", "auto");
for (int i = 0; i < ctr; ++i)
@@ -1389,7 +1399,7 @@
}
ClientStompFrame frame = null;
-
+
for (int i = 0; i < ctr; ++i)
{
frame = connV11.receiveFrame();
@@ -1414,7 +1424,7 @@
public void testSubscribeWithAutoAckAndSelector() throws Exception
{
connV11.connect(defUser, defPass);
-
+
this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'");
sendMessage("Ignored message", "foo", "1234");
@@ -1436,7 +1446,7 @@
sendMessage(getName());
ClientStompFrame frame = connV11.receiveFrame();
-
+
assertTrue(frame.getCommand().equals("MESSAGE"));
connV11.disconnect();
@@ -1474,7 +1484,7 @@
}
assertTrue(latch.await(60, TimeUnit.SECONDS));
-
+
connV11.disconnect();
}
@@ -1487,7 +1497,7 @@
ClientStompFrame frame = connV11.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.setBody("Hello World");
-
+
connV11.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(1000);
@@ -1510,16 +1520,16 @@
connV11.connect(defUser, defPass);
byte[] data = new byte[] { 1, 0, 0, 4 };
-
+
ClientStompFrame frame = connV11.createFrame("SEND");
-
+
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.setBody(new String(data, "UTF-8"));
-
+
frame.addHeader("content-length", String.valueOf(data.length));
connV11.sendFrame(frame);
-
+
BytesMessage message = (BytesMessage)consumer.receive(10000);
Assert.assertNotNull(message);
@@ -1539,10 +1549,10 @@
ClientStompFrame frame = connV11.createFrame("SEND");
frame.addHeader("foo", "abc");
frame.addHeader("bar", "123");
-
+
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.setBody("Hello World");
-
+
connV11.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(1000);
@@ -1551,13 +1561,13 @@
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
}
-
+
public void testSendMessageWithLeadingNewLine() throws Exception
{
MessageConsumer consumer = session.createConsumer(queue);
connV11.connect(defUser, defPass);
-
+
ClientStompFrame frame = connV11.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
@@ -1574,9 +1584,9 @@
long tnow = System.currentTimeMillis();
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
-
+
assertNull(consumer.receive(1000));
-
+
connV11.disconnect();
}
@@ -1592,7 +1602,7 @@
frame.setBody("Hello World");
frame = connV11.sendFrame(frame);
-
+
assertTrue(frame.getCommand().equals("RECEIPT"));
assertEquals("1234", frame.getHeader("receipt-id"));
@@ -1605,7 +1615,7 @@
long tnow = System.currentTimeMillis();
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
-
+
connV11.disconnect();
}
@@ -1640,18 +1650,18 @@
Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
-
+
connV11.disconnect();
}
public void testSubscribeToTopic() throws Exception
{
connV11.connect(defUser, defPass);
-
+
this.subscribeTopic(connV11, "sub1", null, null, true);
sendMessage(getName(), topic);
-
+
ClientStompFrame frame = connV11.receiveFrame();
Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
@@ -1683,7 +1693,7 @@
connV11.sendFrame(frame);
frame = connV11.receiveFrame(2000);
-
+
assertNull(frame);
// send message on another JMS connection => it should be received
@@ -1694,9 +1704,9 @@
Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
Assert.assertTrue(frame.getBody().equals(getName()));
-
+
this.unsubscribe(connV11, "sub1");
-
+
connV11.disconnect();
}
@@ -1715,7 +1725,7 @@
Assert.assertEquals(getName(), frame.getBody());
connV11.disconnect();
-
+
// message should not be received as it was auto-acked
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
@@ -1725,7 +1735,7 @@
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
{
connV11.connect(defUser, defPass);
-
+
this.subscribe(connV11, "sub1", "auto");
byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
@@ -1734,22 +1744,22 @@
ClientStompFrame frame = connV11.receiveFrame();
assertEquals("MESSAGE", frame.getCommand());
-
+
System.out.println("Message: " + frame);
assertEquals("5", frame.getHeader("content-length"));
assertEquals(null, frame.getHeader("type"));
-
+
assertEquals(frame.getBody(), new String(payload, "UTF-8"));
-
+
connV11.disconnect();
}
public void testSubscribeWithClientAck() throws Exception
{
connV11.connect(defUser, defPass);
-
+
this.subscribe(connV11, "sub1", "client");
sendMessage(getName());
@@ -1779,7 +1789,7 @@
public void testSubscribeWithID() throws Exception
{
connV11.connect(defUser, defPass);
-
+
this.subscribe(connV11, "mysubid", "auto");
sendMessage(getName());
@@ -1794,7 +1804,7 @@
public void testSubscribeWithMessageSentWithProperties() throws Exception
{
connV11.connect(defUser, defPass);
-
+
this.subscribe(connV11, "sub1", "auto");
MessageProducer producer = session.createProducer(queue);
@@ -1812,7 +1822,7 @@
ClientStompFrame frame = connV11.receiveFrame();
Assert.assertNotNull(frame);
-
+
Assert.assertTrue(frame.getHeader("S") != null);
Assert.assertTrue(frame.getHeader("n") != null);
Assert.assertTrue(frame.getHeader("byte") != null);
@@ -1853,16 +1863,16 @@
frame = connV11.createFrame("SEND");
frame.addHeader("destination", getQueuePrefix() + getQueueName());
frame.addHeader("transaction", "tx1");
-
+
frame.setBody("Hello World");
-
+
connV11.sendFrame(frame);
this.commitTransaction(connV11, "tx1");
message = consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
-
+
connV11.disconnect();
}
@@ -1881,17 +1891,17 @@
frame.setBody("Hello World");
frame = connV11.sendFrame(frame);
-
+
assertEquals("123", frame.getHeader("receipt-id"));
-
+
// check the message is not committed
assertNull(consumer.receive(100));
-
+
this.commitTransaction(connV11, "tx1", true);
Message message = consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
-
+
connV11.disconnect();
}
@@ -1900,7 +1910,7 @@
MessageConsumer consumer = session.createConsumer(queue);
connV11.connect(defUser, defPass);
-
+
this.beginTransaction(connV11, "tx1");
ClientStompFrame frame = connV11.createFrame("SEND");
@@ -1908,7 +1918,7 @@
frame.addHeader("transaction", "tx1");
frame.setBody("first message");
-
+
connV11.sendFrame(frame);
// rollback first message
@@ -1921,7 +1931,7 @@
frame.addHeader("transaction", "tx1");
frame.setBody("second message");
-
+
connV11.sendFrame(frame);
this.commitTransaction(connV11, "tx1", true);
@@ -1930,7 +1940,7 @@
TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("second message", message.getText());
-
+
connV11.disconnect();
}
@@ -1945,7 +1955,7 @@
// receive message from socket
ClientStompFrame frame = connV11.receiveFrame();
-
+
Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
// remove suscription
@@ -1956,7 +1966,7 @@
frame = connV11.receiveFrame(1000);
assertNull(frame);
-
+
connV11.disconnect();
}
@@ -1974,7 +1984,7 @@
{
ClientStompFrame beginFrame = conn.createFrame("BEGIN");
beginFrame.addHeader("transaction", txID);
-
+
conn.sendFrame(beginFrame);
}
@@ -2002,12 +2012,12 @@
ClientStompFrame frame) throws IOException, InterruptedException
{
String messageID = frame.getHeader("message-id");
-
+
ClientStompFrame ackFrame = conn.createFrame("ACK");
ackFrame.addHeader("subscription", subId);
ackFrame.addHeader("message-id", messageID);
-
+
ClientStompFrame response = conn.sendFrame(ackFrame);
if (response != null)
{
@@ -2024,7 +2034,7 @@
{
ackFrame.addHeader("transaction", txID);
}
-
+
conn.sendFrame(ackFrame);
}
@@ -2033,10 +2043,10 @@
ClientStompFrame ackFrame = conn.createFrame("NACK");
ackFrame.addHeader("subscription", subId);
ackFrame.addHeader("message-id", mid);
-
+
conn.sendFrame(ackFrame);
}
-
+
private void subscribe(StompClientConnection conn, String subId, String ack) throws IOException, InterruptedException
{
subscribe(conn, subId, ack, null, null);
@@ -2060,7 +2070,7 @@
{
subscribe(conn, subId, ack, durableId, selector, false);
}
-
+
private void subscribe(StompClientConnection conn, String subId,
String ack, String durableId, String selector, boolean receipt) throws IOException, InterruptedException
{
@@ -2083,9 +2093,9 @@
{
subFrame.addHeader("receipt", "1234");
}
-
+
subFrame = conn.sendFrame(subFrame);
-
+
if (receipt)
{
assertEquals("1234", subFrame.getHeader("receipt-id"));
@@ -2126,9 +2136,9 @@
{
subFrame.addHeader("no-local", "true");
}
-
+
ClientStompFrame frame = conn.sendFrame(subFrame);
-
+
if (receipt)
{
assertTrue(frame.getHeader("receipt-id").equals("1234"));
@@ -2139,23 +2149,23 @@
{
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
subFrame.addHeader("id", subId);
-
+
conn.sendFrame(subFrame);
}
-
+
private void unsubscribe(StompClientConnection conn, String subId,
boolean receipt) throws IOException, InterruptedException
{
ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
subFrame.addHeader("id", subId);
-
+
if (receipt)
{
subFrame.addHeader("receipt", "4321");
}
-
+
ClientStompFrame f = conn.sendFrame(subFrame);
-
+
if (receipt)
{
System.out.println("response: " + f);
@@ -2171,7 +2181,7 @@
this.subscribe(connV11, "sub1", "client");
sendMessage(getName());
-
+
ClientStompFrame frame = connV11.receiveFrame();
Assert.assertEquals("MESSAGE", frame.getCommand());
@@ -2203,7 +2213,7 @@
connV11.destroy();
connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
connV11.connect(defUser, defPass);
-
+
this.subscribe(connV11, "sub1", null, null, true);
sendMessage("shouldBeNextMessage");
More information about the hornetq-commits
mailing list