JBoss hornetq SVN: r11882 - branches/Branch_2_2_EAP/src/config.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 13:16:25 -0500 (Thu, 08 Dec 2011)
New Revision: 11882
Modified:
branches/Branch_2_2_EAP/src/config/ra.xml
Log:
just changing the version name
Modified: branches/Branch_2_2_EAP/src/config/ra.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/ra.xml 2011-12-08 18:04:04 UTC (rev 11881)
+++ branches/Branch_2_2_EAP/src/config/ra.xml 2011-12-08 18:16:25 UTC (rev 11882)
@@ -8,8 +8,8 @@
http://java.sun.com/xml/ns/j2ee/connector_1_5.xsd"
version="1.5">
- <description>HornetQ 2.0 Resource Adapter</description>
- <display-name>HornetQ 2.0 Resource Adapter</display-name>
+ <description>HornetQ 2.2 Resource Adapter</description>
+ <display-name>HornetQ 2.2 Resource Adapter</display-name>
<vendor-name>Red Hat Middleware LLC</vendor-name>
<eis-type>JMS 1.1 Server</eis-type>
12 years, 5 months
JBoss hornetq SVN: r11881 - branches/Branch_2_2_EAP/src/config.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 13:04:04 -0500 (Thu, 08 Dec 2011)
New Revision: 11881
Modified:
branches/Branch_2_2_EAP/src/config/ra.xml
Log:
JBPAPP-5791 - changing property-types
Modified: branches/Branch_2_2_EAP/src/config/ra.xml
===================================================================
--- branches/Branch_2_2_EAP/src/config/ra.xml 2011-12-08 16:43:37 UTC (rev 11880)
+++ branches/Branch_2_2_EAP/src/config/ra.xml 2011-12-08 18:04:04 UTC (rev 11881)
@@ -246,19 +246,19 @@
<config-property>
<description>whether to use jndi for looking up destinations etc</description>
<config-property-name>UseJNDI</config-property-name>
- <config-property-type>boolean</config-property-type>
+ <config-property-type>java.lang.Boolean</config-property-type>
<config-property-value></config-property-value>
</config-property>
<config-property>
<description>how long in milliseconds to wait before retry on failed MDB setup</description>
<config-property-name>SetupInterval</config-property-name>
- <config-property-type>long</config-property-type>
+ <config-property-type>java.lang.Long</config-property-type>
<config-property-value></config-property-value>
</config-property>
<config-property>
<description>How many attempts should be made when connecting the MDB</description>
<config-property-name>SetupAttempts</config-property-name>
- <config-property-type>int</config-property-type>
+ <config-property-type>java.lang.Integer</config-property-type>
<config-property-value></config-property-value>
</config-property>-->
12 years, 5 months
JBoss hornetq SVN: r11880 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/xa.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-08 11:43:37 -0500 (Thu, 08 Dec 2011)
New Revision: 11880
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/xa/XaTimeoutTest.java
Log:
Close all resources to avoid leaking threads.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2011-12-08 16:37:18 UTC (rev 11879)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2011-12-08 16:43:37 UTC (rev 11880)
@@ -29,7 +29,13 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -65,6 +71,8 @@
private final SimpleString atestq = new SimpleString("atestq");
+ private ServerLocator locator;
+
@Override
protected void setUp() throws Exception
{
@@ -76,11 +84,13 @@
configuration.setTransactionTimeoutScanPeriod(500);
TransportConfiguration transportConfig = new TransportConfiguration(UnitTestCase.INVM_ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = HornetQServers.newHornetQServer(configuration, false);
+ messagingService = addServer(HornetQServers.newHornetQServer(configuration, false));
// start the server
messagingService.start();
// then we create a client as normal
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ locator =
+ addServerLocator(HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
+ InVMConnectorFactory.class.getName())));
sessionFactory = locator.createSessionFactory();
clientSession = sessionFactory.createSession(true, false, false);
clientSession.createQueue(atestq, atestq, null, true);
@@ -102,17 +112,11 @@
//
}
}
- if (messagingService != null && messagingService.isStarted())
- {
- try
- {
- messagingService.stop();
- }
- catch (Exception e1)
- {
- //
- }
- }
+ closeSessionFactory(sessionFactory);
+ closeServerLocator(locator);
+
+ stopComponent(messagingService);
+
messagingService = null;
clientSession = null;
@@ -505,7 +509,7 @@
for(int i = 0; i < clientSessions.length/2; i++)
{
ClientMessage m = clientConsumer.receiveImmediate();
- Assert.assertNotNull(m);
+ Assert.assertNotNull(m);
}
ClientMessage m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
@@ -549,7 +553,7 @@
{
return Collections.emptySet();
}
-
+
public List<MessageReference> getRelatedMessageReferences()
{
return null;
12 years, 5 months
JBoss hornetq SVN: r11879 - in trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration: management and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-08 11:37:18 -0500 (Thu, 08 Dec 2011)
New Revision: 11879
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
Log:
Improve tearDown.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-12-08 12:41:42 UTC (rev 11878)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-12-08 16:37:18 UTC (rev 11879)
@@ -39,6 +39,15 @@
setupServers();
}
+ @Override
+ protected void tearDown() throws Exception
+ {
+ log.info("#test tearDown");
+ stopServers();
+
+ super.tearDown();
+ }
+
protected boolean isNetty()
{
return false;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementTestBase.java 2011-12-08 12:41:42 UTC (rev 11878)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementTestBase.java 2011-12-08 16:37:18 UTC (rev 11879)
@@ -16,11 +16,15 @@
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientRequestor;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.UnitTestCase;
@@ -71,7 +75,9 @@
protected void doSendManagementMessage(final String user, final String password, final boolean expectSuccess) throws Exception
{
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ ServerLocator locator =
+ addServerLocator(HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(
+ UnitTestCase.INVM_CONNECTOR_FACTORY)));
ClientSessionFactory sf = locator.createSessionFactory();
try
{
@@ -112,6 +118,10 @@
e.printStackTrace();
}
}
+ finally
+ {
+ sf.close();
+ }
}
// Private -------------------------------------------------------
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java 2011-12-08 12:41:42 UTC (rev 11878)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java 2011-12-08 16:37:18 UTC (rev 11879)
@@ -86,7 +86,7 @@
Configuration conf = createBasicConfig();
conf.setSecurityEnabled(true);
conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- HornetQServer server = HornetQServers.newHornetQServer(conf, false);
+ HornetQServer server = addServer(HornetQServers.newHornetQServer(conf, false));
server.start();
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
HornetQSecurityManagerImpl securityManager = (HornetQSecurityManagerImpl)server.getSecurityManager();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java 2011-12-08 12:41:42 UTC (rev 11878)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java 2011-12-08 16:37:18 UTC (rev 11879)
@@ -56,7 +56,7 @@
conf.setClusterPassword(ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD);
conf.setSecurityEnabled(true);
conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- HornetQServer server = HornetQServers.newHornetQServer(conf, false);
+ HornetQServer server = addServer(HornetQServers.newHornetQServer(conf, false));
server.start();
return server;
12 years, 5 months
JBoss hornetq SVN: r11878 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-08 07:41:42 -0500 (Thu, 08 Dec 2011)
New Revision: 11878
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HornetQCrashTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/server/PredefinedQueueTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompWebSocketTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/stomp/StompStressTest.java
Log:
Improve tests' tearDown
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-08 12:41:11 UTC (rev 11877)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-08 12:41:42 UTC (rev 11878)
@@ -683,8 +683,8 @@
finally
{
session.close();
- sf.close();
- locator.close();
+ closeSessionFactory(sf);
+ closeServerLocator(locator);
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HornetQCrashTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HornetQCrashTest.java 2011-12-08 12:41:11 UTC (rev 11877)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/HornetQCrashTest.java 2011-12-08 12:41:42 UTC (rev 11878)
@@ -25,9 +25,9 @@
import org.hornetq.tests.util.UnitTestCase;
/**
- *
+ *
* From https://jira.jboss.org/jira/browse/HORNETQ-144
- *
+ *
*/
public class HornetQCrashTest extends UnitTestCase
{
@@ -45,7 +45,7 @@
configuration.setSecurityEnabled(false);
configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- server = HornetQServers.newHornetQServer(configuration);
+ server = addServer(HornetQServers.newHornetQServer(configuration));
server.start();
@@ -105,6 +105,7 @@
new Thread()
{
+ @Override
public void run()
{
try
@@ -134,18 +135,7 @@
protected void setUp() throws Exception
{
super.setUp();
-
-
locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ addServerLocator(locator);
}
-
- @Override
- protected void tearDown() throws Exception
- {
- locator.close();
-
- super.tearDown();
-
- server = null;
- }
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/server/PredefinedQueueTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/server/PredefinedQueueTest.java 2011-12-08 12:41:11 UTC (rev 11877)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/server/PredefinedQueueTest.java 2011-12-08 12:41:42 UTC (rev 11878)
@@ -21,7 +21,13 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.logging.Logger;
@@ -31,11 +37,11 @@
import org.hornetq.tests.util.ServiceTestBase;
/**
- *
+ *
* A PredefinedQueueTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 19 Jan 2009 15:44:52
*
*
@@ -216,7 +222,7 @@
final String queueName3 = "queue3";
- HornetQServer server = HornetQServers.newHornetQServer(conf);
+ HornetQServer server = addServer(HornetQServers.newHornetQServer(conf));
server.start();
@@ -333,11 +339,11 @@
conf.setQueueConfigurations(queueConfs);
- HornetQServer server = HornetQServers.newHornetQServer(conf);
+ HornetQServer server = addServer(HornetQServers.newHornetQServer(conf));
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = locator.createSessionFactory();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompWebSocketTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompWebSocketTest.java 2011-12-08 12:41:11 UTC (rev 11877)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/StompWebSocketTest.java 2011-12-08 12:41:42 UTC (rev 11878)
@@ -40,7 +40,7 @@
private static final transient Logger log = Logger.getLogger(StompWebSocketTest.class);
private JMSServerManager server;
- /**
+ /**
* to test the Stomp over Web Sockets protocol,
* uncomment the sleep call and run the stomp-websockets Javascript test suite
* from http://github.com/jmesnil/stomp-websocket
@@ -51,14 +51,15 @@
// Implementation methods
//-------------------------------------------------------------------------
- protected void setUp() throws Exception {
+ @Override
+ protected void setUp() throws Exception {
server = createServer();
server.start();
}
/**
* @return
- * @throws Exception
+ * @throws Exception
*/
private JMSServerManager createServer() throws Exception
{
@@ -73,14 +74,15 @@
config.getAcceptorConfigurations().add(stompTransport);
config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
config.getQueueConfigurations().add(new CoreQueueConfiguration(getQueueName(), getQueueName(), null, false));
- HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
-
+ HornetQServer hornetQServer = addServer(HornetQServers.newHornetQServer(config));
+
JMSConfiguration jmsConfig = new JMSConfigurationImpl();
server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
server.setContext(null);
return server;
}
+ @Override
protected void tearDown() throws Exception {
server.stop();
}
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/stomp/StompStressTest.java
===================================================================
--- trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/stomp/StompStressTest.java 2011-12-08 12:41:11 UTC (rev 11877)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/stomp/StompStressTest.java 2011-12-08 12:41:42 UTC (rev 11878)
@@ -25,8 +25,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
@@ -38,17 +36,15 @@
public class StompStressTest extends UnitTestCase
{
- private static final transient Logger log = Logger.getLogger(StompStressTest.class);
-
private static final int COUNT = 1000;
- private int port = 61613;
+ private final int port = 61613;
private Socket stompSocket;
private ByteArrayOutputStream inputBuffer;
- private String destination = "stomp.stress.queue";
+ private final String destination = "stomp.stress.queue";
private HornetQServer server;
@@ -86,6 +82,7 @@
// Implementation methods
// -------------------------------------------------------------------------
+ @Override
protected void setUp() throws Exception
{
super.setUp();
@@ -110,17 +107,16 @@
config.getAcceptorConfigurations().add(stompTransport);
config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
config.getQueueConfigurations().add(new CoreQueueConfiguration(destination, destination, null, false));
- return HornetQServers.newHornetQServer(config);
+ return addServer(HornetQServers.newHornetQServer(config));
}
+ @Override
protected void tearDown() throws Exception
{
if (stompSocket != null)
{
stompSocket.close();
}
- server.stop();
-
super.tearDown();
}
@@ -133,9 +129,9 @@
{
byte[] bytes = data.getBytes("UTF-8");
OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < bytes.length; i++)
+ for (byte b : bytes)
{
- outputStream.write(bytes[i]);
+ outputStream.write(b);
}
outputStream.flush();
}
@@ -143,9 +139,9 @@
public void sendFrame(byte[] data) throws Exception
{
OutputStream outputStream = stompSocket.getOutputStream();
- for (int i = 0; i < data.length; i++)
+ for (byte element : data)
{
- outputStream.write(data[i]);
+ outputStream.write(element);
}
outputStream.flush();
}
12 years, 5 months
JBoss hornetq SVN: r11877 - trunk/hornetq-core/src/test/java/org/hornetq/tests/util.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-08 07:41:11 -0500 (Thu, 08 Dec 2011)
New Revision: 11877
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
Log:
Move closing of locators from ServiceT.B. to UnitTestCase
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-08 11:18:04 UTC (rev 11876)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-08 12:41:11 UTC (rev 11877)
@@ -80,7 +80,6 @@
protected static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
- private final Collection<ServerLocator> locators = new ArrayList<ServerLocator>();
private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<ClientSessionFactory>();
private final Collection<ClientSession> clientSessions = new ArrayList<ClientSession>();
@@ -106,21 +105,6 @@
/**
*
*/
- protected void closeAllServerLocatorsFactories()
- {
- synchronized (locators)
- {
- for (ServerLocator locator : locators)
- {
- closeServerLocator(locator);
- }
- locators.clear();
- }
- }
-
- /**
- *
- */
protected void closeAllSessionFactories()
{
synchronized (sessionFactories)
@@ -722,15 +706,6 @@
}
- protected ServerLocator addServerLocator(ServerLocator locator)
- {
- synchronized (locators)
- {
- locators.add(locator);
- }
- return locator;
- }
-
protected ServerLocator createInVMLocator(final int serverID)
{
TransportConfiguration tnspConfig = createInVMTransportConnectorConfig(serverID, UUIDGenerator.getInstance().generateStringUUID());
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-12-08 11:18:04 UTC (rev 11876)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/UnitTestCase.java 2011-12-08 12:41:11 UTC (rev 11877)
@@ -127,6 +127,7 @@
private static Set<Thread> alreadyFailedThread = new HashSet<Thread>();
private final Collection<HornetQServer> servers = new ArrayList<HornetQServer>();
+ private final Collection<ServerLocator> locators = new ArrayList<ServerLocator>();
private boolean checkThread = true;
@@ -1462,6 +1463,27 @@
}
}
+ protected final ServerLocator addServerLocator(ServerLocator locator)
+ {
+ synchronized (locators)
+ {
+ locators.add(locator);
+ }
+ return locator;
+ }
+
+ protected void closeAllServerLocatorsFactories()
+ {
+ synchronized (locators)
+ {
+ for (ServerLocator locator : locators)
+ {
+ closeServerLocator(locator);
+ }
+ locators.clear();
+ }
+ }
+
public static final void closeServerLocator(ServerLocator locator)
{
if (locator == null)
12 years, 5 months
JBoss hornetq SVN: r11876 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-08 06:18:04 -0500 (Thu, 08 Dec 2011)
New Revision: 11876
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
Keep track of created server, server locators etc.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-12-08 05:13:35 UTC (rev 11875)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-12-08 11:18:04 UTC (rev 11876)
@@ -1415,6 +1415,7 @@
locators[node].setBlockOnNonDurableSend(true);
locators[node].setBlockOnDurableSend(true);
+ addServerLocator(locators[node]);
ClientSessionFactory sf = createSessionFactory(locators[node]);
ClientSession session = sf.createSession();
@@ -1443,11 +1444,11 @@
}
locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
-
locators[node].setBlockOnNonDurableSend(true);
locators[node].setBlockOnDurableSend(true);
locators[node].setReconnectAttempts(reconnectAttempts);
- ClientSessionFactory sf = locators[node].createSessionFactory();
+ addServerLocator(locators[node]);
+ ClientSessionFactory sf = createSessionFactory(locators[node]);
sfs[node] = sf;
}
@@ -1478,8 +1479,8 @@
locators[node].setReconnectAttempts(-1);
locators[node].setBlockOnNonDurableSend(blocking);
locators[node].setBlockOnDurableSend(blocking);
-
- ClientSessionFactory sf = locators[node].createSessionFactory();
+ addServerLocator(locators[node]);
+ ClientSessionFactory sf = createSessionFactory(locators[node]);
sfs[node] = sf;
}
@@ -1534,7 +1535,7 @@
}
else
{
- server = HornetQServers.newHornetQServer(configuration);
+ server = addServer(HornetQServers.newHornetQServer(configuration));
}
}
else
@@ -1545,7 +1546,7 @@
}
else
{
- server = HornetQServers.newHornetQServer(configuration, false);
+ server = addServer(HornetQServers.newHornetQServer(configuration, false));
}
}
@@ -1592,7 +1593,7 @@
else
{
boolean enablePersistency = fileStorage ? true : configuration.isPersistenceEnabled();
- server = HornetQServers.newHornetQServer(configuration, enablePersistency);
+ server = addServer(HornetQServers.newHornetQServer(configuration, enablePersistency));
}
server.setIdentity(this.getClass().getSimpleName() + "/Backup(" + node + " of live " + liveNode + ")");
servers[node] = server;
@@ -1656,7 +1657,7 @@
}
else
{
- server = HornetQServers.newHornetQServer(configuration);
+ server = addServer(HornetQServers.newHornetQServer(configuration));
server.setIdentity("Server " + node);
}
}
@@ -1668,7 +1669,7 @@
}
else
{
- server = HornetQServers.newHornetQServer(configuration, false);
+ server = addServer(HornetQServers.newHornetQServer(configuration, false));
server.setIdentity("Server " + node);
}
}
@@ -1733,7 +1734,7 @@
else
{
boolean enablePersistency = fileStorage ? configuration.isPersistenceEnabled() : false;
- server = HornetQServers.newHornetQServer(configuration, enablePersistency);
+ server = addServer(HornetQServers.newHornetQServer(configuration, enablePersistency));
}
servers[node] = server;
}
12 years, 5 months
JBoss hornetq SVN: r11875 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 00:13:35 -0500 (Thu, 08 Dec 2011)
New Revision: 11875
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7673 - Changing behaviour on MessageConsumer to do a round trip when no message is on client buffer, so it will verify if the consumer is still connected before the message consumer returns null
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-08 03:05:11 UTC (rev 11874)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-08 05:13:35 UTC (rev 11875)
@@ -104,6 +104,8 @@
private volatile boolean closed;
private volatile int creditsToSend;
+
+ private volatile boolean failedOver;
private volatile Exception lastException;
@@ -165,7 +167,7 @@
// ClientConsumer implementation
// -----------------------------------------------------------------
- private ClientMessage receive(long timeout, final boolean forcingDelivery) throws HornetQException
+ private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws HornetQException
{
checkClosed();
@@ -194,17 +196,14 @@
receiverThread = Thread.currentThread();
- if (timeout == 0)
- {
- // Effectively infinite
- timeout = Long.MAX_VALUE;
- }
-
+ // To verify if deliveryForced was already call
boolean deliveryForced = false;
+ // To control when to call deliveryForce
+ boolean callForceDelivery = false;
long start = -1;
- long toWait = timeout;
+ long toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
try
{
@@ -231,13 +230,8 @@
// we only force delivery once per call to receive
if (!deliveryForced)
{
- if (isTrace)
- {
- log.trace("Forcing delivery");
- }
- session.forceDelivery(id, forceDeliveryCount++);
-
- deliveryForced = true;
+ callForceDelivery = true;
+ break;
}
}
@@ -262,6 +256,35 @@
}
}
+ if (failedOver)
+ {
+ if (m == null)
+ {
+ // if failed over and the buffer is null, we reset the state and try it again
+ failedOver = false;
+ deliveryForced = false;
+ toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
+ continue;
+ }
+ else
+ {
+ failedOver = false;
+ }
+ }
+
+ if (callForceDelivery)
+ {
+ if (isTrace)
+ {
+ log.trace("Forcing delivery");
+ }
+ // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
+ session.forceDelivery(id, forceDeliveryCount++);
+ callForceDelivery = false;
+ deliveryForced = true;
+ continue;
+ }
+
if (m != null)
{
session.workDone();
@@ -351,19 +374,14 @@
public ClientMessage receive(final long timeout) throws HornetQException
{
- if (isBrowseOnly())
+ ClientMessage msg = receive(timeout, false);
+
+ if (msg == null && !closed)
{
- ClientMessage msg = receive(timeout, false);
- if (msg == null)
- {
- msg = receive(0, true);
- }
- return msg;
+ msg = receive(0, true);
}
- else
- {
- return receive(timeout, false);
- }
+
+ return msg;
}
public ClientMessage receive() throws HornetQException
@@ -465,6 +483,8 @@
lastAckedMessage = null;
creditsToSend = 0;
+
+ failedOver = true;
ackIndividually = false;
}
@@ -887,6 +907,8 @@
{
rateLimiter.limit();
}
+
+ failedOver = false;
synchronized (this)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-08 03:05:11 UTC (rev 11874)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-08 05:13:35 UTC (rev 11875)
@@ -398,15 +398,8 @@
{
checkClosed();
- // JBPAPP-6030 - Using the executor to avoid distributed dead locks
- executor.execute(new Runnable()
- {
- public void run()
- {
- SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
- channel.send(request);
- }
- });
+ SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
+ channel.send(request);
}
public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2011-12-08 03:05:11 UTC (rev 11874)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2011-12-08 05:13:35 UTC (rev 11875)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -127,16 +128,38 @@
{
public boolean intercept(final Packet packet, final RemotingConnection connection) throws HornetQException
{
+ if (isForceDeliveryResponse(packet))
+ {
+ return true;
+ }
+
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
{
return false;
}
-
+
return true;
}
}
+ /**
+ * @param packet
+ */
+ private boolean isForceDeliveryResponse(final Packet packet)
+ {
+ if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+ {
+ SessionReceiveMessage msg = (SessionReceiveMessage) packet;
+ if (msg.getMessage().containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private class MyInterceptor5 implements Interceptor
{
private final String key;
@@ -224,6 +247,12 @@
public boolean intercept(final Packet packet, final RemotingConnection connection) throws HornetQException
{
+
+ if (isForceDeliveryResponse(packet))
+ {
+ return true;
+ }
+
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
{
SessionReceiveMessage p = (SessionReceiveMessage)packet;
@@ -262,6 +291,8 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(false);
+
+ message.putIntProperty("count", i);
message.putStringProperty(InterceptorTest.key, "apple");
@@ -275,7 +306,11 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer.receive(1000);
-
+
+ assertNotNull(message);
+
+ assertEquals(i, message.getIntProperty("count").intValue());
+
Assert.assertEquals("orange", message.getStringProperty(InterceptorTest.key));
}
@@ -413,7 +448,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(false);
-
+
producer.send(message);
}
@@ -422,7 +457,7 @@
session.start();
ClientMessage message = consumer.receive(100);
-
+
Assert.assertNull(message);
session.close();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-08 03:05:11 UTC (rev 11874)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-08 05:13:35 UTC (rev 11875)
@@ -195,7 +195,7 @@
ClientMessage m = consumer.receive(1000);
assertNotNull(m);
System.out.println("received message " + i);
- // assertEquals(i, m.getIntProperty("counter").intValue());
+ // assertEquals(i, m.getIntProperty("counter").intValue());
}
}
@@ -271,10 +271,125 @@
crash(session);
endLatch.await(60, TimeUnit.SECONDS);
assertTrue("received only " + received.size(), received.size() == 500);
-
+
session.close();
}
+
+ public void testTimeoutOnFailoverConsumeBlocked() throws Exception
+ {
+ locator.setCallTimeout(5000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setConsumerWindowSize(0);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setBlockOnAcknowledge(true);
+ locator.setReconnectAttempts(-1);
+ locator.setRetryInterval(500);
+ locator.setAckBatchSize(0);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+ message.putBooleanProperty("end", i == 499);
+ producer.send(message);
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch endLatch = new CountDownLatch(1);
+
+ final ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+
+ final Map<Integer, ClientMessage> received = new HashMap<Integer, ClientMessage>();
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ ClientMessage message = null;
+ try
+ {
+ while ((message = getMessage()) != null)
+ {
+ Integer counter = message.getIntProperty("counter");
+ received.put(counter, message);
+ try
+ {
+ log.info("acking message = id = " + message.getMessageID() +
+ ", counter = " +
+ message.getIntProperty("counter"));
+ message.acknowledge();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ continue;
+ }
+ log.info("Acked counter = " + counter);
+ if (counter.equals(10))
+ {
+ latch.countDown();
+ }
+ if (received.size() == 500)
+ {
+ endLatch.countDown();
+ }
+
+ if (message.getBooleanProperty("end"))
+ {
+ break;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ private ClientMessage getMessage()
+ {
+ while (true)
+ {
+ try
+ {
+ ClientMessage msg = consumer.receive(20000);
+ if (msg == null)
+ {
+ log.info("Returning null message on consuming");
+ }
+ return msg;
+ }
+ catch (Exception ignored)
+ {
+ // retry
+ ignored.printStackTrace();
+ }
+ }
+ }
+ };
+ t.start();
+ latch.await(10, TimeUnit.SECONDS);
+ log.info("crashing session");
+ crash(session);
+ endLatch.await(60, TimeUnit.SECONDS);
+ t.join();
+ assertTrue("received only " + received.size(), received.size() == 500);
+
+ session.close();
+ }
+
// https://issues.jboss.org/browse/HORNETQ-685
public void testTimeoutOnFailoverTransactionCommit() throws Exception
{
12 years, 5 months
JBoss hornetq SVN: r11874 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-12-07 22:05:11 -0500 (Wed, 07 Dec 2011)
New Revision: 11874
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java
Removed:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
test name correction
Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-12-07 20:53:32 UTC (rev 11873)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java 2011-12-08 03:05:11 UTC (rev 11874)
@@ -1,2221 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.tests.integration.stomp.v11;
-
-import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import junit.framework.Assert;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
-import org.hornetq.tests.integration.stomp.util.StompClientConnection;
-import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
-import org.hornetq.tests.integration.stomp.util.StompClientConnectionV11;
-
-/*
- *
- */
-public class StompTestV11 extends StompTestBase2
-{
- private static final transient Logger log = Logger.getLogger(StompTestV11.class);
-
- private StompClientConnection connV11;
-
- protected void setUp() throws Exception
- {
- super.setUp();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- }
-
- protected void tearDown() throws Exception
- {
- System.out.println("Connection 11 : " + connV11.isConnected());
- if (connV11.isConnected())
- {
- connV11.disconnect();
- }
- super.tearDown();
- }
-
- public void testConnection() throws Exception
- {
- StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
-
- connection.connect(defUser, defPass);
-
- assertTrue(connection.isConnected());
-
- assertEquals("1.0", connection.getVersion());
-
- connection.disconnect();
-
- connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-
- connection.connect(defUser, defPass);
-
- assertTrue(connection.isConnected());
-
- assertEquals("1.1", connection.getVersion());
-
- connection.disconnect();
-
- connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-
- connection.connect();
-
- assertFalse(connection.isConnected());
-
- //new way of connection
- StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- conn.connect1(defUser, defPass);
-
- assertTrue(conn.isConnected());
-
- conn.disconnect();
- }
-
- public void testNegotiation() throws Exception
- {
- // case 1 accept-version absent. It is a 1.0 connect
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- ClientStompFrame reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- //reply headers: version, session, server
- assertEquals(null, reply.getHeader("version"));
-
- connV11.disconnect();
-
- // case 2 accept-version=1.0, result: 1.0
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.0");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- //reply headers: version, session, server
- assertEquals("1.0", reply.getHeader("version"));
-
- connV11.disconnect();
-
- // case 3 accept-version=1.1, result: 1.1
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.1");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- //reply headers: version, session, server
- assertEquals("1.1", reply.getHeader("version"));
-
- connV11.disconnect();
-
- // case 4 accept-version=1.0,1.1,1.2, result 1.1
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.0,1.1,1.2");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- //reply headers: version, session, server
- assertEquals("1.1", reply.getHeader("version"));
-
- connV11.disconnect();
-
- // case 5 accept-version=1.2, result error
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.2");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("ERROR", reply.getCommand());
-
- System.out.println("Got error frame " + reply);
-
- }
-
- public void testSendAndReceive() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World 1!");
-
- ClientStompFrame response = connV11.sendFrame(frame);
-
- assertNull(response);
-
- frame.addHeader("receipt", "1234");
- frame.setBody("Hello World 2!");
-
- response = connV11.sendFrame(frame);
-
- assertNotNull(response);
-
- assertEquals("RECEIPT", response.getCommand());
-
- assertEquals("1234", response.getHeader("receipt-id"));
-
- //subscribe
- StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- assertEquals("a-sub", frame.getHeader("subscription"));
-
- assertNotNull(frame.getHeader("message-id"));
-
- assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
-
- assertEquals("Hello World 1!", frame.getBody());
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
-
- newConn.disconnect();
- }
-
- public void testHeaderContentType() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.setBody("Hello World 1!");
-
- connV11.sendFrame(frame);
-
- //subscribe
- StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- assertEquals("application/xml", frame.getHeader("content-type"));
-
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
-
- newConn.disconnect();
- }
-
- public void testHeaderContentLength() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- String body = "Hello World 1!";
- String cLen = String.valueOf(body.getBytes("UTF-8").length);
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.addHeader("content-length", cLen);
- frame.setBody(body + "extra");
-
- connV11.sendFrame(frame);
-
- //subscribe
- StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- assertEquals(cLen, frame.getHeader("content-length"));
-
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
-
- newConn.disconnect();
- }
-
- public void testHeaderEncoding() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- String body = "Hello World 1!";
- String cLen = String.valueOf(body.getBytes("UTF-8").length);
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.addHeader("content-length", cLen);
- String hKey = "special-header\\\\\\n\\:";
- String hVal = "\\:\\\\\\ngood";
- frame.addHeader(hKey, hVal);
-
- System.out.println("key: |" + hKey + "| val: |" + hVal);
-
- frame.setBody(body);
-
- connV11.sendFrame(frame);
-
- //subscribe
- StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
-
- assertEquals(":" + "\\" + "\n" + "good", value);
-
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
-
- newConn.disconnect();
- }
-
- public void testHeartBeat() throws Exception
- {
- //no heart beat at all if heat-beat absent
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
-
- ClientStompFrame reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- Thread.sleep(5000);
-
- assertEquals(0, connV11.getFrameQueueSize());
-
- connV11.disconnect();
-
- //no heart beat for (0,0)
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "0,0");
- frame.addHeader("accept-version", "1.0,1.1");
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- assertEquals("0,0", reply.getHeader("heart-beat"));
-
- Thread.sleep(5000);
-
- assertEquals(0, connV11.getFrameQueueSize());
-
- connV11.disconnect();
-
- //heart-beat (1,0), should receive a min client ping accepted by server
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- assertEquals("0,500", reply.getHeader("heart-beat"));
-
- Thread.sleep(2000);
-
- //now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
- //send will fail
- try
- {
- connV11.sendFrame(frame);
- fail("connection should have been destroyed by now");
- }
- catch (IOException e)
- {
- //ignore
- }
-
- //heart-beat (1,0), start a ping, then send a message, should be ok.
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- assertEquals("0,500", reply.getHeader("heart-beat"));
-
- System.out.println("========== start pinger!");
-
- connV11.startPinger(500);
-
- Thread.sleep(2000);
-
- //now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
- //send will be ok
- connV11.sendFrame(frame);
-
- connV11.stopPinger();
-
- connV11.disconnect();
-
- }
-
- //server ping
- public void testHeartBeat2() throws Exception
- {
- //heart-beat (1,1)
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,1");
- frame.addHeader("accept-version", "1.0,1.1");
-
- ClientStompFrame reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
- assertEquals("500,500", reply.getHeader("heart-beat"));
-
- connV11.disconnect();
-
- //heart-beat (500,1000)
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- reply = connV11.sendFrame(frame);
-
- assertEquals("CONNECTED", reply.getCommand());
-
- assertEquals("1000,500", reply.getHeader("heart-beat"));
-
- System.out.println("========== start pinger!");
-
- connV11.startPinger(500);
-
- Thread.sleep(10000);
-
- //now check the frame size
- int size = connV11.getServerPingNumber();
-
- System.out.println("ping received: " + size);
-
- assertTrue(size > 5);
-
- //now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
- //send will be ok
- connV11.sendFrame(frame);
-
- connV11.disconnect();
- }
-
- public void testSendWithHeartBeatsAndReceive() throws Exception
- {
- StompClientConnection newConn = null;
- try
- {
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- connV11.sendFrame(frame);
-
- connV11.startPinger(500);
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
-
- for (int i = 0; i < 10; i++)
- {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
- Thread.sleep(500);
- }
-
- // subscribe
- newConn = StompClientConnectionFactory.createClientConnection("1.1",
- hostname, port);
- newConn.connect(defUser, defPass);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- int cnt = 0;
-
- frame = newConn.receiveFrame();
-
- while (frame != null)
- {
- cnt++;
- Thread.sleep(500);
- frame = newConn.receiveFrame(5000);
- }
-
- assertEquals(10, cnt);
-
- // unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
- }
- finally
- {
- if (newConn != null)
- newConn.disconnect();
- connV11.disconnect();
- }
- }
-
- public void testSendAndReceiveWithHeartBeats() throws Exception
- {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
-
- for (int i = 0; i < 10; i++)
- {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
- Thread.sleep(500);
- }
-
- //subscribe
- StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- try
- {
- frame = newConn.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- newConn.sendFrame(frame);
-
- newConn.startPinger(500);
-
- Thread.sleep(500);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- int cnt = 0;
-
- frame = newConn.receiveFrame();
-
- while (frame != null)
- {
- cnt++;
- Thread.sleep(500);
- frame = newConn.receiveFrame(5000);
- }
-
- assertEquals(10, cnt);
-
- // unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
- }
- finally
- {
- newConn.disconnect();
- }
- }
-
- public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception
- {
- StompClientConnection newConn = null;
- try
- {
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- connV11.sendFrame(frame);
-
- connV11.startPinger(500);
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
-
- for (int i = 0; i < 10; i++)
- {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
- Thread.sleep(500);
- }
-
- // subscribe
- newConn = StompClientConnectionFactory.createClientConnection("1.1",
- hostname, port);
- frame = newConn.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- newConn.sendFrame(frame);
-
- newConn.startPinger(500);
-
- Thread.sleep(500);
-
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
-
- int cnt = 0;
-
- frame = newConn.receiveFrame();
-
- while (frame != null)
- {
- cnt++;
- Thread.sleep(500);
- frame = newConn.receiveFrame(5000);
- }
- assertEquals(10, cnt);
-
- // unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
- }
- finally
- {
- if (newConn != null)
- newConn.disconnect();
- connV11.disconnect();
- }
- }
-
- public void testNack() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- nack(connV11, "sub1", messageID);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //Nack makes the message be dropped.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testNackWithWrongSubId() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- nack(connV11, "sub2", messageID);
-
- ClientStompFrame error = connV11.receiveFrame();
-
- System.out.println("Receiver error: " + error);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should be still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testNackWithWrongMessageId() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- nack(connV11, "sub2", "someother");
-
- ClientStompFrame error = connV11.receiveFrame();
-
- System.out.println("Receiver error: " + error);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
-
- public void testAck() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ack(connV11, "sub1", messageID, null);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //Nack makes the message be dropped.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testAckWithWrongSubId() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ack(connV11, "sub2", messageID, null);
-
- ClientStompFrame error = connV11.receiveFrame();
-
- System.out.println("Receiver error: " + error);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should be still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testAckWithWrongMessageId() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ack(connV11, "sub2", "someother", null);
-
- ClientStompFrame error = connV11.receiveFrame();
-
- System.out.println("Receiver error: " + error);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testErrorWithReceipt() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
- //give it a wrong sub id
- ackFrame.addHeader("subscription", "sub2");
- ackFrame.addHeader("message-id", messageID);
- ackFrame.addHeader("receipt", "answer-me");
-
- ClientStompFrame error = connV11.sendFrame(ackFrame);
-
- System.out.println("Receiver error: " + error);
-
- assertEquals("ERROR", error.getCommand());
-
- assertEquals("answer-me", error.getHeader("receipt-id"));
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testErrorWithReceipt2() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- String messageID = frame.getHeader("message-id");
-
- System.out.println("Received message with id " + messageID);
-
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
- //give it a wrong sub id
- ackFrame.addHeader("subscription", "sub1");
- ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1));
- ackFrame.addHeader("receipt", "answer-me");
-
- ClientStompFrame error = connV11.sendFrame(ackFrame);
-
- System.out.println("Receiver error: " + error);
-
- assertEquals("ERROR", error.getCommand());
-
- assertEquals("answer-me", error.getHeader("receipt-id"));
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //message should still there
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- }
-
- public void testAckModeClient() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- int num = 50;
- //send a bunch of messages
- for (int i = 0; i < num; i++)
- {
- this.sendMessage("client-ack" + i);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < num; i++)
- {
- frame = connV11.receiveFrame();
- assertNotNull(frame);
- }
-
- //ack the last
- this.ack(connV11, "sub1", frame);
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //no messages can be received.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testAckModeClient2() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client");
-
- int num = 50;
- //send a bunch of messages
- for (int i = 0; i < num; i++)
- {
- this.sendMessage("client-ack" + i);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < num; i++)
- {
- frame = connV11.receiveFrame();
- assertNotNull(frame);
-
- //ack the 49th
- if (i == num - 2)
- {
- this.ack(connV11, "sub1", frame);
- }
- }
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //no messages can be received.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testAckModeAuto() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "auto");
-
- int num = 50;
- //send a bunch of messages
- for (int i = 0; i < num; i++)
- {
- this.sendMessage("auto-ack" + i);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < num; i++)
- {
- frame = connV11.receiveFrame();
- assertNotNull(frame);
- }
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //no messages can be received.
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testAckModeClientIndividual() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- subscribe(connV11, "sub1", "client-individual");
-
- int num = 50;
- //send a bunch of messages
- for (int i = 0; i < num; i++)
- {
- this.sendMessage("client-individual-ack" + i);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < num; i++)
- {
- frame = connV11.receiveFrame();
- assertNotNull(frame);
-
- System.out.println(i + " == received: " + frame);
- //ack on even numbers
- if (i%2 == 0)
- {
- this.ack(connV11, "sub1", frame);
- }
- }
-
- unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
-
- //no messages can be received.
- MessageConsumer consumer = session.createConsumer(queue);
-
- TextMessage message = null;
- for (int i = 0; i < num/2; i++)
- {
- message = (TextMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
- System.out.println("Legal: " + message.getText());
- }
-
- message = (TextMessage) consumer.receive(1000);
-
- Assert.assertNull(message);
- }
-
- public void testTwoSubscribers() throws Exception
- {
- connV11.connect(defUser, defPass, "myclientid");
-
- this.subscribeTopic(connV11, "sub1", "auto", null);
-
- StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- newConn.connect(defUser, defPass, "myclientid2");
-
- this.subscribeTopic(newConn, "sub2", "auto", null);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getTopicPrefix() + getTopicName());
-
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- // receive message from socket
- frame = connV11.receiveFrame(1000);
-
- System.out.println("received frame : " + frame);
- assertEquals("Hello World", frame.getBody());
- assertEquals("sub1", frame.getHeader("subscription"));
-
- frame = newConn.receiveFrame(1000);
-
- System.out.println("received 2 frame : " + frame);
- assertEquals("Hello World", frame.getBody());
- assertEquals("sub2", frame.getHeader("subscription"));
-
- // remove suscription
- this.unsubscribe(connV11, "sub1", true);
- this.unsubscribe(newConn, "sub2", true);
-
- connV11.disconnect();
- newConn.disconnect();
- }
-
- public void testSendAndReceiveOnDifferentConnections() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- ClientStompFrame sendFrame = connV11.createFrame("SEND");
- sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- sendFrame.setBody("Hello World");
-
- connV11.sendFrame(sendFrame);
-
- StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- connV11_2.connect(defUser, defPass);
-
- this.subscribe(connV11_2, "sub1", "auto");
-
- ClientStompFrame frame = connV11_2.receiveFrame(2000);
-
- assertEquals("MESSAGE", frame.getCommand());
- assertEquals("Hello World", frame.getBody());
-
- connV11.disconnect();
- connV11_2.disconnect();
- }
-
- //----------------Note: tests below are adapted from StompTest
-
- public void testBeginSameTransactionTwice() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- beginTransaction(connV11, "tx1");
-
- beginTransaction(connV11, "tx1");
-
- ClientStompFrame f = connV11.receiveFrame();
- Assert.assertTrue(f.getCommand().equals("ERROR"));
- }
-
- public void testBodyWithUTF8() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, getName(), "auto");
-
- String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
- System.out.println(text);
- sendMessage(text);
-
- ClientStompFrame frame = connV11.receiveFrame();
- System.out.println(frame);
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
- Assert.assertTrue(frame.getBody().equals(text));
-
- connV11.disconnect();
- }
-
- public void testClientAckNotPartOfTransaction() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, getName(), "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
- Assert.assertTrue(frame.getBody().equals(getName()));
- Assert.assertNotNull(frame.getHeader("message-id"));
-
- String messageID = frame.getHeader("message-id");
-
- beginTransaction(connV11, "tx1");
-
- this.ack(connV11, getName(), messageID, "tx1");
-
- abortTransaction(connV11, "tx1");
-
- frame = connV11.receiveFrame();
-
- assertNull(frame);
-
- this.unsubscribe(connV11, getName());
-
- connV11.disconnect();
- }
-
- public void testDisconnectAndError() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, getName(), "client");
-
- ClientStompFrame frame = connV11.createFrame("DISCONNECT");
- frame.addHeader("receipt", "1");
-
- ClientStompFrame result = connV11.sendFrame(frame);
-
- if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
- {
- fail("Disconnect failed! " + result);
- }
-
- // sending a message will result in an error
- ClientStompFrame sendFrame = connV11.createFrame("SEND");
- sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- sendFrame.setBody("Hello World");
-
- try
- {
- connV11.sendFrame(sendFrame);
- fail("connection should have been closed by server.");
- }
- catch (ClosedChannelException e)
- {
- //ok.
- }
-
- connV11.destroy();
- }
-
- public void testDurableSubscriber() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "client", getName());
-
- this.subscribe(connV11, "sub1", "client", getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("ERROR"));
-
- connV11.disconnect();
- }
-
- public void testDurableSubscriberWithReconnection() throws Exception
- {
- connV11.connect(defUser, defPass, "myclientid");
-
- this.subscribeTopic(connV11, "sub1", "auto", getName());
-
- ClientStompFrame frame = connV11.createFrame("DISCONNECT");
- frame.addHeader("receipt", "1");
-
- ClientStompFrame result = connV11.sendFrame(frame);
-
- if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
- {
- fail("Disconnect failed! " + result);
- }
-
- // send the message when the durable subscriber is disconnected
- sendMessage(getName(), topic);
-
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- connV11.connect(defUser, defPass, "myclientid");
-
- this.subscribeTopic(connV11, "sub1", "auto", getName());
-
- // we must have received the message
- frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
- Assert.assertEquals(getName(), frame.getBody());
-
- this.unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
- }
-
- public void testJMSXGroupIdCanBeSet() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("JMSXGroupID", "TEST");
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
- // differ from StompConnect
- Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
- }
-
- public void testMessagesAreInOrder() throws Exception
- {
- int ctr = 10;
- String[] data = new String[ctr];
-
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- for (int i = 0; i < ctr; ++i)
- {
- data[i] = getName() + i;
- sendMessage(data[i]);
- }
-
- ClientStompFrame frame = null;
-
- for (int i = 0; i < ctr; ++i)
- {
- frame = connV11.receiveFrame();
- Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
- }
-
- for (int i = 0; i < ctr; ++i)
- {
- data[i] = getName() + ":second:" + i;
- sendMessage(data[i]);
- }
-
- for (int i = 0; i < ctr; ++i)
- {
- frame = connV11.receiveFrame();
- Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
- }
-
- connV11.disconnect();
- }
-
- public void testSubscribeWithAutoAckAndSelector() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'");
-
- sendMessage("Ignored message", "foo", "1234");
- sendMessage("Real message", "foo", "zzz");
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
-
- connV11.disconnect();
- }
-
- public void testRedeliveryWithClientAck() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "subId", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- assertTrue(frame.getCommand().equals("MESSAGE"));
-
- connV11.disconnect();
-
- // message should be received since message was not acknowledged
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertTrue(message.getJMSRedelivered());
- }
-
- public void testSendManyMessages() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- int count = 1000;
- final CountDownLatch latch = new CountDownLatch(count);
- consumer.setMessageListener(new MessageListener()
- {
- public void onMessage(Message arg0)
- {
- latch.countDown();
- }
- });
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- for (int i = 1; i <= count; i++)
- {
- connV11.sendFrame(frame);
- }
-
- assertTrue(latch.await(60, TimeUnit.SECONDS));
-
- connV11.disconnect();
- }
-
- public void testSendMessage() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
- // Assert default priority 4 is used when priority header is not set
- Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
-
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
- }
-
- public void testSendMessageWithContentLength() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- byte[] data = new byte[] { 1, 0, 0, 4 };
-
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody(new String(data, "UTF-8"));
-
- frame.addHeader("content-length", String.valueOf(data.length));
-
- connV11.sendFrame(frame);
-
- BytesMessage message = (BytesMessage)consumer.receive(10000);
- Assert.assertNotNull(message);
-
- assertEquals(data.length, message.getBodyLength());
- assertEquals(data[0], message.readByte());
- assertEquals(data[1], message.readByte());
- assertEquals(data[2], message.readByte());
- assertEquals(data[3], message.readByte());
- }
-
- public void testSendMessageWithCustomHeadersAndSelector() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("foo", "abc");
- frame.addHeader("bar", "123");
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
- Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
- Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
- }
-
- public void testSendMessageWithLeadingNewLine() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- connV11.sendWickedFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
-
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
-
- assertNull(consumer.receive(1000));
-
- connV11.disconnect();
- }
-
- public void testSendMessageWithReceipt() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("receipt", "1234");
- frame.setBody("Hello World");
-
- frame = connV11.sendFrame(frame);
-
- assertTrue(frame.getCommand().equals("RECEIPT"));
- assertEquals("1234", frame.getHeader("receipt-id"));
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
-
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
-
- connV11.disconnect();
- }
-
- public void testSendMessageWithStandardHeaders() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("correlation-id", "c123");
- frame.addHeader("persistent", "true");
- frame.addHeader("priority", "3");
- frame.addHeader("type", "t345");
- frame.addHeader("JMSXGroupID", "abc");
- frame.addHeader("foo", "abc");
- frame.addHeader("bar", "123");
-
- frame.setBody("Hello World");
-
- frame = connV11.sendFrame(frame);
-
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
- Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
- Assert.assertEquals("getJMSType", "t345", message.getJMSType());
- Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
- Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
- Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
- Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
-
- Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
-
- connV11.disconnect();
- }
-
- public void testSubscribeToTopic() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribeTopic(connV11, "sub1", null, null, true);
-
- sendMessage(getName(), topic);
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
- Assert.assertTrue(frame.getBody().equals(getName()));
-
- this.unsubscribe(connV11, "sub1", true);
-
- sendMessage(getName(), topic);
-
- frame = connV11.receiveFrame(1000);
- assertNull(frame);
-
- connV11.disconnect();
- }
-
- public void testSubscribeToTopicWithNoLocal() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribeTopic(connV11, "sub1", null, null, true, true);
-
- // send a message on the same connection => it should not be received
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getTopicPrefix() + getTopicName());
-
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- frame = connV11.receiveFrame(2000);
-
- assertNull(frame);
-
- // send message on another JMS connection => it should be received
- sendMessage(getName(), topic);
-
- frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
- Assert.assertTrue(frame.getBody().equals(getName()));
-
- this.unsubscribe(connV11, "sub1");
-
- connV11.disconnect();
- }
-
- public void testSubscribeWithAutoAck() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertEquals("MESSAGE", frame.getCommand());
- Assert.assertNotNull(frame.getHeader("destination"));
- Assert.assertEquals(getName(), frame.getBody());
-
- connV11.disconnect();
-
- // message should not be received as it was auto-acked
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
- sendMessage(payload, queue);
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- assertEquals("MESSAGE", frame.getCommand());
-
- System.out.println("Message: " + frame);
-
- assertEquals("5", frame.getHeader("content-length"));
-
- assertEquals(null, frame.getHeader("type"));
-
- assertEquals(frame.getBody(), new String(payload, "UTF-8"));
-
- connV11.disconnect();
- }
-
- public void testSubscribeWithClientAck() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- this.ack(connV11, "sub1", frame);
-
- connV11.disconnect();
-
- // message should not be received since message was acknowledged by the client
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(1000);
- Assert.assertNull(message);
- }
-
- public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception
- {
- assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
- }
-
- public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception
- {
- assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
- }
-
- public void testSubscribeWithID() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "mysubid", "auto");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getHeader("subscription") != null);
-
- connV11.disconnect();
- }
-
- public void testSubscribeWithMessageSentWithProperties() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- MessageProducer producer = session.createProducer(queue);
- BytesMessage message = session.createBytesMessage();
- message.setStringProperty("S", "value");
- message.setBooleanProperty("n", false);
- message.setByteProperty("byte", (byte)9);
- message.setDoubleProperty("d", 2.0);
- message.setFloatProperty("f", (float)6.0);
- message.setIntProperty("i", 10);
- message.setLongProperty("l", 121);
- message.setShortProperty("s", (short)12);
- message.writeBytes("Hello World".getBytes("UTF-8"));
- producer.send(message);
-
- ClientStompFrame frame = connV11.receiveFrame();
- Assert.assertNotNull(frame);
-
- Assert.assertTrue(frame.getHeader("S") != null);
- Assert.assertTrue(frame.getHeader("n") != null);
- Assert.assertTrue(frame.getHeader("byte") != null);
- Assert.assertTrue(frame.getHeader("d") != null);
- Assert.assertTrue(frame.getHeader("f") != null);
- Assert.assertTrue(frame.getHeader("i") != null);
- Assert.assertTrue(frame.getHeader("l") != null);
- Assert.assertTrue(frame.getHeader("s") != null);
- Assert.assertEquals("Hello World", frame.getBody());
-
- connV11.disconnect();
- }
-
- public void testSuccessiveTransactionsWithSameID() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- // first tx
- this.beginTransaction(connV11, "tx1");
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- this.commitTransaction(connV11, "tx1");
-
- Message message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
-
- // 2nd tx with same tx ID
- this.beginTransaction(connV11, "tx1");
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
-
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- this.commitTransaction(connV11, "tx1");
-
- message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
-
- connV11.disconnect();
- }
-
- public void testTransactionCommit() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- this.beginTransaction(connV11, "tx1");
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
- frame.addHeader("receipt", "123");
- frame.setBody("Hello World");
-
- frame = connV11.sendFrame(frame);
-
- assertEquals("123", frame.getHeader("receipt-id"));
-
- // check the message is not committed
- assertNull(consumer.receive(100));
-
- this.commitTransaction(connV11, "tx1", true);
-
- Message message = consumer.receive(1000);
- Assert.assertNotNull("Should have received a message", message);
-
- connV11.disconnect();
- }
-
- public void testTransactionRollback() throws Exception
- {
- MessageConsumer consumer = session.createConsumer(queue);
-
- connV11.connect(defUser, defPass);
-
- this.beginTransaction(connV11, "tx1");
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
-
- frame.setBody("first message");
-
- connV11.sendFrame(frame);
-
- // rollback first message
- this.abortTransaction(connV11, "tx1");
-
- this.beginTransaction(connV11, "tx1");
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("transaction", "tx1");
-
- frame.setBody("second message");
-
- connV11.sendFrame(frame);
-
- this.commitTransaction(connV11, "tx1", true);
-
- // only second msg should be received since first msg was rolled back
- TextMessage message = (TextMessage)consumer.receive(1000);
- Assert.assertNotNull(message);
- Assert.assertEquals("second message", message.getText());
-
- connV11.disconnect();
- }
-
- public void testUnsubscribe() throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "auto");
-
- // send a message to our queue
- sendMessage("first message");
-
- // receive message from socket
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
-
- // remove suscription
- this.unsubscribe(connV11, "sub1", true);
-
- // send a message to our queue
- sendMessage("second message");
-
- frame = connV11.receiveFrame(1000);
- assertNull(frame);
-
- connV11.disconnect();
- }
-
- //-----------------private help methods
-
- private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
- {
- ClientStompFrame abortFrame = conn.createFrame("ABORT");
- abortFrame.addHeader("transaction", txID);
-
- conn.sendFrame(abortFrame);
- }
-
- private void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
- {
- ClientStompFrame beginFrame = conn.createFrame("BEGIN");
- beginFrame.addHeader("transaction", txID);
-
- conn.sendFrame(beginFrame);
- }
-
- private void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
- {
- commitTransaction(conn, txID, false);
- }
-
- private void commitTransaction(StompClientConnection conn, String txID, boolean receipt) throws IOException, InterruptedException
- {
- ClientStompFrame beginFrame = conn.createFrame("COMMIT");
- beginFrame.addHeader("transaction", txID);
- if (receipt)
- {
- beginFrame.addHeader("receipt", "1234");
- }
- ClientStompFrame resp = conn.sendFrame(beginFrame);
- if (receipt)
- {
- assertEquals("1234", resp.getHeader("receipt-id"));
- }
- }
-
- private void ack(StompClientConnection conn, String subId,
- ClientStompFrame frame) throws IOException, InterruptedException
- {
- String messageID = frame.getHeader("message-id");
-
- ClientStompFrame ackFrame = conn.createFrame("ACK");
-
- ackFrame.addHeader("subscription", subId);
- ackFrame.addHeader("message-id", messageID);
-
- ClientStompFrame response = conn.sendFrame(ackFrame);
- if (response != null)
- {
- throw new IOException("failed to ack " + response);
- }
- }
-
- private void ack(StompClientConnection conn, String subId, String mid, String txID) throws IOException, InterruptedException
- {
- ClientStompFrame ackFrame = conn.createFrame("ACK");
- ackFrame.addHeader("subscription", subId);
- ackFrame.addHeader("message-id", mid);
- if (txID != null)
- {
- ackFrame.addHeader("transaction", txID);
- }
-
- conn.sendFrame(ackFrame);
- }
-
- private void nack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
- {
- ClientStompFrame ackFrame = conn.createFrame("NACK");
- ackFrame.addHeader("subscription", subId);
- ackFrame.addHeader("message-id", mid);
-
- conn.sendFrame(ackFrame);
- }
-
- private void subscribe(StompClientConnection conn, String subId, String ack) throws IOException, InterruptedException
- {
- subscribe(conn, subId, ack, null, null);
- }
-
- private void subscribe(StompClientConnection conn, String subId,
- String ack, String durableId) throws IOException, InterruptedException
- {
- subscribe(conn, subId, ack, durableId, null);
- }
-
- private void subscribe(StompClientConnection conn, String subId,
- String ack, String durableId, boolean receipt) throws IOException, InterruptedException
- {
- subscribe(conn, subId, ack, durableId, null, receipt);
- }
-
- private void subscribe(StompClientConnection conn, String subId, String ack,
- String durableId, String selector) throws IOException,
- InterruptedException
- {
- subscribe(conn, subId, ack, durableId, selector, false);
- }
-
- private void subscribe(StompClientConnection conn, String subId,
- String ack, String durableId, String selector, boolean receipt) throws IOException, InterruptedException
- {
- ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", subId);
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- if (ack != null)
- {
- subFrame.addHeader("ack", ack);
- }
- if (durableId != null)
- {
- subFrame.addHeader("durable-subscriber-name", durableId);
- }
- if (selector != null)
- {
- subFrame.addHeader("selector", selector);
- }
- if (receipt)
- {
- subFrame.addHeader("receipt", "1234");
- }
-
- subFrame = conn.sendFrame(subFrame);
-
- if (receipt)
- {
- assertEquals("1234", subFrame.getHeader("receipt-id"));
- }
- }
-
- private void subscribeTopic(StompClientConnection conn, String subId,
- String ack, String durableId) throws IOException, InterruptedException
- {
- subscribeTopic(conn, subId, ack, durableId, false);
- }
-
- private void subscribeTopic(StompClientConnection conn, String subId,
- String ack, String durableId, boolean receipt) throws IOException, InterruptedException
- {
- subscribeTopic(conn, subId, ack, durableId, receipt, false);
- }
-
- private void subscribeTopic(StompClientConnection conn, String subId,
- String ack, String durableId, boolean receipt, boolean noLocal) throws IOException, InterruptedException
- {
- ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", subId);
- subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
- if (ack != null)
- {
- subFrame.addHeader("ack", ack);
- }
- if (durableId != null)
- {
- subFrame.addHeader("durable-subscriber-name", durableId);
- }
- if (receipt)
- {
- subFrame.addHeader("receipt", "1234");
- }
- if (noLocal)
- {
- subFrame.addHeader("no-local", "true");
- }
-
- ClientStompFrame frame = conn.sendFrame(subFrame);
-
- if (receipt)
- {
- assertTrue(frame.getHeader("receipt-id").equals("1234"));
- }
- }
-
- private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException
- {
- ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
- subFrame.addHeader("id", subId);
-
- conn.sendFrame(subFrame);
- }
-
- private void unsubscribe(StompClientConnection conn, String subId,
- boolean receipt) throws IOException, InterruptedException
- {
- ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
- subFrame.addHeader("id", subId);
-
- if (receipt)
- {
- subFrame.addHeader("receipt", "4321");
- }
-
- ClientStompFrame f = conn.sendFrame(subFrame);
-
- if (receipt)
- {
- System.out.println("response: " + f);
- assertEquals("RECEIPT", f.getCommand());
- assertEquals("4321", f.getHeader("receipt-id"));
- }
- }
-
- protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception
- {
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", "client");
-
- sendMessage(getName());
-
- ClientStompFrame frame = connV11.receiveFrame();
-
- Assert.assertEquals("MESSAGE", frame.getCommand());
-
- log.info("Reconnecting!");
-
- if (sendDisconnect)
- {
- connV11.disconnect();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- }
- else
- {
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- }
-
- // message should be received since message was not acknowledged
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", null);
-
- frame = connV11.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
-
- connV11.disconnect();
-
- // now lets make sure we don't see the message again
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- connV11.connect(defUser, defPass);
-
- this.subscribe(connV11, "sub1", null, null, true);
-
- sendMessage("shouldBeNextMessage");
-
- frame = connV11.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertEquals("shouldBeNextMessage", frame.getBody());
- }
-
-}
-
-
-
-
-
Copied: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java (from rev 11873, trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java)
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java (rev 0)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompV11Test.java 2011-12-08 03:05:11 UTC (rev 11874)
@@ -0,0 +1,2221 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp.v11;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
+import org.hornetq.tests.integration.stomp.util.StompClientConnection;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionV11;
+
+/*
+ *
+ */
+public class StompV11Test extends StompTestBase2
+{
+ private static final transient Logger log = Logger.getLogger(StompV11Test.class);
+
+ private StompClientConnection connV11;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ System.out.println("Connection 11 : " + connV11.isConnected());
+ if (connV11.isConnected())
+ {
+ connV11.disconnect();
+ }
+ super.tearDown();
+ }
+
+ public void testConnection() throws Exception
+ {
+ StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+
+ connection.connect(defUser, defPass);
+
+ assertTrue(connection.isConnected());
+
+ assertEquals("1.0", connection.getVersion());
+
+ connection.disconnect();
+
+ connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+
+ connection.connect(defUser, defPass);
+
+ assertTrue(connection.isConnected());
+
+ assertEquals("1.1", connection.getVersion());
+
+ connection.disconnect();
+
+ connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+
+ connection.connect();
+
+ assertFalse(connection.isConnected());
+
+ //new way of connection
+ StompClientConnectionV11 conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ conn.connect1(defUser, defPass);
+
+ assertTrue(conn.isConnected());
+
+ conn.disconnect();
+ }
+
+ public void testNegotiation() throws Exception
+ {
+ // case 1 accept-version absent. It is a 1.0 connect
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals(null, reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 2 accept-version=1.0, result: 1.0
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.0");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.0", reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 3 accept-version=1.1, result: 1.1
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.1");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.1", reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 4 accept-version=1.0,1.1,1.2, result 1.1
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.0,1.1,1.2");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ //reply headers: version, session, server
+ assertEquals("1.1", reply.getHeader("version"));
+
+ connV11.disconnect();
+
+ // case 5 accept-version=1.2, result error
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("accept-version", "1.2");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("ERROR", reply.getCommand());
+
+ System.out.println("Got error frame " + reply);
+
+ }
+
+ public void testSendAndReceive() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World 1!");
+
+ ClientStompFrame response = connV11.sendFrame(frame);
+
+ assertNull(response);
+
+ frame.addHeader("receipt", "1234");
+ frame.setBody("Hello World 2!");
+
+ response = connV11.sendFrame(frame);
+
+ assertNotNull(response);
+
+ assertEquals("RECEIPT", response.getCommand());
+
+ assertEquals("1234", response.getHeader("receipt-id"));
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("a-sub", frame.getHeader("subscription"));
+
+ assertNotNull(frame.getHeader("message-id"));
+
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+
+ assertEquals("Hello World 1!", frame.getBody());
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+
+ newConn.disconnect();
+ }
+
+ public void testHeaderContentType() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.setBody("Hello World 1!");
+
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals("application/xml", frame.getHeader("content-type"));
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+ }
+
+ public void testHeaderContentLength() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ String body = "Hello World 1!";
+ String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.addHeader("content-length", cLen);
+ frame.setBody(body + "extra");
+
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ assertEquals(cLen, frame.getHeader("content-length"));
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+ }
+
+ public void testHeaderEncoding() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ String body = "Hello World 1!";
+ String cLen = String.valueOf(body.getBytes("UTF-8").length);
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "application/xml");
+ frame.addHeader("content-length", cLen);
+ String hKey = "special-header\\\\\\n\\:";
+ String hVal = "\\:\\\\\\ngood";
+ frame.addHeader(hKey, hVal);
+
+ System.out.println("key: |" + hKey + "| val: |" + hVal);
+
+ frame.setBody(body);
+
+ connV11.sendFrame(frame);
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ frame = newConn.receiveFrame();
+
+ System.out.println("received " + frame);
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
+
+ assertEquals(":" + "\\" + "\n" + "good", value);
+
+ //unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+
+ newConn.disconnect();
+ }
+
+ public void testHeartBeat() throws Exception
+ {
+ //no heart beat at all if heat-beat absent
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ Thread.sleep(5000);
+
+ assertEquals(0, connV11.getFrameQueueSize());
+
+ connV11.disconnect();
+
+ //no heart beat for (0,0)
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "0,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,0", reply.getHeader("heart-beat"));
+
+ Thread.sleep(5000);
+
+ assertEquals(0, connV11.getFrameQueueSize());
+
+ connV11.disconnect();
+
+ //heart-beat (1,0), should receive a min client ping accepted by server
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,500", reply.getHeader("heart-beat"));
+
+ Thread.sleep(2000);
+
+ //now server side should be disconnected because we didn't send ping for 2 sec
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will fail
+ try
+ {
+ connV11.sendFrame(frame);
+ fail("connection should have been destroyed by now");
+ }
+ catch (IOException e)
+ {
+ //ignore
+ }
+
+ //heart-beat (1,0), start a ping, then send a message, should be ok.
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,0");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("0,500", reply.getHeader("heart-beat"));
+
+ System.out.println("========== start pinger!");
+
+ connV11.startPinger(500);
+
+ Thread.sleep(2000);
+
+ //now server side should be disconnected because we didn't send ping for 2 sec
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will be ok
+ connV11.sendFrame(frame);
+
+ connV11.stopPinger();
+
+ connV11.disconnect();
+
+ }
+
+ //server ping
+ public void testHeartBeat2() throws Exception
+ {
+ //heart-beat (1,1)
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "1,1");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ ClientStompFrame reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+ assertEquals("500,500", reply.getHeader("heart-beat"));
+
+ connV11.disconnect();
+
+ //heart-beat (500,1000)
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ reply = connV11.sendFrame(frame);
+
+ assertEquals("CONNECTED", reply.getCommand());
+
+ assertEquals("1000,500", reply.getHeader("heart-beat"));
+
+ System.out.println("========== start pinger!");
+
+ connV11.startPinger(500);
+
+ Thread.sleep(10000);
+
+ //now check the frame size
+ int size = connV11.getServerPingNumber();
+
+ System.out.println("ping received: " + size);
+
+ assertTrue(size > 5);
+
+ //now server side should be disconnected because we didn't send ping for 2 sec
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+ frame.setBody("Hello World");
+
+ //send will be ok
+ connV11.sendFrame(frame);
+
+ connV11.disconnect();
+ }
+
+ public void testSendWithHeartBeatsAndReceive() throws Exception
+ {
+ StompClientConnection newConn = null;
+ try
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ connV11.sendFrame(frame);
+
+ connV11.startPinger(500);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ // subscribe
+ newConn = StompClientConnectionFactory.createClientConnection("1.1",
+ hostname, port);
+ newConn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ if (newConn != null)
+ newConn.disconnect();
+ connV11.disconnect();
+ }
+ }
+
+ public void testSendAndReceiveWithHeartBeats() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ //subscribe
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ try
+ {
+ frame = newConn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ newConn.sendFrame(frame);
+
+ newConn.startPinger(500);
+
+ Thread.sleep(500);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ newConn.disconnect();
+ }
+ }
+
+ public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception
+ {
+ StompClientConnection newConn = null;
+ try
+ {
+ ClientStompFrame frame = connV11.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ connV11.sendFrame(frame);
+
+ connV11.startPinger(500);
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("content-type", "text/plain");
+
+ for (int i = 0; i < 10; i++)
+ {
+ frame.setBody("Hello World " + i + "!");
+ connV11.sendFrame(frame);
+ Thread.sleep(500);
+ }
+
+ // subscribe
+ newConn = StompClientConnectionFactory.createClientConnection("1.1",
+ hostname, port);
+ frame = newConn.createFrame("CONNECT");
+ frame.addHeader("host", "127.0.0.1");
+ frame.addHeader("login", this.defUser);
+ frame.addHeader("passcode", this.defPass);
+ frame.addHeader("heart-beat", "500,1000");
+ frame.addHeader("accept-version", "1.0,1.1");
+
+ newConn.sendFrame(frame);
+
+ newConn.startPinger(500);
+
+ Thread.sleep(500);
+
+ ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ newConn.sendFrame(subFrame);
+
+ int cnt = 0;
+
+ frame = newConn.receiveFrame();
+
+ while (frame != null)
+ {
+ cnt++;
+ Thread.sleep(500);
+ frame = newConn.receiveFrame(5000);
+ }
+ assertEquals(10, cnt);
+
+ // unsub
+ ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ newConn.sendFrame(unsubFrame);
+ }
+ finally
+ {
+ if (newConn != null)
+ newConn.disconnect();
+ connV11.disconnect();
+ }
+ }
+
+ public void testNack() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ nack(connV11, "sub1", messageID);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //Nack makes the message be dropped.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testNackWithWrongSubId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ nack(connV11, "sub2", messageID);
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should be still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testNackWithWrongMessageId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ nack(connV11, "sub2", "someother");
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+
+ public void testAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ack(connV11, "sub1", messageID, null);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //Nack makes the message be dropped.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckWithWrongSubId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ack(connV11, "sub2", messageID, null);
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should be still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testAckWithWrongMessageId() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ack(connV11, "sub2", "someother", null);
+
+ ClientStompFrame error = connV11.receiveFrame();
+
+ System.out.println("Receiver error: " + error);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testErrorWithReceipt() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ //give it a wrong sub id
+ ackFrame.addHeader("subscription", "sub2");
+ ackFrame.addHeader("message-id", messageID);
+ ackFrame.addHeader("receipt", "answer-me");
+
+ ClientStompFrame error = connV11.sendFrame(ackFrame);
+
+ System.out.println("Receiver error: " + error);
+
+ assertEquals("ERROR", error.getCommand());
+
+ assertEquals("answer-me", error.getHeader("receipt-id"));
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testErrorWithReceipt2() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ String messageID = frame.getHeader("message-id");
+
+ System.out.println("Received message with id " + messageID);
+
+ ClientStompFrame ackFrame = connV11.createFrame("ACK");
+ //give it a wrong sub id
+ ackFrame.addHeader("subscription", "sub1");
+ ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1));
+ ackFrame.addHeader("receipt", "answer-me");
+
+ ClientStompFrame error = connV11.sendFrame(ackFrame);
+
+ System.out.println("Receiver error: " + error);
+
+ assertEquals("ERROR", error.getCommand());
+
+ assertEquals("answer-me", error.getHeader("receipt-id"));
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //message should still there
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ }
+
+ public void testAckModeClient() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+ }
+
+ //ack the last
+ this.ack(connV11, "sub1", frame);
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeClient2() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+
+ //ack the 49th
+ if (i == num - 2)
+ {
+ this.ack(connV11, "sub1", frame);
+ }
+ }
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeAuto() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "auto");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("auto-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+ }
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testAckModeClientIndividual() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ subscribe(connV11, "sub1", "client-individual");
+
+ int num = 50;
+ //send a bunch of messages
+ for (int i = 0; i < num; i++)
+ {
+ this.sendMessage("client-individual-ack" + i);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < num; i++)
+ {
+ frame = connV11.receiveFrame();
+ assertNotNull(frame);
+
+ System.out.println(i + " == received: " + frame);
+ //ack on even numbers
+ if (i%2 == 0)
+ {
+ this.ack(connV11, "sub1", frame);
+ }
+ }
+
+ unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+
+ //no messages can be received.
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ TextMessage message = null;
+ for (int i = 0; i < num/2; i++)
+ {
+ message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ System.out.println("Legal: " + message.getText());
+ }
+
+ message = (TextMessage) consumer.receive(1000);
+
+ Assert.assertNull(message);
+ }
+
+ public void testTwoSubscribers() throws Exception
+ {
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", null);
+
+ StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ newConn.connect(defUser, defPass, "myclientid2");
+
+ this.subscribeTopic(newConn, "sub2", "auto", null);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ // receive message from socket
+ frame = connV11.receiveFrame(1000);
+
+ System.out.println("received frame : " + frame);
+ assertEquals("Hello World", frame.getBody());
+ assertEquals("sub1", frame.getHeader("subscription"));
+
+ frame = newConn.receiveFrame(1000);
+
+ System.out.println("received 2 frame : " + frame);
+ assertEquals("Hello World", frame.getBody());
+ assertEquals("sub2", frame.getHeader("subscription"));
+
+ // remove suscription
+ this.unsubscribe(connV11, "sub1", true);
+ this.unsubscribe(newConn, "sub2", true);
+
+ connV11.disconnect();
+ newConn.disconnect();
+ }
+
+ public void testSendAndReceiveOnDifferentConnections() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame sendFrame = connV11.createFrame("SEND");
+ sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ sendFrame.setBody("Hello World");
+
+ connV11.sendFrame(sendFrame);
+
+ StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ connV11_2.connect(defUser, defPass);
+
+ this.subscribe(connV11_2, "sub1", "auto");
+
+ ClientStompFrame frame = connV11_2.receiveFrame(2000);
+
+ assertEquals("MESSAGE", frame.getCommand());
+ assertEquals("Hello World", frame.getBody());
+
+ connV11.disconnect();
+ connV11_2.disconnect();
+ }
+
+ //----------------Note: tests below are adapted from StompTest
+
+ public void testBeginSameTransactionTwice() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ beginTransaction(connV11, "tx1");
+
+ beginTransaction(connV11, "tx1");
+
+ ClientStompFrame f = connV11.receiveFrame();
+ Assert.assertTrue(f.getCommand().equals("ERROR"));
+ }
+
+ public void testBodyWithUTF8() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "auto");
+
+ String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
+ System.out.println(text);
+ sendMessage(text);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ System.out.println(frame);
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getBody().equals(text));
+
+ connV11.disconnect();
+ }
+
+ public void testClientAckNotPartOfTransaction() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+ Assert.assertNotNull(frame.getHeader("message-id"));
+
+ String messageID = frame.getHeader("message-id");
+
+ beginTransaction(connV11, "tx1");
+
+ this.ack(connV11, getName(), messageID, "tx1");
+
+ abortTransaction(connV11, "tx1");
+
+ frame = connV11.receiveFrame();
+
+ assertNull(frame);
+
+ this.unsubscribe(connV11, getName());
+
+ connV11.disconnect();
+ }
+
+ public void testDisconnectAndError() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, getName(), "client");
+
+ ClientStompFrame frame = connV11.createFrame("DISCONNECT");
+ frame.addHeader("receipt", "1");
+
+ ClientStompFrame result = connV11.sendFrame(frame);
+
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
+ {
+ fail("Disconnect failed! " + result);
+ }
+
+ // sending a message will result in an error
+ ClientStompFrame sendFrame = connV11.createFrame("SEND");
+ sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ sendFrame.setBody("Hello World");
+
+ try
+ {
+ connV11.sendFrame(sendFrame);
+ fail("connection should have been closed by server.");
+ }
+ catch (ClosedChannelException e)
+ {
+ //ok.
+ }
+
+ connV11.destroy();
+ }
+
+ public void testDurableSubscriber() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client", getName());
+
+ this.subscribe(connV11, "sub1", "client", getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("ERROR"));
+
+ connV11.disconnect();
+ }
+
+ public void testDurableSubscriberWithReconnection() throws Exception
+ {
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+ ClientStompFrame frame = connV11.createFrame("DISCONNECT");
+ frame.addHeader("receipt", "1");
+
+ ClientStompFrame result = connV11.sendFrame(frame);
+
+ if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
+ {
+ fail("Disconnect failed! " + result);
+ }
+
+ // send the message when the durable subscriber is disconnected
+ sendMessage(getName(), topic);
+
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ connV11.connect(defUser, defPass, "myclientid");
+
+ this.subscribeTopic(connV11, "sub1", "auto", getName());
+
+ // we must have received the message
+ frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ this.unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+ }
+
+ public void testJMSXGroupIdCanBeSet() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("JMSXGroupID", "TEST");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ // differ from StompConnect
+ Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
+ }
+
+ public void testMessagesAreInOrder() throws Exception
+ {
+ int ctr = 10;
+ String[] data = new String[ctr];
+
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
+
+ ClientStompFrame frame = null;
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = connV11.receiveFrame();
+ Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
+ }
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i)
+ {
+ frame = connV11.receiveFrame();
+ Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
+ }
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithAutoAckAndSelector() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'");
+
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
+
+ connV11.disconnect();
+ }
+
+ public void testRedeliveryWithClientAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "subId", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ connV11.disconnect();
+
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
+
+ public void testSendManyMessages() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ int count = 1000;
+ final CountDownLatch latch = new CountDownLatch(count);
+ consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message arg0)
+ {
+ latch.countDown();
+ }
+ });
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ for (int i = 1; i <= count; i++)
+ {
+ connV11.sendFrame(frame);
+ }
+
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+
+ connV11.disconnect();
+ }
+
+ public void testSendMessage() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ // Assert default priority 4 is used when priority header is not set
+ Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void testSendMessageWithContentLength() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ byte[] data = new byte[] { 1, 0, 0, 4 };
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody(new String(data, "UTF-8"));
+
+ frame.addHeader("content-length", String.valueOf(data.length));
+
+ connV11.sendFrame(frame);
+
+ BytesMessage message = (BytesMessage)consumer.receive(10000);
+ Assert.assertNotNull(message);
+
+ assertEquals(data.length, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
+ }
+
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("foo", "abc");
+ frame.addHeader("bar", "123");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+ }
+
+ public void testSendMessageWithLeadingNewLine() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+
+ connV11.sendWickedFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+
+ assertNull(consumer.receive(1000));
+
+ connV11.disconnect();
+ }
+
+ public void testSendMessageWithReceipt() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("receipt", "1234");
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ assertTrue(frame.getCommand().equals("RECEIPT"));
+ assertEquals("1234", frame.getHeader("receipt-id"));
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+
+ connV11.disconnect();
+ }
+
+ public void testSendMessageWithStandardHeaders() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("correlation-id", "c123");
+ frame.addHeader("persistent", "true");
+ frame.addHeader("priority", "3");
+ frame.addHeader("type", "t345");
+ frame.addHeader("JMSXGroupID", "abc");
+ frame.addHeader("foo", "abc");
+ frame.addHeader("bar", "123");
+
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+
+ Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeToTopic() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribeTopic(connV11, "sub1", null, null, true);
+
+ sendMessage(getName(), topic);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+
+ this.unsubscribe(connV11, "sub1", true);
+
+ sendMessage(getName(), topic);
+
+ frame = connV11.receiveFrame(1000);
+ assertNull(frame);
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeToTopicWithNoLocal() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribeTopic(connV11, "sub1", null, null, true, true);
+
+ // send a message on the same connection => it should not be received
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getTopicPrefix() + getTopicName());
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ frame = connV11.receiveFrame(2000);
+
+ assertNull(frame);
+
+ // send message on another JMS connection => it should be received
+ sendMessage(getName(), topic);
+
+ frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getTopicPrefix() + getTopicName()));
+ Assert.assertTrue(frame.getBody().equals(getName()));
+
+ this.unsubscribe(connV11, "sub1");
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithAutoAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertEquals("MESSAGE", frame.getCommand());
+ Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ connV11.disconnect();
+
+ // message should not be received as it was auto-acked
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ byte[] payload = new byte[] { 1, 2, 3, 4, 5 };
+ sendMessage(payload, queue);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ assertEquals("MESSAGE", frame.getCommand());
+
+ System.out.println("Message: " + frame);
+
+ assertEquals("5", frame.getHeader("content-length"));
+
+ assertEquals(null, frame.getHeader("type"));
+
+ assertEquals(frame.getBody(), new String(payload, "UTF-8"));
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithClientAck() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ this.ack(connV11, "sub1", frame);
+
+ connV11.disconnect();
+
+ // message should not be received since message was acknowledged by the client
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+ Assert.assertNull(message);
+ }
+
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
+
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception
+ {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
+
+ public void testSubscribeWithID() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "mysubid", "auto");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getHeader("subscription") != null);
+
+ connV11.disconnect();
+ }
+
+ public void testSubscribeWithMessageSentWithProperties() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty("S", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte)9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float)6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ message.setShortProperty("s", (short)12);
+ message.writeBytes("Hello World".getBytes("UTF-8"));
+ producer.send(message);
+
+ ClientStompFrame frame = connV11.receiveFrame();
+ Assert.assertNotNull(frame);
+
+ Assert.assertTrue(frame.getHeader("S") != null);
+ Assert.assertTrue(frame.getHeader("n") != null);
+ Assert.assertTrue(frame.getHeader("byte") != null);
+ Assert.assertTrue(frame.getHeader("d") != null);
+ Assert.assertTrue(frame.getHeader("f") != null);
+ Assert.assertTrue(frame.getHeader("i") != null);
+ Assert.assertTrue(frame.getHeader("l") != null);
+ Assert.assertTrue(frame.getHeader("s") != null);
+ Assert.assertEquals("Hello World", frame.getBody());
+
+ connV11.disconnect();
+ }
+
+ public void testSuccessiveTransactionsWithSameID() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ // first tx
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1");
+
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ // 2nd tx with same tx ID
+ this.beginTransaction(connV11, "tx1");
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("Hello World");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1");
+
+ message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ connV11.disconnect();
+ }
+
+ public void testTransactionCommit() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+ frame.addHeader("receipt", "123");
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ assertEquals("123", frame.getHeader("receipt-id"));
+
+ // check the message is not committed
+ assertNull(consumer.receive(100));
+
+ this.commitTransaction(connV11, "tx1", true);
+
+ Message message = consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+
+ connV11.disconnect();
+ }
+
+ public void testTransactionRollback() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ connV11.connect(defUser, defPass);
+
+ this.beginTransaction(connV11, "tx1");
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("first message");
+
+ connV11.sendFrame(frame);
+
+ // rollback first message
+ this.abortTransaction(connV11, "tx1");
+
+ this.beginTransaction(connV11, "tx1");
+
+ frame = connV11.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("transaction", "tx1");
+
+ frame.setBody("second message");
+
+ connV11.sendFrame(frame);
+
+ this.commitTransaction(connV11, "tx1", true);
+
+ // only second msg should be received since first msg was rolled back
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", message.getText());
+
+ connV11.disconnect();
+ }
+
+ public void testUnsubscribe() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "auto");
+
+ // send a message to our queue
+ sendMessage("first message");
+
+ // receive message from socket
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ // remove suscription
+ this.unsubscribe(connV11, "sub1", true);
+
+ // send a message to our queue
+ sendMessage("second message");
+
+ frame = connV11.receiveFrame(1000);
+ assertNull(frame);
+
+ connV11.disconnect();
+ }
+
+ //-----------------private help methods
+
+ private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+ {
+ ClientStompFrame abortFrame = conn.createFrame("ABORT");
+ abortFrame.addHeader("transaction", txID);
+
+ conn.sendFrame(abortFrame);
+ }
+
+ private void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+ {
+ ClientStompFrame beginFrame = conn.createFrame("BEGIN");
+ beginFrame.addHeader("transaction", txID);
+
+ conn.sendFrame(beginFrame);
+ }
+
+ private void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException
+ {
+ commitTransaction(conn, txID, false);
+ }
+
+ private void commitTransaction(StompClientConnection conn, String txID, boolean receipt) throws IOException, InterruptedException
+ {
+ ClientStompFrame beginFrame = conn.createFrame("COMMIT");
+ beginFrame.addHeader("transaction", txID);
+ if (receipt)
+ {
+ beginFrame.addHeader("receipt", "1234");
+ }
+ ClientStompFrame resp = conn.sendFrame(beginFrame);
+ if (receipt)
+ {
+ assertEquals("1234", resp.getHeader("receipt-id"));
+ }
+ }
+
+ private void ack(StompClientConnection conn, String subId,
+ ClientStompFrame frame) throws IOException, InterruptedException
+ {
+ String messageID = frame.getHeader("message-id");
+
+ ClientStompFrame ackFrame = conn.createFrame("ACK");
+
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", messageID);
+
+ ClientStompFrame response = conn.sendFrame(ackFrame);
+ if (response != null)
+ {
+ throw new IOException("failed to ack " + response);
+ }
+ }
+
+ private void ack(StompClientConnection conn, String subId, String mid, String txID) throws IOException, InterruptedException
+ {
+ ClientStompFrame ackFrame = conn.createFrame("ACK");
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", mid);
+ if (txID != null)
+ {
+ ackFrame.addHeader("transaction", txID);
+ }
+
+ conn.sendFrame(ackFrame);
+ }
+
+ private void nack(StompClientConnection conn, String subId, String mid) throws IOException, InterruptedException
+ {
+ ClientStompFrame ackFrame = conn.createFrame("NACK");
+ ackFrame.addHeader("subscription", subId);
+ ackFrame.addHeader("message-id", mid);
+
+ conn.sendFrame(ackFrame);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId, String ack) throws IOException, InterruptedException
+ {
+ subscribe(conn, subId, ack, null, null);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId) throws IOException, InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, null);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, null, receipt);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId, String ack,
+ String durableId, String selector) throws IOException,
+ InterruptedException
+ {
+ subscribe(conn, subId, ack, durableId, selector, false);
+ }
+
+ private void subscribe(StompClientConnection conn, String subId,
+ String ack, String durableId, String selector, boolean receipt) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", subId);
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ if (ack != null)
+ {
+ subFrame.addHeader("ack", ack);
+ }
+ if (durableId != null)
+ {
+ subFrame.addHeader("durable-subscriber-name", durableId);
+ }
+ if (selector != null)
+ {
+ subFrame.addHeader("selector", selector);
+ }
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "1234");
+ }
+
+ subFrame = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ assertEquals("1234", subFrame.getHeader("receipt-id"));
+ }
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId) throws IOException, InterruptedException
+ {
+ subscribeTopic(conn, subId, ack, durableId, false);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt) throws IOException, InterruptedException
+ {
+ subscribeTopic(conn, subId, ack, durableId, receipt, false);
+ }
+
+ private void subscribeTopic(StompClientConnection conn, String subId,
+ String ack, String durableId, boolean receipt, boolean noLocal) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", subId);
+ subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
+ if (ack != null)
+ {
+ subFrame.addHeader("ack", ack);
+ }
+ if (durableId != null)
+ {
+ subFrame.addHeader("durable-subscriber-name", durableId);
+ }
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "1234");
+ }
+ if (noLocal)
+ {
+ subFrame.addHeader("no-local", "true");
+ }
+
+ ClientStompFrame frame = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ assertTrue(frame.getHeader("receipt-id").equals("1234"));
+ }
+ }
+
+ private void unsubscribe(StompClientConnection conn, String subId) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
+ subFrame.addHeader("id", subId);
+
+ conn.sendFrame(subFrame);
+ }
+
+ private void unsubscribe(StompClientConnection conn, String subId,
+ boolean receipt) throws IOException, InterruptedException
+ {
+ ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
+ subFrame.addHeader("id", subId);
+
+ if (receipt)
+ {
+ subFrame.addHeader("receipt", "4321");
+ }
+
+ ClientStompFrame f = conn.sendFrame(subFrame);
+
+ if (receipt)
+ {
+ System.out.println("response: " + f);
+ assertEquals("RECEIPT", f.getCommand());
+ assertEquals("4321", f.getHeader("receipt-id"));
+ }
+ }
+
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", "client");
+
+ sendMessage(getName());
+
+ ClientStompFrame frame = connV11.receiveFrame();
+
+ Assert.assertEquals("MESSAGE", frame.getCommand());
+
+ log.info("Reconnecting!");
+
+ if (sendDisconnect)
+ {
+ connV11.disconnect();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+ else
+ {
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ }
+
+ // message should be received since message was not acknowledged
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", null);
+
+ frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+
+ connV11.disconnect();
+
+ // now lets make sure we don't see the message again
+ connV11.destroy();
+ connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ connV11.connect(defUser, defPass);
+
+ this.subscribe(connV11, "sub1", null, null, true);
+
+ sendMessage("shouldBeNextMessage");
+
+ frame = connV11.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertEquals("shouldBeNextMessage", frame.getBody());
+ }
+
+}
+
+
+
+
+
12 years, 5 months
JBoss hornetq SVN: r11873 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-07 15:53:32 -0500 (Wed, 07 Dec 2011)
New Revision: 11873
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
JBPAPP-6589 - changing log
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-12-07 15:28:10 UTC (rev 11872)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-12-07 20:53:32 UTC (rev 11873)
@@ -1074,7 +1074,7 @@
if (consumer == null)
{
- ServerSessionImpl.log.error("There is no consumer with id " + consumerID);
+ ServerSessionImpl.log.debug("There is no consumer with id " + consumerID);
return;
}
12 years, 5 months