JBoss hornetq SVN: r11831 - in trunk/tests: integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-05 09:19:17 -0500 (Mon, 05 Dec 2011)
New Revision: 11831
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientExitTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQRATestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java
Log:
Move closing of resources to super.tearDown()
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/CommitRollbackTest.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -19,7 +19,13 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.tests.util.ServiceTestBase;
@@ -42,12 +48,10 @@
public void testReceiveWithCommit() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
- server.start();
+ server.start();
ServerLocator locator = createInVMNonHALocator();
- ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSessionFactory cf = createSessionFactory(locator);
ClientSession sendSession = cf.createSession(false, true, true);
ClientSession session = cf.createSession(false, false, false);
sendSession.createQueue(addressA, queueA, false);
@@ -71,24 +75,15 @@
Assert.assertEquals(0, q.getDeliveringCount());
session.close();
sendSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
}
- }
- }
public void testReceiveWithRollback() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
ServerLocator locator = createInVMNonHALocator();
- ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSessionFactory cf = createSessionFactory(locator);
ClientSession sendSession = cf.createSession(false, true, true);
ClientSession session = cf.createSession(false, false, false);
sendSession.createQueue(addressA, queueA, false);
@@ -118,25 +113,15 @@
Assert.assertEquals(numMessages, q.getDeliveringCount());
session.close();
sendSession.close();
- cf.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testReceiveWithRollbackMultipleConsumersDifferentQueues() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
ServerLocator locator = createInVMNonHALocator();
- ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSessionFactory cf = createSessionFactory(locator);
ClientSession sendSession = cf.createSession(false, true, true);
ClientSession session = cf.createSession(false, false, false);
sendSession.createQueue(addressA, queueA, false);
@@ -173,26 +158,16 @@
Assert.assertEquals(numMessages, q.getMessageCount());
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testAsyncConsumerCommit() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
- server.start();
+ server.start();
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnAcknowledge(true);
locator.setAckBatchSize(0);
- ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSessionFactory cf = createSessionFactory(locator);
ClientSession sendSession = cf.createSession(false, true, true);
final ClientSession session = cf.createSession(false, true, false);
sendSession.createQueue(addressA, queueA, false);
@@ -236,27 +211,17 @@
Assert.assertEquals(0, q.getMessageCount());
sendSession.close();
session.close();
- cf.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
+
}
public void testAsyncConsumerRollback() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
- server.start();
+ server.start();
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnAcknowledge(true);
locator.setAckBatchSize(0);
- ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSessionFactory cf = createSessionFactory(locator);
ClientSession sendSession = cf.createSession(false, true, true);
final ClientSession session = cf.createSession(false, true, false);
sendSession.createQueue(addressA, queueA, false);
@@ -285,14 +250,7 @@
sendSession.close();
session.close();
cf.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
+
}
private static class ackHandler implements MessageHandler
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -18,9 +18,14 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.util.SpawnedVMSupport;
@@ -113,18 +118,18 @@
// FIXME https://jira.jboss.org/jira/browse/JBMESSAGING-1421
assertActiveSession(0);
}
-
+
public void testCrashClient2() throws Exception
- {
+ {
assertActiveConnections(1);
ClientSession session = sf.createSession(false, true, true);
-
+
session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
-
+
// spawn a JVM that creates a Core client, which sends a message
Process p = SpawnedVMSupport.spawnVM(CrashClient2.class.getName());
-
+
ClientCrashTest.log.debug("waiting for the client VM to crash ...");
p.waitFor();
@@ -133,9 +138,9 @@
System.out.println("VM Exited");
Thread.sleep(3 * ClientCrashTest.CONNECTION_TTL);
-
+
ClientConsumer consumer = session.createConsumer(ClientCrashTest.QUEUE);
-
+
session.start();
// receive a message from the queue
@@ -144,7 +149,7 @@
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
assertEquals(2, messageFromClient.getDeliveryCount());
-
+
session.close();
}
@@ -157,26 +162,9 @@
super.setUp();
locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
-
+ addServerLocator(locator);
locator.setClientFailureCheckPeriod(ClientCrashTest.PING_PERIOD);
locator.setConnectionTTL(ClientCrashTest.CONNECTION_TTL);
- sf = locator.createSessionFactory();
+ sf = createSessionFactory(locator);
}
-
- @Override
- protected void tearDown() throws Exception
- {
- // sf.close();
-
- sf = null;
- locator.close();
- super.tearDown();
- }
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientExitTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientExitTest.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientExitTest.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -18,7 +18,11 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.tests.util.RandomUtil;
@@ -81,7 +85,7 @@
// FIXME https://jira.jboss.org/jira/browse/JBMESSAGING-1421
// Thread.sleep(1000);
- //
+ //
// // the local session
// assertActiveConnections(1);
// // assertActiveSession(1);
@@ -102,17 +106,11 @@
super.setUp();
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
- ClientSessionFactory sf = locator.createSessionFactory();
+ addServerLocator(locator);
+ ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
session.createQueue(ClientExitTest.QUEUE, ClientExitTest.QUEUE, null, false);
consumer = session.createConsumer(ClientExitTest.QUEUE);
session.start();
}
-
- // Protected ------------------------------------------------------------------------------------
-
- // Private --------------------------------------------------------------------------------------
-
- // Inner classes --------------------------------------------------------------------------------
-
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientTestBase.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/clientcrash/ClientTestBase.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -29,22 +29,8 @@
public abstract class ClientTestBase extends ServiceTestBase
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
private HornetQServer server;
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
@Override
protected void setUp() throws Exception
{
@@ -56,16 +42,6 @@
server.start();
}
- @Override
- protected void tearDown() throws Exception
- {
- server.stop();
-
- server = null;
-
- super.tearDown();
- }
-
protected void assertActiveConnections(final int expectedActiveConnections) throws Exception
{
Assert.assertEquals(expectedActiveConnections, server.getHornetQServerControl().getConnectionCount());
@@ -76,8 +52,4 @@
Assert.assertEquals(expectedActiveSession, server.getSessions().size());
}
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerSecurityTest.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -12,23 +12,16 @@
*/
package org.hornetq.tests.integration.ra;
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.security.Role;
-import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivationSpec;
-import javax.resource.ResourceException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Jul 6, 2010
@@ -70,9 +63,9 @@
server.getSecurityManager().addUser("testuser", "testpassword");
server.getSecurityManager().addRole("testuser", "arole");
Role role = new Role("arole", false, true, false, false, false, false, false);
- Set<Role> roles = new HashSet<Role>();
- roles.add(role);
- server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles);
+ Set<Role> roles = new HashSet<Role>();
+ roles.add(role);
+ server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles);
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerTest.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -12,26 +12,26 @@
*/
package org.hornetq.tests.integration.ra;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivationSpec;
-import javax.jms.Message;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created May 20, 2010
*/
public class HornetQMessageHandlerTest extends HornetQRATestBase
{
-
+
@Override
public boolean isSecure()
{
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQMessageHandlerXATest.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -12,6 +12,15 @@
*/
package org.hornetq.tests.integration.ra;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.resource.ResourceException;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
@@ -20,14 +29,6 @@
import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.utils.UUIDGenerator;
-import javax.resource.ResourceException;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.lang.reflect.Method;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Jul 6, 2010
@@ -55,7 +56,7 @@
XADummyEndpoint endpoint = new XADummyEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
qResourceAdapter.endpointActivation(endpointFactory, spec);
- ClientSession session = locator.createSessionFactory().createSession();
+ ClientSession session = createSessionFactory(locator).createSession();
ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("teststring");
@@ -87,7 +88,7 @@
XADummyEndpoint endpoint = new XADummyEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
qResourceAdapter.endpointActivation(endpointFactory, spec);
- ClientSession session = locator.createSessionFactory().createSession();
+ ClientSession session = createSessionFactory(locator).createSession();
ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("teststring");
@@ -110,7 +111,7 @@
class XADummyEndpoint extends DummyMessageEndpoint
{
- private Xid xid;
+ private final Xid xid;
public XADummyEndpoint(CountDownLatch latch)
{
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQRATestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQRATestBase.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/HornetQRATestBase.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -12,12 +12,9 @@
*/
package org.hornetq.tests.integration.ra;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.jms.client.HornetQMessage;
-import org.hornetq.tests.util.ServiceTestBase;
+import java.lang.reflect.Method;
+import java.util.Timer;
+import java.util.concurrent.CountDownLatch;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -27,12 +24,20 @@
import javax.resource.spi.XATerminator;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.resource.spi.work.*;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
+import javax.resource.spi.work.WorkListener;
+import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
-import java.lang.reflect.Method;
-import java.util.Timer;
-import java.util.concurrent.CountDownLatch;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.tests.util.ServiceTestBase;
+
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Jul 6, 2010
@@ -62,31 +67,11 @@
server.createQueue(MDBQUEUEPREFIXEDSIMPLE, MDBQUEUEPREFIXEDSIMPLE, null, true, false);
}
- @Override
- protected void tearDown() throws Exception
- {
- locator.close();
-
- locator = null;
- if (server != null)
- {
- try
- {
- server.stop();
- server = null;
- }
- catch (Exception e)
- {
- // ignore
- }
- }
- super.tearDown();
- }
public abstract boolean isSecure();
class DummyMessageEndpointFactory implements MessageEndpointFactory
{
- private DummyMessageEndpoint endpoint;
+ private final DummyMessageEndpoint endpoint;
private final boolean isDeliveryTransacted;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/ra/OutgoingConnectionTest.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -50,7 +50,7 @@
public class OutgoingConnectionTest extends HornetQRATestBase
{
private HornetQResourceAdapter resourceAdapter;
-
+
@Override
public boolean isSecure()
{
@@ -71,7 +71,7 @@
roles.add(role);
server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles);
}
-
+
@Override
protected void tearDown() throws Exception
{
@@ -103,7 +103,7 @@
TextMessage textMessage = (TextMessage) consumer.receive(1000);
assertNotNull(textMessage);
assertEquals(textMessage.getText(), "test");
-
+
ManagedConnection mc = ((HornetQRASession)s).getManagedConnection();
s.close();
mc.destroy();
@@ -175,7 +175,7 @@
assertNotNull(textMessage);
assertEquals(textMessage.getText(), "test");
s.commit();
-
+
ManagedConnection mc = ((HornetQRASession)s).getManagedConnection();
s.close();
mc.destroy();
@@ -195,14 +195,14 @@
Session s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
- Session s2 = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
fail("should throw javax,jms.IllegalStateException: Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
}
catch (JMSException e)
{
assertTrue(e.getLinkedException() instanceof IllegalStateException);
}
-
+
ManagedConnection mc = ((HornetQRASession)s).getManagedConnection();
s.close();
mc.destroy();
@@ -227,7 +227,7 @@
queueConnection = qraConnectionFactory.createQueueConnection("testuser", "testpassword");
session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
mc = ((HornetQRASession)session).getManagedConnection();
queueConnection.close();
mc.destroy();
@@ -246,7 +246,7 @@
HornetQRAConnectionFactory qraConnectionFactory = new HornetQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
ManagedConnection mc = ((HornetQRASession)session).getManagedConnection();
queueConnection.close();
mc.destroy();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -26,8 +26,6 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
@@ -38,7 +36,7 @@
import org.hornetq.tests.util.ServiceTestBase;
/**
- *
+ *
* A DirectDeliverTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
@@ -48,26 +46,10 @@
public class DirectDeliverTest extends ServiceTestBase
{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(DirectDeliverTest.class);
-
- // Attributes ----------------------------------------------------
-
private HornetQServer server;
-
+
private ServerLocator locator;
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
@Override
protected void setUp() throws Exception
{
@@ -84,43 +66,27 @@
config.setSecurityEnabled(false);
server = createServer(false, config);
server.start();
-
+
locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ addServerLocator(locator);
}
- @Override
- protected void tearDown() throws Exception
- {
- locator.close();
-
- server.stop();
-
- server = null;
-
- super.tearDown();
- }
-
- protected ClientSessionFactory createSessionFactory() throws Exception
- {
- return locator.createSessionFactory();
- }
-
public void testDirectDeliver() throws Exception
{
final String foo = "foo";
-
- ClientSessionFactory sf = createSessionFactory();
+ ClientSessionFactory sf = createSessionFactory(locator);
+
ClientSession session = sf.createSession();
session.createQueue(foo, foo);
Binding binding = server.getPostOffice().getBinding(new SimpleString(foo));
-
+
Queue queue = (Queue)binding.getBindable();
-
+
assertTrue(queue.isDirectDeliver());
-
+
ClientProducer prod = session.createProducer(foo);
ClientConsumer cons = session.createConsumer(foo);
@@ -133,12 +99,12 @@
prod.send(msg);
}
-
+
queue.flushExecutor();
-
+
//Consumer is not started so should go queued
assertFalse(queue.isDirectDeliver());
-
+
session.start();
for (int i = 0; i < numMessages; i++)
@@ -149,18 +115,18 @@
msg.acknowledge();
}
-
+
Thread.sleep((long)(QueueImpl.CHECK_QUEUE_SIZE_PERIOD * 1.5));
-
+
//Add another message, should go direct
ClientMessage msg = session.createMessage(true);
prod.send(msg);
-
+
queue.flushExecutor();
-
+
assertTrue(queue.isDirectDeliver());
-
+
//Send some more
for (int i = 0; i < numMessages; i++)
{
@@ -168,7 +134,7 @@
prod.send(msg);
}
-
+
for (int i = 0; i < numMessages + 1; i++)
{
msg = cons.receive(10000);
@@ -177,26 +143,19 @@
msg.acknowledge();
}
-
+
assertTrue(queue.isDirectDeliver());
-
+
session.stop();
-
+
for (int i = 0; i < numMessages; i++)
{
msg = session.createMessage(true);
prod.send(msg);
}
-
- assertFalse(queue.isDirectDeliver());
-
- sf.close();
+ assertFalse(queue.isDirectDeliver());
}
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified: trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java
===================================================================
--- trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java 2011-12-05 14:18:54 UTC (rev 11830)
+++ trunk/tests/stress-tests/src/test/java/org/hornetq/tests/stress/journal/CompactingStressTest.java 2011-12-05 14:19:17 UTC (rev 11831)
@@ -20,7 +20,12 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -135,7 +140,7 @@
session.close();
server.stop();
-
+
setupServer(journalType);
server.start();
@@ -183,8 +188,6 @@
{
ClientProducer producer = session.createProducer(CompactingStressTest.AD3);
- byte[] buffer = new byte[10 * 1024];
-
ClientMessage msg = session.createMessage(true);
for (int i = 0; i < CompactingStressTest.TOT_AD3; i++)
13 years, 1 month
JBoss hornetq SVN: r11830 - in trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration: largemessage and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-12-05 09:18:54 -0500 (Mon, 05 Dec 2011)
New Revision: 11830
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
test suite refactor
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -71,14 +71,6 @@
locator = createFactory(isNetty());
}
- @Override
- protected void tearDown() throws Exception
- {
- locator.close();
-
- super.tearDown();
- }
-
private int getMessageEncodeSize(final SimpleString address) throws Exception
{
ServerLocator locator = createInVMNonHALocator();
@@ -97,62 +89,55 @@
public void testReceiveImmediateWithZeroWindow() throws Exception
{
HornetQServer server = createServer(false, isNetty());
- try
- {
- server.start();
- locator.setConsumerWindowSize(0);
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setConsumerWindowSize(0);
- ClientSession session = sf.createSession(false, false, false);
- session.createQueue("testWindow", "testWindow", true);
- session.close();
+ ClientSessionFactory sf = locator.createSessionFactory();
- int numConsumers = 5;
+ ClientSession session = sf.createSession(false, false, false);
+ session.createQueue("testWindow", "testWindow", true);
+ session.close();
- ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
- ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
- for (int i = 0; i < numConsumers; i++)
- {
- ClientSession session1 = sf.createSession();
- ClientConsumer consumer = session1.createConsumer("testWindow");
- consumers.add(consumer);
- session1.start();
- sessions.add(session1);
- consumer.receiveImmediate();
+ int numConsumers = 5;
- }
+ ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+ ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
+ for (int i = 0; i < numConsumers; i++)
+ {
+ ClientSession session1 = sf.createSession();
+ ClientConsumer consumer = session1.createConsumer("testWindow");
+ consumers.add(consumer);
+ session1.start();
+ sessions.add(session1);
+ consumer.receiveImmediate();
- ClientSession senderSession = sf.createSession(false, false);
+ }
- ClientProducer producer = senderSession.createProducer("testWindow");
+ ClientSession senderSession = sf.createSession(false, false);
- ClientMessage sent = senderSession.createMessage(true);
- sent.putStringProperty("hello", "world");
- producer.send(sent);
+ ClientProducer producer = senderSession.createProducer("testWindow");
- senderSession.commit();
+ ClientMessage sent = senderSession.createMessage(true);
+ sent.putStringProperty("hello", "world");
+ producer.send(sent);
- senderSession.start();
+ senderSession.commit();
- ClientConsumer consumer = consumers.get(2);
- ClientMessage received = consumer.receive(1000);
- assertNotNull(received);
+ senderSession.start();
- for (ClientSession tmpSess : sessions)
- {
- tmpSess.close();
- }
+ ClientConsumer consumer = consumers.get(2);
+ ClientMessage received = consumer.receive(1000);
+ assertNotNull(received);
- senderSession.close();
-
- }
- finally
+ for (ClientSession tmpSess : sessions)
{
- server.stop();
+ tmpSess.close();
}
+ senderSession.close();
+
}
// https://jira.jboss.org/jira/browse/HORNETQ-385
@@ -204,7 +189,6 @@
finally
{
locator.close();
- server.stop();
}
}
@@ -212,125 +196,109 @@
public void testReceiveImmediateWithZeroWindow3() throws Exception
{
HornetQServer server = createServer(false, isNetty());
- try
- {
- server.start();
- locator.setConsumerWindowSize(0);
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setConsumerWindowSize(0);
- ClientSession session = sf.createSession(false, false, false);
- session.createQueue("testWindow", "testWindow", true);
- session.close();
+ ClientSessionFactory sf = locator.createSessionFactory();
- int numConsumers = 5;
+ ClientSession session = sf.createSession(false, false, false);
+ session.createQueue("testWindow", "testWindow", true);
+ session.close();
- ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
- ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
- for (int i = 0; i < numConsumers; i++)
- {
- ClientSession session1 = sf.createSession();
- ClientConsumer consumer = session1.createConsumer("testWindow");
- consumers.add(consumer);
- session1.start();
- sessions.add(session1);
- consumer.receive(10);
+ int numConsumers = 5;
- }
+ ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+ ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
+ for (int i = 0; i < numConsumers; i++)
+ {
+ ClientSession session1 = sf.createSession();
+ ClientConsumer consumer = session1.createConsumer("testWindow");
+ consumers.add(consumer);
+ session1.start();
+ sessions.add(session1);
+ consumer.receive(10);
- ClientSession senderSession = sf.createSession(false, false);
+ }
- ClientProducer producer = senderSession.createProducer("testWindow");
+ ClientSession senderSession = sf.createSession(false, false);
- ClientMessage sent = senderSession.createMessage(true);
- sent.putStringProperty("hello", "world");
+ ClientProducer producer = senderSession.createProducer("testWindow");
- producer.send(sent);
+ ClientMessage sent = senderSession.createMessage(true);
+ sent.putStringProperty("hello", "world");
- senderSession.commit();
+ producer.send(sent);
- senderSession.start();
+ senderSession.commit();
- ClientConsumer consumer = consumers.get(2);
- ClientMessage received = consumer.receive(1000);
- assertNotNull(received);
+ senderSession.start();
- for (ClientSession tmpSess : sessions)
- {
- tmpSess.close();
- }
+ ClientConsumer consumer = consumers.get(2);
+ ClientMessage received = consumer.receive(1000);
+ assertNotNull(received);
- senderSession.close();
-
- }
- finally
+ for (ClientSession tmpSess : sessions)
{
- server.stop();
+ tmpSess.close();
}
+ senderSession.close();
}
public void testReceiveImmediateWithZeroWindow4() throws Exception
{
HornetQServer server = createServer(false, isNetty());
- try
- {
- server.start();
- locator.setConsumerWindowSize(0);
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setConsumerWindowSize(0);
- ClientSession session = sf.createSession(false, false, false);
- session.createQueue("testWindow", "testWindow", true);
- session.close();
+ ClientSessionFactory sf = locator.createSessionFactory();
- int numConsumers = 5;
+ ClientSession session = sf.createSession(false, false, false);
+ session.createQueue("testWindow", "testWindow", true);
+ session.close();
- ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
- ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
- for (int i = 0; i < numConsumers; i++)
- {
- ClientSession session1 = sf.createSession();
- ClientConsumer consumer = session1.createConsumer("testWindow");
- consumers.add(consumer);
- session1.start();
- sessions.add(session1);
- consumer.receive(10);
+ int numConsumers = 5;
- }
+ ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
+ ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
+ for (int i = 0; i < numConsumers; i++)
+ {
+ ClientSession session1 = sf.createSession();
+ ClientConsumer consumer = session1.createConsumer("testWindow");
+ consumers.add(consumer);
+ session1.start();
+ sessions.add(session1);
+ consumer.receive(10);
- ClientSession senderSession = sf.createSession(false, false);
+ }
- ClientProducer producer = senderSession.createProducer("testWindow");
+ ClientSession senderSession = sf.createSession(false, false);
- ClientMessage sent = senderSession.createMessage(true);
- sent.putStringProperty("hello", "world");
+ ClientProducer producer = senderSession.createProducer("testWindow");
- producer.send(sent);
+ ClientMessage sent = senderSession.createMessage(true);
+ sent.putStringProperty("hello", "world");
- senderSession.commit();
+ producer.send(sent);
- senderSession.start();
+ senderSession.commit();
- ClientConsumer consumer = consumers.get(2);
- ClientMessage received = consumer.receiveImmediate();
- assertNotNull(received);
+ senderSession.start();
- for (ClientSession tmpSess : sessions)
- {
- tmpSess.close();
- }
+ ClientConsumer consumer = consumers.get(2);
+ ClientMessage received = consumer.receiveImmediate();
+ assertNotNull(received);
- senderSession.close();
-
- }
- finally
+ for (ClientSession tmpSess : sessions)
{
- server.stop();
+ tmpSess.close();
}
+ senderSession.close();
}
/*
@@ -342,55 +310,45 @@
{
HornetQServer messagingService = createServer(false, isNetty());
locator.setBlockOnNonDurableSend(false);
- try
- {
- messagingService.start();
- int numMessage = 100;
- locator.setConsumerWindowSize(numMessage * getMessageEncodeSize(addressA));
- ClientSessionFactory cf = locator.createSessionFactory();
- ClientSession sendSession = cf.createSession(false, true, true);
- ClientSession receiveSession = cf.createSession(false, true, true);
- sendSession.createQueue(addressA, queueA, false);
- ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
- ClientSession session = cf.createSession(false, true, true);
- ClientProducer cp = sendSession.createProducer(addressA);
- ClientConsumer cc = session.createConsumer(queueA);
- session.start();
- receiveSession.start();
- for (int i = 0; i < numMessage * 4; i++)
- {
- cp.send(sendSession.createMessage(false));
- }
+ messagingService.start();
+ int numMessage = 100;
+ locator.setConsumerWindowSize(numMessage * getMessageEncodeSize(addressA));
+ ClientSessionFactory cf = locator.createSessionFactory();
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientSession receiveSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
- for (int i = 0; i < numMessage * 2; i++)
- {
- ClientMessage m = receivingConsumer.receive(5000);
- Assert.assertNotNull(m);
- m.acknowledge();
- }
- receiveSession.close();
+ ClientSession session = cf.createSession(false, true, true);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ session.start();
+ receiveSession.start();
+ for (int i = 0; i < numMessage * 4; i++)
+ {
+ cp.send(sendSession.createMessage(false));
+ }
- for (int i = 0; i < numMessage * 2; i++)
- {
- ClientMessage m = cc.receive(5000);
- Assert.assertNotNull(m);
- m.acknowledge();
- }
-
- session.close();
- sendSession.close();
-
- Assert.assertEquals(0, getMessageCount(messagingService, queueA.toString()));
-
+ for (int i = 0; i < numMessage * 2; i++)
+ {
+ ClientMessage m = receivingConsumer.receive(5000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
}
- finally
+ receiveSession.close();
+
+ for (int i = 0; i < numMessage * 2; i++)
{
- if (messagingService.isStarted())
- {
- messagingService.stop();
- }
+ ClientMessage m = cc.receive(5000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
}
+
+ session.close();
+ sendSession.close();
+
+ Assert.assertEquals(0, getMessageCount(messagingService, queueA.toString()));
}
public void testSlowConsumerBufferingOne() throws Exception
@@ -468,11 +426,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -600,11 +553,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -796,11 +744,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -875,11 +818,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -1011,11 +949,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -1023,7 +956,6 @@
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
{
-
HornetQServer server = createServer(false, isNetty());
ClientSession sessionB = null;
@@ -1175,11 +1107,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -1347,11 +1274,6 @@
{
ignored.printStackTrace();
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
@@ -1485,11 +1407,6 @@
catch (Exception ignored)
{
}
-
- if (server.isStarted())
- {
- server.stop();
- }
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InVMNonPersistentMessageBufferTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -257,11 +257,6 @@
session.close();
}
- if (server.isStarted())
- {
- server.stop();
- }
-
super.tearDown();
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -79,13 +79,6 @@
locator = createFactory(isNetty());
}
- @Override
- protected void tearDown() throws Exception
- {
- locator.close();
- super.tearDown();
- }
-
protected boolean isNetty()
{
return false;
@@ -101,7 +94,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.getConfiguration()
.getInterceptorClassNames()
@@ -144,14 +137,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -170,7 +155,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
// server.getConfiguration()
// .getInterceptorClassNames()
@@ -231,14 +216,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -257,7 +234,7 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
+ HornetQServer server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
// server.getConfiguration()
// .getInterceptorClassNames()
@@ -340,14 +317,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -366,7 +335,7 @@
try
{
- server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
+ HornetQServer server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new HashMap<String, AddressSettings>());
// server.getConfiguration()
// .getInterceptorClassNames()
@@ -484,14 +453,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageCompressTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageCompressTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
@@ -65,7 +66,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -108,14 +109,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -132,7 +125,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -187,14 +180,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -211,7 +196,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -264,14 +249,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -290,7 +267,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -369,14 +346,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -107,107 +107,93 @@
final ClientSession session;
- try
- {
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
- AddressSettings settings = new AddressSettings();
- if (redeliveryDelay)
+ AddressSettings settings = new AddressSettings();
+ if (redeliveryDelay)
+ {
+ settings.setRedeliveryDelay(1000);
+ if (locator.isCompressLargeMessage())
{
- settings.setRedeliveryDelay(1000);
- if (locator.isCompressLargeMessage())
- {
- locator.setConsumerWindowSize(0);
- }
+ locator.setConsumerWindowSize(0);
}
- settings.setMaxDeliveryAttempts(-1);
+ }
+ settings.setMaxDeliveryAttempts(-1);
- server.getAddressSettingsRepository().addMatch("#", settings);
+ server.getAddressSettingsRepository().addMatch("#", settings);
- server.start();
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- session = sf.createSession(false, false, false);
+ session = sf.createSession(false, false, false);
- session.createQueue(ADDRESS, ADDRESS, true);
+ session.createQueue(ADDRESS, ADDRESS, true);
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- for (int i = 0 ; i < 20; i++)
- {
- Message clientFile = createLargeClientMessage(session, messageSize, true);
+ for (int i = 0 ; i < 20; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
- clientFile.putIntProperty("value", i);
+ clientFile.putIntProperty("value", i);
- producer.send(clientFile);
- }
+ producer.send(clientFile);
+ }
- session.commit();
+ session.commit();
- session.start();
+ session.start();
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
- final AtomicInteger errors = new AtomicInteger(0);
+ final AtomicInteger errors = new AtomicInteger(0);
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
- consumer.setMessageHandler(new MessageHandler()
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ int counter = 0;
+ public void onMessage(ClientMessage message)
{
- int counter = 0;
- public void onMessage(ClientMessage message)
+ message.getBodyBuffer().readByte();
+ // System.out.println("message:" + message);
+ try
{
- message.getBodyBuffer().readByte();
- // System.out.println("message:" + message);
- try
+ if (counter ++ < 20)
{
- if (counter ++ < 20)
- {
- Thread.sleep(100);
- // System.out.println("Rollback");
- message.acknowledge();
- session.rollback();
- }
- else
- {
- message.acknowledge();
- session.commit();
- }
-
- if (counter == 40)
- {
- latch.countDown();
- }
+ Thread.sleep(100);
+ // System.out.println("Rollback");
+ message.acknowledge();
+ session.rollback();
}
- catch (Exception e)
+ else
{
+ message.acknowledge();
+ session.commit();
+ }
+
+ if (counter == 40)
+ {
latch.countDown();
- e.printStackTrace();
- errors.incrementAndGet();
}
}
- });
+ catch (Exception e)
+ {
+ latch.countDown();
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
- assertTrue(latch.await(40, TimeUnit.SECONDS));
+ assertTrue(latch.await(40, TimeUnit.SECONDS));
- consumer.close();
+ consumer.close();
- session.close();
+ session.close();
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ validateNoFilesOnLargeDir();
}
public void testCloseConsumer() throws Exception
@@ -218,7 +204,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -268,14 +254,6 @@
catch (Throwable ignored)
{
}
-
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
}
}
@@ -305,7 +283,7 @@
config.setJournalBufferSize_AIO(10 * 1024);
config.setJournalBufferSize_NIO(10 * 1024);
- server = createServer(true, config);
+ HornetQServer server = createServer(true, config);
server.start();
@@ -362,14 +340,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -386,7 +356,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -489,14 +459,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -513,7 +475,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -572,14 +534,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -596,7 +550,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -708,14 +662,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -732,7 +678,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -843,14 +789,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -867,7 +805,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -950,14 +888,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -983,7 +913,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -1039,14 +969,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -1071,7 +993,7 @@
try
{
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
server.start();
@@ -1126,14 +1048,6 @@
{
try
{
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
session.close();
}
catch (Throwable ignored)
@@ -2002,73 +1916,58 @@
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
- try
- {
+ HornetQServer server = createServer(true, isNetty());
- server = createServer(true, isNetty());
+ server.start();
- server.start();
+ SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+ ClientSessionFactory sf = locator.createSessionFactory();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ session.createQueue(LargeMessageTest.ADDRESS, queue[0], null, true);
+ session.createQueue(LargeMessageTest.ADDRESS, queue[1], null, true);
- session.createQueue(LargeMessageTest.ADDRESS, queue[0], null, true);
- session.createQueue(LargeMessageTest.ADDRESS, queue[1], null, true);
+ int numberOfBytes = 400000;
- int numberOfBytes = 400000;
+ Message clientFile = createLargeClientMessage(session, numberOfBytes);
- Message clientFile = createLargeClientMessage(session, numberOfBytes);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ session.start();
- session.start();
+ producer.send(clientFile);
- producer.send(clientFile);
+ producer.close();
- producer.close();
+ ClientConsumer consumer = session.createConsumer(queue[1]);
+ ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ msg.getBodyBuffer().readByte();
+ Assert.assertNull(consumer.receiveImmediate());
+ Assert.assertNotNull(msg);
- ClientConsumer consumer = session.createConsumer(queue[1]);
- ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- msg.getBodyBuffer().readByte();
- Assert.assertNull(consumer.receiveImmediate());
- Assert.assertNotNull(msg);
+ msg.acknowledge();
+ consumer.close();
- msg.acknowledge();
- consumer.close();
+ log.debug("Stopping");
- log.debug("Stopping");
+ session.stop();
- session.stop();
+ ClientConsumer consumer1 = session.createConsumer(queue[0]);
- ClientConsumer consumer1 = session.createConsumer(queue[0]);
+ session.start();
- session.start();
+ msg = consumer1.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ Assert.assertNotNull(msg);
+ msg.acknowledge();
+ consumer1.close();
- msg = consumer1.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- Assert.assertNotNull(msg);
- msg.acknowledge();
- consumer1.close();
+ session.commit();
- session.commit();
+ session.close();
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ validateNoFilesOnLargeDir();
}
public void testTwoBindingsAndRestart() throws Exception
@@ -2085,66 +1984,50 @@
{
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
+ HornetQServer server = createServer(true, isNetty());
- try
- {
+ server.start();
- server = createServer(true, isNetty());
+ SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
- server.start();
+ ClientSessionFactory sf = locator.createSessionFactory();
- SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
- ClientSessionFactory sf = locator.createSessionFactory();
+ session.createQueue(LargeMessageTest.ADDRESS, queue[0], null, true);
+ session.createQueue(LargeMessageTest.ADDRESS, queue[1], null, true);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ int numberOfBytes = 400000;
- session.createQueue(LargeMessageTest.ADDRESS, queue[0], null, true);
- session.createQueue(LargeMessageTest.ADDRESS, queue[1], null, true);
+ Message clientFile = createLargeClientMessage(session, numberOfBytes);
- int numberOfBytes = 400000;
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ producer.send(clientFile);
- Message clientFile = createLargeClientMessage(session, numberOfBytes);
+ producer.close();
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- producer.send(clientFile);
+ readMessage(session, queue[1], numberOfBytes);
- producer.close();
+ if (restart)
+ {
+ session.close();
- readMessage(session, queue[1], numberOfBytes);
+ server.stop();
- if (restart)
- {
- session.close();
+ server = createServer(true, isNetty());
- server.stop();
+ server.start();
- server = createServer(true, isNetty());
+ sf = locator.createSessionFactory();
- server.start();
+ session = sf.createSession(null, null, false, true, true, false, 0);
+ }
- sf = locator.createSessionFactory();
+ readMessage(session, queue[0], numberOfBytes);
- session = sf.createSession(null, null, false, true, true, false, 0);
- }
+ session.close();
- readMessage(session, queue[0], numberOfBytes);
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ validateNoFilesOnLargeDir();
}
public void testSendRollbackXADurable() throws Exception
@@ -2171,75 +2054,53 @@
{
ClientSession session = null;
- try
- {
- server = createServer(true, isNetty());
+ HornetQServer server = createServer(true, isNetty());
- server.start();
+ server.start();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- session = sf.createSession(isXA, false, false);
+ session = sf.createSession(isXA, false, false);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
- Xid xid = null;
+ Xid xid = null;
- if (isXA)
- {
- xid = RandomUtil.randomXid();
- session.start(xid, XAResource.TMNOFLAGS);
- }
+ if (isXA)
+ {
+ xid = RandomUtil.randomXid();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- Message clientFile = createLargeClientMessage(session, 50000, durable);
+ Message clientFile = createLargeClientMessage(session, 50000, durable);
- for (int i = 0; i < 1; i++)
- {
- producer.send(clientFile);
- }
+ for (int i = 0; i < 1; i++)
+ {
+ producer.send(clientFile);
+ }
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.close();
- server.stop();
- server.start();
- sf = locator.createSessionFactory();
- session = sf.createSession(isXA, false, false);
-
- session.rollback(xid);
- }
- else
- {
- session.rollback();
- }
-
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
session.close();
+ server.stop();
+ server.start();
+ sf = locator.createSessionFactory();
+ session = sf.createSession(isXA, false, false);
- validateNoFilesOnLargeDir();
+ session.rollback(xid);
}
- finally
+ else
{
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
+ session.rollback();
}
+ session.close();
+
+ validateNoFilesOnLargeDir();
}
public void testSimpleRollback() throws Exception
@@ -2256,132 +2117,116 @@
{
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
+ HornetQServer server = createServer(true, isNetty());
- try
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(isXA, false, false);
+
+ Xid xid = null;
+
+ if (isXA)
{
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
- server = createServer(true, isNetty());
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null, true);
- server.start();
+ int numberOfBytes = 200000;
- ClientSessionFactory sf = locator.createSessionFactory();
+ session.start();
- ClientSession session = sf.createSession(isXA, false, false);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- Xid xid = null;
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ for (int n = 0; n < 10; n++)
+ {
+ Message clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
+
+ producer.send(clientFile);
+
+ Assert.assertNull(consumer.receiveImmediate());
+
if (isXA)
{
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
}
+ else
+ {
+ session.rollback();
+ }
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null, true);
+ clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
- int numberOfBytes = 200000;
+ producer.send(clientFile);
- session.start();
+ Assert.assertNull(consumer.receiveImmediate());
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-
- for (int n = 0; n < 10; n++)
+ for (int i = 0; i < 2; i++)
{
- Message clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
- producer.send(clientFile);
+ ClientMessage clientMessage = consumer.receive(5000);
- Assert.assertNull(consumer.receiveImmediate());
+ Assert.assertNotNull(clientMessage);
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
+ Assert.assertEquals(numberOfBytes, clientMessage.getBodySize());
- clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
+ clientMessage.acknowledge();
- producer.send(clientFile);
-
- Assert.assertNull(consumer.receiveImmediate());
-
if (isXA)
{
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
+ if (i == 0)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
}
else
{
- session.commit();
- }
-
- for (int i = 0; i < 2; i++)
- {
-
- ClientMessage clientMessage = consumer.receive(5000);
-
- Assert.assertNotNull(clientMessage);
-
- Assert.assertEquals(numberOfBytes, clientMessage.getBodySize());
-
- clientMessage.acknowledge();
-
- if (isXA)
+ if (i == 0)
{
- if (i == 0)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
+ session.rollback();
}
else
{
- if (i == 0)
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
+ session.commit();
}
}
}
-
- session.close();
-
- validateNoFilesOnLargeDir();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
+ session.close();
+
+ validateNoFilesOnLargeDir();
}
public void testBufferMultipleLargeMessages() throws Exception
@@ -2604,144 +2449,128 @@
AddressSettings value = new AddressSettings();
map.put(LargeMessageTest.ADDRESS.toString(), value);
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
final int numberOfBytes = 1024;
final int numberOfBytesBigMessage = 400000;
- try
- {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ ClientMessage message = null;
- ClientMessage message = null;
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
- for (int i = 0; i < 100; i++)
- {
- message = session.createMessage(true);
+ message.getBodyBuffer().writerIndex(0);
- message.getBodyBuffer().writerIndex(0);
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
- message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
-
- for (int j = 1; j <= numberOfBytes; j++)
- {
- message.getBodyBuffer().writeInt(j);
- }
-
- producer.send(message);
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
}
- ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
- clientFile.putBooleanProperty("TestLarge", true);
- producer.send(clientFile);
+ producer.send(message);
+ }
- for (int i = 0; i < 100; i++)
- {
- message = session.createMessage(true);
+ ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+ clientFile.putBooleanProperty("TestLarge", true);
+ producer.send(clientFile);
- message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
- producer.send(message);
- }
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
- session.close();
+ producer.send(message);
+ }
- server.stop();
+ session.close();
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
- server.start();
+ server.stop();
- sf = locator.createSessionFactory();
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
- for (int ad = 0; ad < 2; ad++)
- {
- session = sf.createSession(false, false, false);
+ sf = locator.createSessionFactory();
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+ for (int ad = 0; ad < 2; ad++)
+ {
+ session = sf.createSession(false, false, false);
- session.start();
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ session.start();
- Assert.assertNotNull(message2);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- message2.acknowledge();
+ Assert.assertNotNull(message2);
- Assert.assertNotNull(message2);
- }
+ message2.acknowledge();
- session.commit();
+ Assert.assertNotNull(message2);
+ }
- for (int i = 0; i < 5; i++)
- {
- ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+ session.commit();
- assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+ for (int i = 0; i < 5; i++)
+ {
+ ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
- Assert.assertNotNull(messageLarge);
+ assertTrue(messageLarge.getBooleanProperty("TestLarge"));
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
- messageLarge.acknowledge();
- messageLarge.saveToOutputStream(bout);
- byte[] body = bout.toByteArray();
- assertEquals(numberOfBytesBigMessage, body.length);
- for (int bi = 0; bi < body.length; bi++)
- {
- assertEquals(getSamplebyte(bi), body[bi]);
- }
+ Assert.assertNotNull(messageLarge);
- if (i < 4)
- session.rollback();
- else
- session.commit();
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ messageLarge.acknowledge();
+ messageLarge.saveToOutputStream(bout);
+ byte[] body = bout.toByteArray();
+ assertEquals(numberOfBytesBigMessage, body.length);
+ for (int bi = 0; bi < body.length; bi++)
+ {
+ assertEquals(getSamplebyte(bi), body[bi]);
}
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ if (i < 4)
+ session.rollback();
+ else
+ session.commit();
+ }
- Assert.assertNotNull(message2);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- message2.acknowledge();
+ Assert.assertNotNull(message2);
- Assert.assertNotNull(message2);
- }
+ message2.acknowledge();
- session.commit();
+ Assert.assertNotNull(message2);
+ }
- consumer.close();
+ session.commit();
- session.close();
+ consumer.close();
- }
+ session.close();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
@@ -2758,128 +2587,111 @@
AddressSettings value = new AddressSettings();
map.put(LargeMessageTest.ADDRESS.toString(), value);
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
final int numberOfBytes = 1024;
final int numberOfBytesBigMessage = 400000;
- try
- {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setCompressLargeMessage(true);
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
- locator.setCompressLargeMessage(true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(false, true, true);
- ClientSession session = sf.createSession(false, true, true);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-0"), null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS.concat("-1"), null, true);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ int msgId = 0;
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- int msgId = 0;
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message = session.createMessage(true);
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message = session.createMessage(true);
+ message.putIntProperty("msgID", msgId++);
- message.putIntProperty("msgID", msgId++);
+ message.putBooleanProperty("TestLarge", false);
- message.putBooleanProperty("TestLarge", false);
+ message.getBodyBuffer().writerIndex(0);
- message.getBodyBuffer().writerIndex(0);
+ message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
- message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
-
- for (int j = 1; j <= numberOfBytes; j++)
- {
- message.getBodyBuffer().writeInt(j);
- }
-
- producer.send(message);
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
}
+ producer.send(message);
+ }
- for (int i = 0; i < 10; i++)
- {
- ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
- clientFile.putBooleanProperty("TestLarge", true);
- producer.send(clientFile);
- }
- session.close();
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+ clientFile.putBooleanProperty("TestLarge", true);
+ producer.send(clientFile);
+ }
- for (int ad = 0; ad < 2; ad++)
- {
- session = sf.createSession(false, false, false);
+ session.close();
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
+ for (int ad = 0; ad < 2; ad++)
+ {
+ session = sf.createSession(false, false, false);
- session.start();
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
- for (int received = 0 ; received < 5; received++)
+ session.start();
+
+ for (int received = 0 ; received < 5; received++)
+ {
+ for (int i = 0; i < 100; i++)
{
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- Assert.assertNotNull(message2);
+ Assert.assertNotNull(message2);
- assertFalse(message2.getBooleanProperty("TestLarge"));
+ assertFalse(message2.getBooleanProperty("TestLarge"));
- message2.acknowledge();
+ message2.acknowledge();
- Assert.assertNotNull(message2);
- }
+ Assert.assertNotNull(message2);
+ }
- for (int i = 0; i < 10; i++)
- {
- ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
- Assert.assertNotNull(messageLarge);
+ Assert.assertNotNull(messageLarge);
- assertTrue(messageLarge.getBooleanProperty("TestLarge"));
+ assertTrue(messageLarge.getBooleanProperty("TestLarge"));
- ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
- messageLarge.acknowledge();
+ messageLarge.acknowledge();
- messageLarge.saveToOutputStream(bout);
- byte[] body = bout.toByteArray();
- assertEquals(numberOfBytesBigMessage, body.length);
- for (int bi = 0; bi < body.length; bi++)
- {
- assertEquals(getSamplebyte(bi), body[bi]);
- }
+ messageLarge.saveToOutputStream(bout);
+ byte[] body = bout.toByteArray();
+ assertEquals(numberOfBytesBigMessage, body.length);
+ for (int bi = 0; bi < body.length; bi++)
+ {
+ assertEquals(getSamplebyte(bi), body[bi]);
}
-
- session.rollback();
}
- session.commit();
+ session.rollback();
+ }
- consumer.close();
+ session.commit();
- session.close();
+ consumer.close();
- }
+ session.close();
}
- finally
- {
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testSendStreamingSingleMessage() throws Exception
@@ -3135,110 +2947,96 @@
AddressSettings value = new AddressSettings();
map.put(LargeMessageTest.ADDRESS.toString(), value);
- server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map);
+ HornetQServer server = createServer(realFiles, config, PAGE_SIZE, PAGE_MAX, map);
server.start();
final int numberOfBytes = 1024;
final int numberOfBytesBigMessage = 400000;
- try
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ if (sendBlocking)
{
- ClientSessionFactory sf = locator.createSessionFactory();
+ sf.getServerLocator().setBlockOnNonDurableSend(true);
+ sf.getServerLocator().setBlockOnDurableSend(true);
+ sf.getServerLocator().setBlockOnAcknowledge(true);
+ }
- if (sendBlocking)
- {
- sf.getServerLocator().setBlockOnNonDurableSend(true);
- sf.getServerLocator().setBlockOnDurableSend(true);
- sf.getServerLocator().setBlockOnAcknowledge(true);
- }
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null, true);
- session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, null, true);
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+ ClientMessage message = null;
- ClientMessage message = null;
+ for (int i = 0; i < 100; i++)
+ {
+ message = session.createMessage(true);
- for (int i = 0; i < 100; i++)
- {
- message = session.createMessage(true);
+ // TODO: Why do I need to reset the writerIndex?
+ message.getBodyBuffer().writerIndex(0);
- // TODO: Why do I need to reset the writerIndex?
- message.getBodyBuffer().writerIndex(0);
-
- for (int j = 1; j <= numberOfBytes; j++)
- {
- message.getBodyBuffer().writeInt(j);
- }
-
- producer.send(message);
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ message.getBodyBuffer().writeInt(j);
}
- ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+ producer.send(message);
+ }
- producer.send(clientFile);
+ ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
- session.close();
+ producer.send(clientFile);
- if (realFiles)
- {
- server.stop();
+ session.close();
- server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
- server.start();
+ if (realFiles)
+ {
+ server.stop();
- sf = locator.createSessionFactory();
- }
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+ server.start();
- session = sf.createSession(null, null, false, true, true, false, 0);
+ sf = locator.createSessionFactory();
+ }
- ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+ session = sf.createSession(null, null, false, true, true, false, 0);
- session.start();
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ session.start();
- Assert.assertNotNull(message2);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
- message2.acknowledge();
+ Assert.assertNotNull(message2);
- Assert.assertNotNull(message2);
+ message2.acknowledge();
- message.getBodyBuffer().readerIndex(0);
+ Assert.assertNotNull(message2);
- for (int j = 1; j <= numberOfBytes; j++)
- {
- Assert.assertEquals(j, message.getBodyBuffer().readInt());
- }
+ message.getBodyBuffer().readerIndex(0);
+
+ for (int j = 1; j <= numberOfBytes; j++)
+ {
+ Assert.assertEquals(j, message.getBodyBuffer().readInt());
}
+ }
- consumer.close();
+ consumer.close();
- session.close();
+ session.close();
- session = sf.createSession(null, null, false, true, true, false, 0);
+ session = sf.createSession(null, null, false, true, true, false, 0);
- readMessage(session, LargeMessageTest.ADDRESS, numberOfBytesBigMessage);
+ readMessage(session, LargeMessageTest.ADDRESS, numberOfBytesBigMessage);
- // printBuffer("message received : ", message2.getBody());
+ // printBuffer("message received : ", message2.getBody());
- session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ session.close();
}
// Private -------------------------------------------------------
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-12-05 14:14:43 UTC (rev 11829)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2011-12-05 14:18:54 UTC (rev 11830)
@@ -56,8 +56,6 @@
protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
- protected HornetQServer server;
-
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -70,26 +68,6 @@
// Protected -----------------------------------------------------
- @Override
- protected void tearDown() throws Exception
- {
- if (server != null && server.isStarted())
- {
- try
- {
- server.stop();
- }
- catch (Exception e)
- {
- LargeMessageTestBase.log.warn(e.getMessage(), e);
- }
- }
-
- server = null;
-
- super.tearDown();
- }
-
protected void testChunks(final boolean isXA,
final boolean restartOnXA,
final boolean rollbackFirstSend,
@@ -139,7 +117,7 @@
{
clearData();
- server = createServer(realFiles);
+ HornetQServer server = createServer(realFiles);
server.start();
ServerLocator locator = createInVMNonHALocator();
13 years, 1 month
JBoss hornetq SVN: r11829 - trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-12-05 09:14:43 -0500 (Mon, 05 Dec 2011)
New Revision: 11829
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
fix decoding problem where it will by pass a frame if it is sent following another frame in one packet.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-12-05 13:59:06 UTC (rev 11828)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-12-05 14:14:43 UTC (rev 11829)
@@ -829,7 +829,7 @@
System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0, decoder.contentLength);
- decoder.pos += decoder.contentLength + 1;
+ decoder.pos += decoder.contentLength;
//drain all the rest
if (decoder.bodyStart == -1)
13 years, 1 month
JBoss hornetq SVN: r11828 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-05 08:59:06 -0500 (Mon, 05 Dec 2011)
New Revision: 11828
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Delete session's close which is now all done at super class.
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-05 13:49:22 UTC (rev 11827)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-05 13:59:06 UTC (rev 11828)
@@ -393,7 +393,7 @@
protected ClientSession addClientSession(ClientSession session)
{
synchronized (clientSessions)
- {
+ {
clientSessions.add(session);
}
return session;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-05 13:49:22 UTC (rev 11827)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-05 13:59:06 UTC (rev 11828)
@@ -198,18 +198,13 @@
Assert.assertTrue(retry <= 5);
}
- /**
- * @throws Exception
- */
private void createClientSessionFactory() throws Exception
{
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
- addSessionFactory(sf);
+ sf = (ClientSessionFactoryInternal)createSessionFactory(locator);
}
public void testNonTransacted() throws Exception
{
-
createSessionFactory();
ClientSession session = createSession(sf, true, true);
@@ -304,7 +299,6 @@
session.commit();
session.close();
-
}
// https://jira.jboss.org/jira/browse/HORNETQ-285
@@ -374,8 +368,6 @@
Assert.assertNull("message should be null! Was: " + message, message);
session.close();
-
-
}
/**
@@ -465,8 +457,6 @@
session.commit();
session.close();
-
-
}
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
@@ -508,8 +498,6 @@
Assert.assertNull(consumer.receiveImmediate());
session.commit();
-
- session.close();
}
public void testTransactedMessagesConsumedSoRollback() throws Exception
@@ -548,10 +536,6 @@
{
Assert.assertEquals(HornetQException.TRANSACTION_ROLLED_BACK, e.getCode());
}
-
- session1.close();
-
- session2.close();
}
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
@@ -624,10 +608,6 @@
session2.commit();
Assert.assertNull(consumer.receiveImmediate());
-
- session1.close();
-
- session2.close();
}
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
@@ -668,8 +648,6 @@
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
-
- session.close();
}
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
@@ -716,7 +694,6 @@
producer.close();
consumer.close();
- session.close();
}
// This might happen if 1PC optimisation kicks in
@@ -760,8 +737,6 @@
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
-
- session.close();
}
public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
@@ -805,8 +780,6 @@
session.prepare(xid2);
session.commit(xid2, false);
-
- session.close();
}
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
@@ -847,10 +820,6 @@
{
Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
}
-
- session1.close();
-
- session2.close();
}
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
@@ -893,10 +862,6 @@
{
Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
}
-
- session1.close();
-
- session2.close();
}
// 1PC optimisation
@@ -966,8 +931,6 @@
createClientSessionFactory();
session = sendAndConsume(sf, true);
-
- session.close();
}
public void testFailoverMultipleSessionsWithConsumers() throws Exception
@@ -1029,8 +992,6 @@
{
session.close();
}
-
- sendSession.close();
}
/*
@@ -1056,8 +1017,6 @@
crash(session);
receiveDurableMessages(consumer);
-
- session.close();
}
private void sendMessagesSomeDurable(ClientSession session, ClientProducer producer) throws Exception, HornetQException
@@ -1106,8 +1065,6 @@
// Should get the same ones after failover since we didn't ack
receiveDurableMessages(consumer);
-
- session.close();
}
private void receiveDurableMessages(ClientConsumer consumer) throws HornetQException
@@ -1171,8 +1128,6 @@
// Should get the same ones after failover since we didn't ack
receiveMessages(consumer, NUM_MESSAGES, NUM_MESSAGES * 2, true);
-
- session.close();
}
private void receiveMessages(ClientConsumer consumer) throws HornetQException
@@ -1230,8 +1185,6 @@
sendMessagesSomeDurable(session, producer);
receiveMessages(consumer);
-
- session.close();
}
public void _testForceBlockingReturn() throws Exception
@@ -1430,8 +1383,6 @@
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
-
- session2.close();
}
public void testCommitDidNotOccurUnblockedAndResend() throws Exception
@@ -1518,9 +1469,6 @@
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull("expecting null message", message);
-
- session2.close();
-
}
public void testBackupServerNotRemoved() throws Exception
@@ -1554,8 +1502,6 @@
setBody(0, message);
producer.send(message);
-
- session.close();
}
public void testLiveAndBackupLiveComesBack() throws Exception
@@ -1601,9 +1547,6 @@
setBody(0, message);
producer.send(message);
-
- session.close();
-
}
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
@@ -1666,10 +1609,6 @@
assertNotNull(cm);
Assert.assertEquals("message0", cm.getBodyBuffer().readString());
-
- session.close();
-
-
}
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
@@ -1727,10 +1666,6 @@
assertNotNull(cm);
Assert.assertEquals("message0", cm.getBodyBuffer().readString());
-
- session.close();
-
-
}
// Package protected ---------------------------------------------
13 years, 1 month
JBoss hornetq SVN: r11827 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/client and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-05 08:49:22 -0500 (Mon, 05 Dec 2011)
New Revision: 11827
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AckBatchSizeTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AddressSettingsTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AutogroupIdTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/BatchDelayTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/xa/BasicXaTest.java
Log:
Keep track of servers created at ServiceTestBase, and close them at tearDown
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -16,8 +16,8 @@
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -80,11 +80,42 @@
protected static final String NETTY_CONNECTOR_FACTORY = NettyConnectorFactory.class.getCanonicalName();
- private final List<ServerLocator> locators = new ArrayList<ServerLocator>();
+ private final Collection<ServerLocator> locators = new ArrayList<ServerLocator>();
+ private final Collection<HornetQServer> servers = new ArrayList<HornetQServer>();
+ private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<ClientSessionFactory>();
+ private final Collection<ClientSession> clientSessions = new ArrayList<ClientSession>();
@Override
protected void tearDown() throws Exception
{
+ synchronized (clientSessions)
+ {
+ for (ClientSession cs : clientSessions)
+ {
+ try
+ {
+ if (cs != null)
+ {
+ cs.close();
+ }
+ }
+ catch (Exception e)
+ {
+ // no-op
+ }
+ }
+ clientSessions.clear();
+ }
+
+ synchronized (sessionFactories)
+ {
+ for (ClientSessionFactory sf : sessionFactories)
+ {
+ closeSessionFactory(sf);
+ }
+ sessionFactories.clear();
+ }
+
synchronized (locators)
{
for (ServerLocator locator : locators)
@@ -93,6 +124,16 @@
}
locators.clear();
}
+
+ synchronized (servers)
+ {
+ for (HornetQServer server : servers)
+ {
+ stopComponent(server);
+ }
+ servers.clear();
+ }
+
super.tearDown();
// checkFreePort(5445);
// checkFreePort(5446);
@@ -319,7 +360,8 @@
{
server = HornetQServers.newHornetQServer(configuration, mbeanServer, false);
}
-
+ try
+ {
for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
{
server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
@@ -331,9 +373,39 @@
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+ return server;
+ }
+ finally
+ {
+ addServer(server);
+ }
+ }
+
+ protected HornetQServer addServer(HornetQServer server)
+ {
+ synchronized (servers)
+ {
+ servers.add(server);
+ }
return server;
}
+ protected ClientSession addClientSession(ClientSession session)
+ {
+ synchronized (clientSessions)
+ {
+ clientSessions.add(session);
+ }
+ return session;
+ }
+
+ protected void addSessionFactory(ClientSessionFactory sf)
+ {
+ synchronized (sessionFactories)
+ {
+ sessionFactories.add(sf);
+ }
+ }
protected HornetQServer createServer(final boolean realFiles,
final Configuration configuration,
final int pageSize,
@@ -360,7 +432,8 @@
{
server = HornetQServers.newHornetQServer(configuration, false);
}
-
+ try
+ {
if (settings != null)
{
for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
@@ -377,6 +450,11 @@
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
return server;
+ }
+ finally
+ {
+ addServer(server);
+ }
}
protected HornetQServer createServer(final boolean realFiles,
@@ -394,7 +472,8 @@
{
server = HornetQServers.newHornetQServer(configuration, mbeanServer, false);
}
-
+ try
+ {
for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
{
server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
@@ -403,7 +482,14 @@
AddressSettings defaultSetting = new AddressSettings();
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+
return server;
+ }
+ finally
+ {
+ addServer(server);
+ }
}
protected HornetQServer createServer(final boolean realFiles)
@@ -451,7 +537,9 @@
securityManager,
nodeManager);
- server.setIdentity("Server " + id);
+ try
+ {
+ server.setIdentity("Server " + id);
for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
{
@@ -465,6 +553,11 @@
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
return server;
+ }
+ finally
+ {
+ addServer(server);
+ }
}
protected HornetQServer createServer(final boolean realFiles,
@@ -486,7 +579,8 @@
securityManager,
false);
}
-
+ try
+ {
Map<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
@@ -498,7 +592,12 @@
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
- return server;
+ return server;
+ }
+ finally
+ {
+ addServer(server);
+ }
}
protected HornetQServer createClusteredServerWithParams(final boolean isNetty,
@@ -561,6 +660,12 @@
}
}
+ protected final ClientSessionFactory createSessionFactory(ServerLocator locator) throws Exception
+ {
+ ClientSessionFactory sf = locator.createSessionFactory();
+ addSessionFactory(sf);
+ return sf;
+ }
protected void createQueue(final String address, final String queue) throws Exception
{
ServerLocator locator = createInVMNonHALocator();
@@ -586,16 +691,17 @@
{
ServerLocator locatorWithoutHA = isNetty ? HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NETTY_CONNECTOR_FACTORY))
: HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- addServerLocator(locatorWithoutHA);
- return locatorWithoutHA;
+ return addServerLocator(locatorWithoutHA);
+
}
- private void addServerLocator(ServerLocator locator)
+ protected ServerLocator addServerLocator(ServerLocator locator)
{
synchronized (locators)
{
locators.add(locator);
}
+ return locator;
}
protected ServerLocator createInVMLocator(final int serverID)
@@ -603,8 +709,7 @@
TransportConfiguration tnspConfig = createInVMTransportConnectorConfig(serverID, UUIDGenerator.getInstance().generateStringUUID());
ServerLocator locator = HornetQClient.createServerLocatorWithHA(tnspConfig);
- addServerLocator(locator);
- return locator;
+ return addServerLocator(locator);
}
/**
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AckBatchSizeTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AckBatchSizeTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AckBatchSizeTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -15,8 +15,12 @@
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
-import org.hornetq.core.logging.Logger;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.tests.util.ServiceTestBase;
@@ -26,8 +30,6 @@
*/
public class AckBatchSizeTest extends ServiceTestBase
{
- private static final Logger log = Logger.getLogger(AckBatchSizeTest.class);
-
public final SimpleString addressA = new SimpleString("addressA");
public final SimpleString queueA = new SimpleString("queueA");
@@ -59,9 +61,6 @@
public void testAckBatchSize() throws Exception
{
HornetQServer server = createServer(false);
-
- try
- {
server.start();
ServerLocator locator = createInVMNonHALocator();
int numMessages = 100;
@@ -94,14 +93,6 @@
Assert.assertEquals(0, q.getDeliveringCount());
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
/*
@@ -111,8 +102,6 @@
{
HornetQServer server = createServer(false);
- try
- {
server.start();
ServerLocator locator = createInVMNonHALocator();
locator.setAckBatchSize(0);
@@ -145,13 +134,5 @@
}
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AcknowledgeTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -19,7 +19,13 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -43,9 +49,7 @@
public void testReceiveAckLastMessageOnly() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
- server.start();
+ server.start();
ServerLocator locator = createInVMNonHALocator();
locator.setAckBatchSize(0);
locator.setBlockOnAcknowledge(true);
@@ -73,21 +77,12 @@
Assert.assertEquals(0, q.getDeliveringCount());
session.close();
sendSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testAsyncConsumerNoAck() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory cf = locator.createSessionFactory();;
@@ -101,10 +96,10 @@
{
cp.send(sendSession.createMessage(false));
}
-
+
Thread.sleep(500);
log.info("woke up");
-
+
final CountDownLatch latch = new CountDownLatch(numMessages);
session.start();
cc.setMessageHandler(new MessageHandler()
@@ -121,22 +116,12 @@
Assert.assertEquals(numMessages, q.getDeliveringCount());
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
}
- }
- }
public void testAsyncConsumerAck() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
- server.start();
+ server.start();
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnAcknowledge(true);
locator.setAckBatchSize(0);
@@ -180,22 +165,12 @@
Assert.assertEquals(0, q.getDeliveringCount());
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testAsyncConsumerAckLastMessageOnly() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
- server.start();
+ server.start();
ServerLocator locator = createInVMNonHALocator();
locator.setBlockOnAcknowledge(true);
locator.setAckBatchSize(0);
@@ -242,14 +217,6 @@
Assert.assertEquals(0, q.getDeliveringCount());
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AddressSettingsTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AddressSettingsTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AddressSettingsTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -15,7 +15,12 @@
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -57,8 +62,7 @@
public void testSimpleHierarchyWithDLA() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(dlaA);
@@ -108,21 +112,13 @@
Assert.assertEquals("B", message.getBodyBuffer().readString());
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
+
}
public void test2LevelHierarchyWithDLA() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(dlaA);
@@ -172,22 +168,13 @@
Assert.assertEquals("B", message.getBodyBuffer().readString());
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
+
}
public void test2LevelWordHierarchyWithDLA() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
- server.start();
+ server.start();
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(dlaA);
addressSettings.setMaxDeliveryAttempts(1);
@@ -236,21 +223,12 @@
Assert.assertEquals("B", message.getBodyBuffer().readString());
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
}
- }
- }
public void test3LevelHierarchyWithDLA() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
AddressSettings addressSettings = new AddressSettings();
addressSettings.setDeadLetterAddress(dlaA);
@@ -319,21 +297,13 @@
Assert.assertEquals("C", message.getBodyBuffer().readString());
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
+
}
public void testOverrideHierarchyWithDLA() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxDeliveryAttempts(1);
@@ -392,13 +362,6 @@
Assert.assertNotNull(message);
sendSession.close();
session.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
+
}
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AutogroupIdTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AutogroupIdTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/AutogroupIdTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -18,7 +18,13 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -49,8 +55,7 @@
public void testGroupIdAutomaticallySet() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
ServerLocator locator = createInVMNonHALocator();
@@ -73,7 +78,7 @@
consumer2.setMessageHandler(myMessageHandler2);
log.info("starting session");
-
+
session.start();
final int numMessages = 100;
@@ -85,19 +90,11 @@
latch.await();
session.close();
-
+
log.info(myMessageHandler2.messagesReceived);
Assert.assertEquals(100, myMessageHandler.messagesReceived);
Assert.assertEquals(0, myMessageHandler2.messagesReceived);
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
@@ -107,8 +104,7 @@
public void testGroupIdAutomaticallySetMultipleProducers() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
@@ -154,15 +150,6 @@
Assert.assertEquals(myMessageHandler.messagesReceived, 100);
Assert.assertEquals(myMessageHandler2.messagesReceived, 100);
Assert.assertEquals(myMessageHandler3.messagesReceived, 0);
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
-
}
/*
@@ -171,8 +158,7 @@
public void testGroupIdAutomaticallyNotSet() throws Exception
{
HornetQServer server = createServer(false);
- try
- {
+
server.start();
@@ -209,14 +195,6 @@
Assert.assertEquals(50, myMessageHandler.messagesReceived);
Assert.assertEquals(50, myMessageHandler2.messagesReceived);
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/AlmostLargeAsynchronousFailoverTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -26,20 +26,7 @@
public class AlmostLargeAsynchronousFailoverTest extends AsynchronousFailoverTest
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
+ @Override
protected void createConfigs() throws Exception
{
super.createConfigs();
@@ -47,6 +34,7 @@
backupServer.getServer().getConfiguration().setJournalFileSize(1024 * 1024);
}
+ @Override
protected ServerLocatorInternal getServerLocator() throws Exception
{
ServerLocatorInternal locator = super.getServerLocator();
@@ -55,13 +43,10 @@
return locator;
}
+ @Override
protected void addPayload(ClientMessage message)
{
message.putBytesProperty("payload", new byte[20 * 1024]);
}
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -227,14 +227,10 @@
@Override
protected void tearDown() throws Exception
{
- if (sessionFactory != null)
- sessionFactory.close();
if (session != null)
session.close();
if (producer != null)
producer.close();
- closeServerLocator(locator);
-
super.tearDown();
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -55,13 +55,6 @@
locator = getServerLocator();
}
- @Override
- protected void tearDown() throws Exception
- {
- closeServerLocator(locator);
- super.tearDown();
- }
-
public void testAutoFailback() throws Exception
{
locator.setBlockOnNonDurableSend(true);
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverOnFlowControlTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -41,21 +41,9 @@
*/
public class FailoverOnFlowControlTest extends FailoverTestBase
{
-
private static Logger log = Logger.getLogger(FailoverOnFlowControlTest.class);
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
-
public void testOverflowSend() throws Exception
{
ServerLocator locator = getServerLocator();
@@ -129,8 +117,6 @@
}
session.close();
-
- locator.close();
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -90,30 +90,22 @@
locator = getServerLocator();
}
- @Override
- protected void tearDown() throws Exception
- {
- closeSessionFactory();
- closeServerLocator(locator);
- super.tearDown();
- }
-
protected ClientSession createSession(ClientSessionFactory sf,
boolean autoCommitSends,
boolean autoCommitAcks,
int ackBatchSize) throws Exception
{
- return sf.createSession(autoCommitSends, autoCommitAcks, ackBatchSize);
+ return addClientSession(sf.createSession(autoCommitSends, autoCommitAcks, ackBatchSize));
}
protected ClientSession createSession(ClientSessionFactory sf, boolean autoCommitSends, boolean autoCommitAcks) throws Exception
{
- return sf.createSession(autoCommitSends, autoCommitAcks);
+ return addClientSession(sf.createSession(autoCommitSends, autoCommitAcks));
}
protected ClientSession createSession(ClientSessionFactory sf) throws Exception
{
- return sf.createSession();
+ return addClientSession(sf.createSession());
}
protected ClientSession createSession(ClientSessionFactory sf,
@@ -121,7 +113,7 @@
boolean autoCommitSends,
boolean autoCommitAcks) throws Exception
{
- return sf.createSession(xa, autoCommitSends, autoCommitAcks);
+ return addClientSession(sf.createSession(xa, autoCommitSends, autoCommitAcks));
}
@Override
@@ -143,7 +135,7 @@
locator.setAckBatchSize(0);
locator.setReconnectAttempts(-1);
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
ClientSession session = createSession(sf, true, true);
@@ -206,6 +198,15 @@
Assert.assertTrue(retry <= 5);
}
+ /**
+ * @throws Exception
+ */
+ private void createClientSessionFactory() throws Exception
+ {
+ sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ addSessionFactory(sf);
+ }
+
public void testNonTransacted() throws Exception
{
@@ -962,7 +963,7 @@
Thread.sleep(5000);
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
session = sendAndConsume(sf, true);
@@ -1239,7 +1240,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
// Add an interceptor to delay the send method so we can get time to cause failover before it returns
@@ -1433,15 +1434,6 @@
session2.close();
}
- private void closeSessionFactory()
- {
- if (sf == null)
- return;
- closeSessionFactory(sf);
- Assert.assertEquals(0, sf.numSessions());
- Assert.assertEquals(0, sf.numConnections());
- }
-
public void testCommitDidNotOccurUnblockedAndResend() throws Exception
{
locator.setBlockOnNonDurableSend(true);
@@ -1661,7 +1653,7 @@
sf.close();
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
session = createSession(sf);
@@ -1722,7 +1714,7 @@
sf.close();
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
session = createSession(sf);
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -16,7 +16,6 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,7 +30,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
@@ -83,8 +81,6 @@
protected NodeManager nodeManager;
protected boolean startBackupServer = true;
- private final Collection<ServerLocator> serverLocators = new ArrayList<ServerLocator>();
- private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<ClientSessionFactory>();
// Static --------------------------------------------------------
@@ -130,6 +126,7 @@
waitForServer(backupServer.getServer());
}
}
+
}
protected TestableServer createServer(Configuration config)
@@ -235,28 +232,10 @@
protected void tearDown() throws Exception
{
logAndSystemOut("#test tearDown");
- stopComponent(backupServer);
- stopComponent(liveServer);
- synchronized (sessionFactories)
- {
- for (ClientSessionFactory sf : sessionFactories)
- {
- closeSessionFactory(sf);
- }
- sessionFactories.clear();
- }
+ InVMConnector.failOnCreateConnection = false;
- synchronized (serverLocators)
- {
- for (ServerLocator locator : serverLocators)
- {
- closeServerLocator(locator);
- }
- serverLocators.clear();
- }
-
-
+ super.tearDown();
Assert.assertEquals(0, InVMRegistry.instance.size());
backupServer = null;
@@ -265,9 +244,6 @@
nodeManager = null;
- InVMConnector.failOnCreateConnection = false;
-
- super.tearDown();
try
{
ServerSocket serverSocket = new ServerSocket(5445);
@@ -297,10 +273,7 @@
locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- synchronized (sessionFactories)
- {
- sessionFactories.add(sf);
- }
+ addSessionFactory(sf);
assertTrue("topology members expected " + topologyMembers, countDownLatch.await(5, TimeUnit.SECONDS));
return sf;
}
@@ -397,10 +370,7 @@
protected ServerLocatorInternal getServerLocator() throws Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false));
- synchronized (serverLocators)
- {
- serverLocators.add(locator);
- }
+ addServerLocator(locator);
return (ServerLocatorInternal) locator;
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -16,7 +16,6 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.logging.Logger;
/**
* A LargeMessageFailoverTest
@@ -28,22 +27,6 @@
public class LargeMessageFailoverTest extends FailoverTest
{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(LargeMessageFailoverTest.class);
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
public LargeMessageFailoverTest(final String name)
{
super(name);
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -214,9 +214,8 @@
@Override
protected HornetQServer createServer(final boolean realFiles, final Configuration configuration)
{
- return createInVMFailoverServer(true, configuration, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>(),
- nodeManager,
- 2);
+ return addServer(createInVMFailoverServer(true, configuration, PAGE_SIZE, PAGE_MAX,
+ new HashMap<String, AddressSettings>(), nodeManager, 2));
}
@Override
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedFailoverTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -23,28 +23,9 @@
public class ReplicatedFailoverTest extends FailoverTest
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
@Override
protected void createConfigs() throws Exception
{
createReplicatedConfigs();
}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedNettyAsynchronousFailoverTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -13,10 +13,6 @@
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-
/**
* A ReplicatedNettyAsynchronousFailoverTest
*
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/SecurityFailoverTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -57,13 +57,16 @@
boolean autoCommitAcks,
int ackBatchSize) throws Exception
{
- return sf.createSession("a",
+ ClientSession session =
+ sf.createSession("a",
"b",
isXA,
autoCommitSends,
autoCommitAcks,
sf.getServerLocator().isPreAcknowledge(),
ackBatchSize);
+ addClientSession(session);
+ return session;
}
@Override
@@ -72,8 +75,11 @@
boolean autoCommitAcks,
int ackBatchSize) throws Exception
{
- return sf.createSession("a", "b", false, autoCommitSends, autoCommitAcks, sf.getServerLocator()
+ ClientSession session =
+ sf.createSession("a", "b", false, autoCommitSends, autoCommitAcks, sf.getServerLocator()
.isPreAcknowledge(), ackBatchSize);
+ addClientSession(session);
+ return session;
}
@Override
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/BatchDelayTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/BatchDelayTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/remoting/BatchDelayTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -17,18 +17,21 @@
import java.util.Map;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
/**
- *
+ *
* A BatchDelayTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
@@ -38,12 +41,8 @@
public class BatchDelayTest extends ServiceTestBase
{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(BatchDelayTest.class);
-
private static final long DELAY = 500;
-
+
// Attributes ----------------------------------------------------
private HornetQServer server;
@@ -76,16 +75,6 @@
server.start();
}
- @Override
- protected void tearDown() throws Exception
- {
- server.stop();
-
- server = null;
-
- super.tearDown();
- }
-
protected ClientSessionFactory createSessionFactory() throws Exception
{
Map<String, Object> params = new HashMap<String, Object>();
@@ -93,7 +82,7 @@
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.NETTY_CONNECTOR_FACTORY, params));
ClientSessionFactory sf = locator.createSessionFactory();
-
+ addSessionFactory(sf);
return sf;
}
@@ -130,8 +119,6 @@
msg.acknowledge();
}
-
- sf.close();
}
public void testSendReceiveOne() throws Exception
@@ -157,10 +144,8 @@
msg = cons.receive(10000);
assertNotNull(msg);
-
- msg.acknowledge();
- sf.close();
+ msg.acknowledge();
}
// Private -------------------------------------------------------
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/xa/BasicXaTest.java 2011-12-05 12:43:53 UTC (rev 11826)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/xa/BasicXaTest.java 2011-12-05 13:49:22 UTC (rev 11827)
@@ -25,7 +25,13 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
@@ -133,103 +139,103 @@
super.tearDown();
}
-
-
+
+
public void testSendWithoutXID() throws Exception
{
// Since both resources have same RM, TM will probably use 1PC optimization
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = locator.createSessionFactory();
-
+
ClientSession session = null;
-
+
try
{
-
+
session = factory.createSession(true, false, false);
session.createQueue("Test", "Test");
-
+
ClientProducer prod = session.createProducer("Test");
prod.send(session.createMessage(true));
-
+
session.start();
-
+
ClientConsumer cons = session.createConsumer("Test");
-
+
assertNotNull("Send went through an invalid XA Session", cons.receiveImmediate());
}
finally
{
factory.close();
-
+
session.close();
}
}
-
+
public void testACKWithoutXID() throws Exception
{
// Since both resources have same RM, TM will probably use 1PC optimization
ClientSessionFactory factory = locator.createSessionFactory();
-
+
ClientSession session = null;
-
+
try
{
-
+
session = factory.createSession(false, true, true);
session.createQueue("Test", "Test");
-
+
ClientProducer prod = session.createProducer("Test");
prod.send(session.createMessage(true));
-
+
session.close();
-
+
session = factory.createSession(true, false, false);
-
+
session.start();
-
+
ClientConsumer cons = session.createConsumer("Test");
-
+
ClientMessage msg = cons.receive(5000);
-
+
assertNotNull(msg);
-
+
msg.acknowledge();
-
+
session.close();
-
+
session = factory.createSession(false, false, false);
-
+
session.start();
-
+
cons = session.createConsumer("Test");
-
+
msg = cons.receiveImmediate();
-
+
assertNull("Acknowledge went through invalid XA Session", msg);
-
-
+
+
}
finally
{
factory.close();
-
+
session.close();
}
}
-
+
public void testIsSameRM() throws Exception
{
ServerLocator locator = createNettyNonHALocator();
@@ -258,8 +264,8 @@
session2.close();
}
-
+
public void testXAInterleaveResourceSuspendWorkCommit() throws Exception
{
Xid xid = newXID();
@@ -315,7 +321,7 @@
clientSession.prepare(xid3);
clientSession.commit(xid3, false);
}
-
+
public void testSendPrepareDoesntRollbackOnClose() throws Exception
{
Xid xid = newXID();
@@ -338,7 +344,7 @@
clientSession = sessionFactory.createSession(true, false, false);
log.info("committing");
-
+
clientSession.commit(xid, false);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
@@ -406,7 +412,7 @@
clientSession2.close();
}
-
+
public void testReceiveRollback() throws Exception
{
int numSessions = 100;
@@ -986,7 +992,6 @@
}
}
- private static volatile int received = 0;
class TxMessageHandler implements MessageHandler
{
13 years, 1 month
JBoss hornetq SVN: r11826 - trunk/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-05 07:43:53 -0500 (Mon, 05 Dec 2011)
New Revision: 11826
Modified:
trunk/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
Log:
Remove spurious @SuppressWarnings
Modified: trunk/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2011-12-05 12:43:42 UTC (rev 11825)
+++ trunk/hornetq-journal/src/main/java/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2011-12-05 12:43:53 UTC (rev 11826)
@@ -34,9 +34,9 @@
import org.hornetq.utils.ReusableLatch;
/**
- *
+ *
* AsynchronousFile implementation
- *
+ *
* @author clebert.suconic(a)jboss.com
* Warning: Case you refactor the name or the package of this class
* You need to make sure you also rename the C++ native calls
@@ -62,7 +62,7 @@
* This is accessed from a single thread (the Poller Thread) */
private long nextReadSequence = 0;
- /**
+ /**
* AIO can't guarantee ordering over callbacks.
* We use thie PriorityQueue to hold values until they are in order
*/
@@ -161,7 +161,7 @@
private Semaphore maxIOSemaphore;
private BufferCallback bufferCallback;
-
+
/** A callback for IO errors when they happen */
private final IOExceptionListener ioExceptionListener;
@@ -282,8 +282,8 @@
writeLock.unlock();
}
}
-
-
+
+
public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException
{
try
@@ -426,7 +426,7 @@
}
/**
- * This needs to be synchronized because of
+ * This needs to be synchronized because of
* http://bugs.sun.com/view_bug.do?bug_id=6791815
* http://mail.openjdk.java.net/pipermail/hotspot-runtime-dev/2009-January/0...
*/
@@ -470,8 +470,6 @@
// Private ---------------------------------------------------------------------------
- /** */
- @SuppressWarnings("unused")
private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
{
maxIOSemaphore.release();
@@ -540,7 +538,7 @@
final String errorMessage)
{
AsynchronousFileImpl.log.warn("CallbackError: " + errorMessage);
-
+
fireExceptionListener(errorCode, errorMessage);
maxIOSemaphore.release();
@@ -648,7 +646,7 @@
// completely done, or we might get beautiful GPFs
pollerLatch.await();
}
-
+
public static FileLock lock(int handle)
{
if (flock(handle))
@@ -662,13 +660,13 @@
}
// Native ----------------------------------------------------------------------------
-
-
+
+
// Functions used for locking files .....
public static native int openFile(String fileName);
-
+
public static native void closeFile(int handle);
-
+
private static native boolean flock(int handle);
// Functions used for locking files ^^^^^^^^
13 years, 1 month
JBoss hornetq SVN: r11825 - trunk/hornetq-ra/hornetq-ra-rar.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-05 07:43:42 -0500 (Mon, 05 Dec 2011)
New Revision: 11825
Modified:
trunk/hornetq-ra/hornetq-ra-rar/pom.xml
Log:
Fix Maven warning.
Modified: trunk/hornetq-ra/hornetq-ra-rar/pom.xml
===================================================================
--- trunk/hornetq-ra/hornetq-ra-rar/pom.xml 2011-12-05 12:43:33 UTC (rev 11824)
+++ trunk/hornetq-ra/hornetq-ra-rar/pom.xml 2011-12-05 12:43:42 UTC (rev 11825)
@@ -56,11 +56,10 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
- <version>3.2.3.Final</version>
- </dependency>
+ </dependency>
</dependencies>
<build>
13 years, 1 month
JBoss hornetq SVN: r11824 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-05 07:43:33 -0500 (Mon, 05 Dec 2011)
New Revision: 11824
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java
Log:
Move closing of resources to tearDown.
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java 2011-12-05 12:43:14 UTC (rev 11823)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/SessionTest.java 2011-12-05 12:43:33 UTC (rev 11824)
@@ -20,9 +20,15 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSession.BindingQuery;
import org.hornetq.api.core.client.ClientSession.QueueQuery;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -39,6 +45,8 @@
private final String queueName = "ClientSessionTestQ";
private ServerLocator locator;
+ private HornetQServer server;
+ private ClientSessionFactory cf;
@Override
protected void setUp() throws Exception
@@ -46,23 +54,23 @@
super.setUp();
locator = createInVMNonHALocator();
+ server = createServer(false);
+ server.start();
}
@Override
protected void tearDown() throws Exception
{
- locator.close();
-
+ stopComponent(server);
+ closeSessionFactory(cf);
+ closeServerLocator(locator);
super.tearDown();
}
public void testFailureListener() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
final CountDownLatch latch = new CountDownLatch(1);
clientSession.addFailureListener(new SessionFailureListener()
@@ -84,23 +92,11 @@
// not really part of the test,
// we still clean up resources left in the VM
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
}
- }
- }
public void testFailureListenerRemoved() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
class MyFailureListener implements SessionFailureListener
{
@@ -123,28 +119,18 @@
clientSession.close();
server.stop();
Assert.assertFalse(listener.called);
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
// Closing a session if the underlying remoting connection is deaad should cleanly
// release all resources
public void testCloseSessionOnDestroyedConnection() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
// Make sure we have a short connection TTL so sessions will be quickly closed on the server
- long ttl = 500;
+ server.stop();
+ long ttl = 500;
server.getConfiguration().setConnectionTTLOverride(ttl);
server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSessionInternal clientSession = (ClientSessionInternal)cf.createSession(false, true, true);
clientSession.createQueue(queueName, queueName, false);
ClientProducer producer = clientSession.createProducer();
@@ -178,23 +164,11 @@
Thread.sleep(50);
}
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testBindingQuery() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue("a1", "q1", false);
clientSession.createQueue("a1", "q2", false);
@@ -216,23 +190,11 @@
Assert.assertTrue(queues.contains(new SimpleString("q4")));
Assert.assertTrue(queues.contains(new SimpleString("q5")));
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testQueueQuery() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue("a1", queueName, false);
clientSession.createConsumer(queueName);
@@ -246,23 +208,11 @@
Assert.assertEquals(2, resp.getMessageCount());
Assert.assertEquals(null, resp.getFilterString());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testQueueQueryWithFilter() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue("a1", queueName, "foo=bar", false);
clientSession.createConsumer(queueName);
@@ -273,45 +223,21 @@
Assert.assertEquals(0, resp.getMessageCount());
Assert.assertEquals(new SimpleString("foo=bar"), resp.getFilterString());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testQueueQueryNoQ() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
QueueQuery resp = clientSession.queueQuery(new SimpleString(queueName));
Assert.assertFalse(resp.isExists());
Assert.assertEquals(null, resp.getAddress());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testClose() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue(queueName, queueName, false);
ClientProducer p = clientSession.createProducer();
@@ -324,86 +250,38 @@
Assert.assertTrue(p1.isClosed());
Assert.assertTrue(c.isClosed());
Assert.assertTrue(c1.isClosed());
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testCreateMessageNonDurable() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
ClientMessage clientMessage = clientSession.createMessage(false);
Assert.assertFalse(clientMessage.isDurable());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testCreateMessageDurable() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
ClientMessage clientMessage = clientSession.createMessage(true);
Assert.assertTrue(clientMessage.isDurable());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
}
- }
- }
public void testCreateMessageType() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
ClientMessage clientMessage = clientSession.createMessage((byte)99, false);
Assert.assertEquals((byte)99, clientMessage.getType());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testCreateMessageOverrides() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
ClientMessage clientMessage = clientSession.createMessage((byte)88, false, 100l, 300l, (byte)33);
Assert.assertEquals((byte)88, clientMessage.getType());
@@ -411,86 +289,38 @@
Assert.assertEquals(300l, clientMessage.getTimestamp());
Assert.assertEquals((byte)33, clientMessage.getPriority());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testGetVersion() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
Assert.assertEquals(server.getVersion().getIncrementingVersion(), clientSession.getVersion());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testStart() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue(queueName, queueName, false);
clientSession.start();
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testStop() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, true, true);
clientSession.createQueue(queueName, queueName, false);
clientSession.start();
clientSession.stop();
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testCommitWithSend() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, false, true);
clientSession.createQueue(queueName, queueName, false);
ClientProducer cp = clientSession.createProducer(queueName);
@@ -509,23 +339,11 @@
clientSession.commit();
Assert.assertEquals(10, q.getMessageCount());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testRollbackWithSend() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession clientSession = cf.createSession(false, false, true);
clientSession.createQueue(queueName, queueName, false);
ClientProducer cp = clientSession.createProducer(queueName);
@@ -547,25 +365,13 @@
clientSession.commit();
Assert.assertEquals(2, q.getMessageCount());
clientSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
}
- }
- }
public void testCommitWithReceive() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
- locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession sendSession = cf.createSession(false, true, true);
ClientProducer cp = sendSession.createProducer(queueName);
ClientSession clientSession = cf.createSession(false, true, false);
@@ -618,25 +424,13 @@
Assert.assertEquals(0, q.getMessageCount());
clientSession.close();
sendSession.close();
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testRollbackWithReceive() throws Exception
{
- HornetQServer server = createServer(false);
- try
- {
- server.start();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
- ClientSessionFactory cf = locator.createSessionFactory();
+ cf = locator.createSessionFactory();
ClientSession sendSession = cf.createSession(false, true, true);
ClientProducer cp = sendSession.createProducer(queueName);
ClientSession clientSession = cf.createSession(false, true, false);
@@ -690,12 +484,4 @@
clientSession.close();
sendSession.close();
}
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
- }
}
13 years, 1 month
JBoss hornetq SVN: r11823 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-12-05 07:43:14 -0500 (Mon, 05 Dec 2011)
New Revision: 11823
Modified:
trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
Keep track of created locators and session-factories and close them at tear down
Modified: trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-05 09:45:18 UTC (rev 11822)
+++ trunk/hornetq-core/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-12-05 12:43:14 UTC (rev 11823)
@@ -34,7 +34,6 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
@@ -86,11 +85,14 @@
@Override
protected void tearDown() throws Exception
{
- for (ServerLocator locator : locators)
+ synchronized (locators)
{
- closeServerLocator(locator);
+ for (ServerLocator locator : locators)
+ {
+ closeServerLocator(locator);
+ }
+ locators.clear();
}
- locators.clear();
super.tearDown();
// checkFreePort(5445);
// checkFreePort(5446);
@@ -584,15 +586,25 @@
{
ServerLocator locatorWithoutHA = isNetty ? HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NETTY_CONNECTOR_FACTORY))
: HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
- locators.add(locatorWithoutHA);
+ addServerLocator(locatorWithoutHA);
return locatorWithoutHA;
}
+ private void addServerLocator(ServerLocator locator)
+ {
+ synchronized (locators)
+ {
+ locators.add(locator);
+ }
+ }
+
protected ServerLocator createInVMLocator(final int serverID)
{
TransportConfiguration tnspConfig = createInVMTransportConnectorConfig(serverID, UUIDGenerator.getInstance().generateStringUUID());
- return HornetQClient.createServerLocatorWithHA(tnspConfig);
+ ServerLocator locator = HornetQClient.createServerLocatorWithHA(tnspConfig);
+ addServerLocator(locator);
+ return locator;
}
/**
@@ -612,13 +624,6 @@
return tnspConfig;
}
- // XXX unused
- protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws Exception
- {
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(connectorClass));
- return (ClientSessionFactoryImpl)locator.createSessionFactory();
-
- }
public String getTextMessage(final ClientMessage m)
{
m.getBodyBuffer().resetReaderIndex();
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-12-05 09:45:18 UTC (rev 11822)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-12-05 12:43:14 UTC (rev 11823)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
@@ -83,6 +84,7 @@
protected boolean startBackupServer = true;
private final Collection<ServerLocator> serverLocators = new ArrayList<ServerLocator>();
+ private final Collection<ClientSessionFactory> sessionFactories = new ArrayList<ClientSessionFactory>();
// Static --------------------------------------------------------
@@ -236,6 +238,15 @@
stopComponent(backupServer);
stopComponent(liveServer);
+ synchronized (sessionFactories)
+ {
+ for (ClientSessionFactory sf : sessionFactories)
+ {
+ closeSessionFactory(sf);
+ }
+ sessionFactories.clear();
+ }
+
synchronized (serverLocators)
{
for (ServerLocator locator : serverLocators)
@@ -245,6 +256,7 @@
serverLocators.clear();
}
+
Assert.assertEquals(0, InVMRegistry.instance.size());
backupServer = null;
@@ -285,7 +297,10 @@
locator.addClusterTopologyListener(new LatchClusterTopologyListener(countDownLatch));
sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
-
+ synchronized (sessionFactories)
+ {
+ sessionFactories.add(sf);
+ }
assertTrue("topology members expected " + topologyMembers, countDownLatch.await(5, TimeUnit.SECONDS));
return sf;
}
13 years, 1 month
JBoss hornetq SVN: r11822 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-12-05 04:45:18 -0500 (Mon, 05 Dec 2011)
New Revision: 11822
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
stop server after resources to avoid reconnect retries
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-12-05 09:40:04 UTC (rev 11821)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-12-05 09:45:18 UTC (rev 11822)
@@ -76,13 +76,14 @@
@Override
protected void tearDown() throws Exception
{
- stopComponent(server);
if (sf != null)
{
sf.close();
}
closeServerLocator(locator);
+ stopComponent(server);
+
super.tearDown();
}
@@ -535,29 +536,29 @@
sf = locator.createSessionFactory();
- session = sf.createSession(false, true, true, true);
+ session = sf.createSession(false, true, true, true);
- session.createQueue("address", "queue1", null, false);
+ session.createQueue("address", "queue1", null, false);
- ClientProducerCredits credits = null;
+ ClientProducerCredits credits = null;
- for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
- {
- ClientProducer prod = session.createProducer("address");
+ for (int i = 0; i < ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
+ {
+ ClientProducer prod = session.createProducer("address");
- ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
+ ClientProducerCredits newCredits = ((ClientProducerInternal)prod).getProducerCredits();
- if (credits != null)
- {
- Assert.assertTrue(newCredits == credits);
- }
+ if (credits != null)
+ {
+ Assert.assertTrue(newCredits == credits);
+ }
- credits = newCredits;
+ credits = newCredits;
- Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
- .unReferencedCreditsSize());
- }
+ Assert.assertEquals(1, ((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0, ((ClientSessionInternal)session).getProducerCreditManager()
+ .unReferencedCreditsSize());
+ }
}
public void testProducerCreditsCaching2() throws Exception
13 years, 1 month