Author: clebert.suconic(a)jboss.com
Date: 2011-09-01 09:31:34 -0400 (Thu, 01 Sep 2011)
New Revision: 11269
Modified:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
Make sure the test will stop its servers properly
Modified:
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-09-01
03:08:07 UTC (rev 11268)
+++
branches/Branch_2_2_EAP_cluster_clean3/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2011-09-01
13:31:34 UTC (rev 11269)
@@ -217,137 +217,144 @@
repos.addMatch(address.toString(), addressSettings);
server.start();
+ waitForServer(server);
- locator.setProducerWindowSize(producerWindowSize);
- locator.setConsumerWindowSize(consumerWindowSize);
- locator.setAckBatchSize(ackBatchSize);
-
- if (minLargeMessageSize != -1)
+ try
{
- locator.setMinLargeMessageSize(minLargeMessageSize);
- }
- ClientSessionFactory sf = locator.createSessionFactory();
- ClientSession session = sf.createSession(false, true, true, true);
+ locator.setProducerWindowSize(producerWindowSize);
+ locator.setConsumerWindowSize(consumerWindowSize);
+ locator.setAckBatchSize(ackBatchSize);
- session.start();
+ if (minLargeMessageSize != -1)
+ {
+ locator.setMinLargeMessageSize(minLargeMessageSize);
+ }
- final String queueName = "testqueue";
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(false, true, true, true);
- for (int i = 0; i < numConsumers; i++)
- {
- session.createQueue(address, new SimpleString(queueName + i), null, false);
- }
+ session.start();
- final byte[] bytes = RandomUtil.randomBytes(messageSize);
+ final String queueName = "testqueue";
- class MyHandler implements MessageHandler
- {
- int count = 0;
+ for (int i = 0; i < numConsumers; i++)
+ {
+ session.createQueue(address, new SimpleString(queueName + i), null, false);
+ }
- final CountDownLatch latch = new CountDownLatch(1);
+ final byte[] bytes = RandomUtil.randomBytes(messageSize);
- volatile Exception exception;
-
- public void onMessage(final ClientMessage message)
+ class MyHandler implements MessageHandler
{
- try
- {
- byte[] bytesRead = new byte[messageSize];
+ int count = 0;
- message.getBodyBuffer().readBytes(bytesRead);
+ final CountDownLatch latch = new CountDownLatch(1);
- UnitTestCase.assertEqualsByteArrays(bytes, bytesRead);
+ volatile Exception exception;
- message.acknowledge();
-
- if (++count == numMessages * numProducers)
+ public void onMessage(final ClientMessage message)
+ {
+ try
{
- latch.countDown();
- }
+ byte[] bytesRead = new byte[messageSize];
- if (consumerDelay > 0)
- {
- Thread.sleep(consumerDelay);
- }
+ message.getBodyBuffer().readBytes(bytesRead);
- }
- catch (Exception e)
- {
- ProducerFlowControlTest.log.error("Failed to handle message",
e);
+ UnitTestCase.assertEqualsByteArrays(bytes, bytesRead);
- exception = e;
+ message.acknowledge();
- latch.countDown();
- }
- }
- }
+ if (++count == numMessages * numProducers)
+ {
+ latch.countDown();
+ }
- MyHandler[] handlers = new MyHandler[numConsumers];
+ if (consumerDelay > 0)
+ {
+ Thread.sleep(consumerDelay);
+ }
- for (int i = 0; i < numConsumers; i++)
- {
- handlers[i] = new MyHandler();
+ }
+ catch (Exception e)
+ {
+ ProducerFlowControlTest.log.error("Failed to handle message",
e);
- ClientConsumer consumer = session.createConsumer(new SimpleString(queueName +
i));
+ exception = e;
- consumer.setMessageHandler(handlers[i]);
- }
+ latch.countDown();
+ }
+ }
+ }
- ClientProducer[] producers = new ClientProducer[numProducers];
+ MyHandler[] handlers = new MyHandler[numConsumers];
- for (int i = 0; i < numProducers; i++)
- {
- if (anon)
+ for (int i = 0; i < numConsumers; i++)
{
- producers[i] = session.createProducer();
- }
- else
- {
- producers[i] = session.createProducer(address);
- }
- }
+ handlers[i] = new MyHandler();
- long start = System.currentTimeMillis();
+ ClientConsumer consumer = session.createConsumer(new SimpleString(queueName +
i));
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(false);
+ consumer.setMessageHandler(handlers[i]);
+ }
- message.getBodyBuffer().writeBytes(bytes);
+ ClientProducer[] producers = new ClientProducer[numProducers];
- for (int j = 0; j < numProducers; j++)
+ for (int i = 0; i < numProducers; i++)
{
if (anon)
{
- producers[j].send(address, message);
+ producers[i] = session.createProducer();
}
else
{
- producers[j].send(message);
+ producers[i] = session.createProducer(address);
}
+ }
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(false);
+
+ message.getBodyBuffer().writeBytes(bytes);
+
+ for (int j = 0; j < numProducers; j++)
+ {
+ if (anon)
+ {
+ producers[j].send(address, message);
+ }
+ else
+ {
+ producers[j].send(message);
+ }
+
+ }
}
- }
- for (int i = 0; i < numConsumers; i++)
- {
- assertTrue(handlers[i].latch.await(5, TimeUnit.MINUTES));
+ for (int i = 0; i < numConsumers; i++)
+ {
+ Assert.assertTrue(handlers[i].latch.await(5, TimeUnit.MINUTES));
- Assert.assertNull(handlers[i].exception);
- }
+ Assert.assertNull(handlers[i].exception);
+ }
- long end = System.currentTimeMillis();
+ long end = System.currentTimeMillis();
- double rate = 1000 * (double)numMessages / (end - start);
+ double rate = 1000 * (double)numMessages / (end - start);
- ProducerFlowControlTest.log.info("rate is " + rate + " msgs /
sec");
+ ProducerFlowControlTest.log.info("rate is " + rate + " msgs /
sec");
- session.close();
+ session.close();
- sf.close();
-
- server.stop();
+ sf.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testClosingSessionUnblocksBlockedProducer() throws Exception
@@ -364,56 +371,63 @@
repos.addMatch(address.toString(), addressSettings);
server.start();
+ waitForServer(server);
- locator.setProducerWindowSize(1024);
- locator.setConsumerWindowSize(1024);
- locator.setAckBatchSize(1024);
+ try
+ {
- ClientSessionFactory sf = locator.createSessionFactory();
- final ClientSession session = sf.createSession(false, true, true, true);
+ locator.setProducerWindowSize(1024);
+ locator.setConsumerWindowSize(1024);
+ locator.setAckBatchSize(1024);
- final SimpleString queueName = new SimpleString("testqueue");
+ ClientSessionFactory sf = locator.createSessionFactory();
+ final ClientSession session = sf.createSession(false, true, true, true);
- session.createQueue(address, queueName, null, false);
+ final SimpleString queueName = new SimpleString("testqueue");
- ClientProducer producer = session.createProducer(address);
+ session.createQueue(address, queueName, null, false);
- byte[] bytes = new byte[2000];
+ ClientProducer producer = session.createProducer(address);
- ClientMessage message = session.createMessage(false);
+ byte[] bytes = new byte[2000];
- message.getBodyBuffer().writeBytes(bytes);
+ ClientMessage message = session.createMessage(false);
- final AtomicBoolean closed = new AtomicBoolean(false);
+ message.getBodyBuffer().writeBytes(bytes);
- Thread t = new Thread(new Runnable()
- {
- public void run()
+ final AtomicBoolean closed = new AtomicBoolean(false);
+
+ Thread t = new Thread(new Runnable()
{
- try
+ public void run()
{
- Thread.sleep(500);
+ try
+ {
+ Thread.sleep(500);
- closed.set(true);
+ closed.set(true);
- session.close();
+ session.close();
+ }
+ catch (Exception e)
+ {
+ }
}
- catch (Exception e)
- {
- }
- }
- });
+ });
- t.start();
+ t.start();
- // This will block
- producer.send(message);
+ // This will block
+ producer.send(message);
- Assert.assertTrue(closed.get());
+ Assert.assertTrue(closed.get());
- t.join();
-
- server.stop();
+ t.join();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testFlowControlMessageNotRouted() throws Exception
@@ -430,33 +444,40 @@
repos.addMatch(address.toString(), addressSettings);
server.start();
+ waitForServer(server);
- locator.setProducerWindowSize(1024);
- locator.setConsumerWindowSize(1024);
- locator.setAckBatchSize(1024);
+ try
+ {
- ClientSessionFactory sf = locator.createSessionFactory();
+ locator.setProducerWindowSize(1024);
+ locator.setConsumerWindowSize(1024);
+ locator.setAckBatchSize(1024);
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- ClientProducer producer = session.createProducer(address);
+ final ClientSession session = sf.createSession(false, true, true, true);
- byte[] bytes = new byte[100];
+ ClientProducer producer = session.createProducer(address);
- final int numMessages = 1000;
+ byte[] bytes = new byte[100];
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(false);
+ final int numMessages = 1000;
- message.getBodyBuffer().writeBytes(bytes);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(false);
- producer.send(message);
- }
+ message.getBodyBuffer().writeBytes(bytes);
- session.close();
+ producer.send(message);
+ }
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
// Not technically a flow control test, but what the hell
@@ -465,66 +486,73 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
- session.createQueue("address", "queue2", null, false);
- session.createQueue("address", "queue3", null, false);
- session.createQueue("address", "queue4", null, false);
- session.createQueue("address", "queue5", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientConsumer consumer1 = session.createConsumer("queue1");
- ClientConsumer consumer2 = session.createConsumer("queue2");
- ClientConsumer consumer3 = session.createConsumer("queue3");
- ClientConsumer consumer4 = session.createConsumer("queue4");
- ClientConsumer consumer5 = session.createConsumer("queue5");
+ session.createQueue("address", "queue1", null, false);
+ session.createQueue("address", "queue2", null, false);
+ session.createQueue("address", "queue3", null, false);
+ session.createQueue("address", "queue4", null, false);
+ session.createQueue("address", "queue5", null, false);
- ClientProducer producer = session.createProducer("address");
+ ClientConsumer consumer1 = session.createConsumer("queue1");
+ ClientConsumer consumer2 = session.createConsumer("queue2");
+ ClientConsumer consumer3 = session.createConsumer("queue3");
+ ClientConsumer consumer4 = session.createConsumer("queue4");
+ ClientConsumer consumer5 = session.createConsumer("queue5");
- byte[] bytes = new byte[2000];
+ ClientProducer producer = session.createProducer("address");
- ClientMessage message = session.createMessage(false);
+ byte[] bytes = new byte[2000];
- message.getBodyBuffer().writeBytes(bytes);
+ ClientMessage message = session.createMessage(false);
- final int numMessages = 1000;
+ message.getBodyBuffer().writeBytes(bytes);
- for (int i = 0; i < numMessages; i++)
- {
- producer.send(message);
- }
+ final int numMessages = 1000;
- session.start();
+ for (int i = 0; i < numMessages; i++)
+ {
+ producer.send(message);
+ }
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage msg = consumer1.receive(1000);
+ session.start();
- Assert.assertNotNull(msg);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage msg = consumer1.receive(1000);
- msg = consumer2.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
+ msg = consumer2.receive(5000);
- msg = consumer3.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
+ msg = consumer3.receive(5000);
- msg = consumer4.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
+ msg = consumer4.receive(5000);
- msg = consumer5.receive(5000);
+ Assert.assertNotNull(msg);
- Assert.assertNotNull(msg);
- }
+ msg = consumer5.receive(5000);
- session.close();
+ Assert.assertNotNull(msg);
+ }
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching1() throws Exception
@@ -532,35 +560,43 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
- {
- ClientProducer prod = session.createProducer("address");
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
{
- assertTrue(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address");
- credits = newCredits;
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ if (credits != null)
+ {
+ Assert.assertTrue(newCredits == credits);
+ }
- session.close();
+ credits = newCredits;
- server.stop();
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching2() throws Exception
@@ -568,37 +604,45 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
- {
- ClientProducer prod = session.createProducer("address");
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE * 2; i++)
{
- assertTrue(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address");
- credits = newCredits;
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
- prod.close();
+ if (credits != null)
+ {
+ Assert.assertTrue(newCredits == credits);
+ }
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ credits = newCredits;
- session.close();
+ prod.close();
- server.stop();
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching3() throws Exception
@@ -606,35 +650,43 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
{
- assertFalse(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address" + i);
- credits = newCredits;
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
- assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ if (credits != null)
+ {
+ Assert.assertFalse(newCredits == credits);
+ }
- session.close();
+ credits = newCredits;
- server.stop();
+ Assert.assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching4() throws Exception
@@ -642,37 +694,45 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ ClientProducerCredits credits = null;
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
-
- if (credits != null)
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
{
- assertFalse(newCredits == credits);
- }
+ ClientProducer prod = session.createProducer("address" + i);
- credits = newCredits;
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
- prod.close();
+ if (credits != null)
+ {
+ Assert.assertFalse(newCredits == credits);
+ }
- assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ credits = newCredits;
- session.close();
+ prod.close();
- server.stop();
+ Assert.assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsCaching5() throws Exception
@@ -680,63 +740,73 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducerCredits credits = null;
+ session.createQueue("address", "queue1", null, false);
- List<ClientProducerCredits> creditsList = new
ArrayList<ClientProducerCredits>();
+ ClientProducerCredits credits = null;
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+ List<ClientProducerCredits> creditsList = new
ArrayList<ClientProducerCredits>();
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
- if (credits != null)
- {
- assertFalse(newCredits == credits);
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
+
+ if (credits != null)
+ {
+ Assert.assertFalse(newCredits == credits);
+ }
+
+ credits = newCredits;
+
+ Assert.assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+
+ creditsList.add(credits);
}
- credits = newCredits;
+ Iterator<ClientProducerCredits> iter = creditsList.iterator();
- assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + i);
- creditsList.add(credits);
- }
+ ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
- Iterator<ClientProducerCredits> iter = creditsList.iterator();
+ Assert.assertTrue(newCredits == iter.next());
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer("address" + i);
+
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
- ClientProducerCredits newCredits =
((ClientProducerInternal)prod).getProducerCredits();
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer("address" + (i +
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE));
- assertTrue(newCredits == iter.next());
+
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE +
i + 1,
+
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
-
assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
-
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ session.close();
}
-
- for (int i = 0; i < 10; i++)
+ finally
{
- ClientProducer prod = session.createProducer("address" + (i +
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE));
-
- assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE
+ i + 1,
-
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ server.stop();
}
-
- session.close();
-
- server.stop();
}
public void testProducerCreditsCaching6() throws Exception
@@ -744,26 +814,35 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ session.createQueue("address", "queue1", null, false);
- prod.send("address", session.createMessage(false));
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod.send("address", session.createMessage(false));
+
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
+
+ session.close();
}
+ finally
+ {
+ server.stop();
+ }
- session.close();
-
- server.stop();
}
public void testProducerCreditsCaching7() throws Exception
@@ -771,50 +850,58 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ session.createQueue("address", "queue1", null, false);
- prod.send("address" + i, session.createMessage(false));
+ for (int i = 0; i <
ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
- assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ prod.send("address" + i, session.createMessage(false));
- for (int i = 0; i < 10; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+ Assert.assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(i + 1,
((ClientSessionInternal)session).getProducerCreditManager()
+
.unReferencedCreditsSize());
+ }
- prod.send("address" + i, session.createMessage(false));
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
-
assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
-
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
-
assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
-
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ prod.send("address" + i, session.createMessage(false));
- for (int i = 0; i < 10; i++)
- {
- ClientProducer prod = session.createProducer((String)null);
+
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
- prod.send("address2-" + i, session.createMessage(false));
+ for (int i = 0; i < 10; i++)
+ {
+ ClientProducer prod = session.createProducer((String)null);
-
assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
-
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
-
assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
-
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- }
+ prod.send("address2-" + i, session.createMessage(false));
- session.close();
+
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+
Assert.assertEquals(ClientProducerCreditManagerImpl.MAX_UNREFERENCED_CREDITS_CACHE_SIZE,
+
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ }
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
public void testProducerCreditsRefCounting() throws Exception
@@ -822,43 +909,50 @@
HornetQServer server = createServer(false, isNetty());
server.start();
+ waitForServer(server);
- ClientSessionFactory sf = locator.createSessionFactory();
+ try
+ {
- final ClientSession session = sf.createSession(false, true, true, true);
+ ClientSessionFactory sf = locator.createSessionFactory();
- session.createQueue("address", "queue1", null, false);
+ final ClientSession session = sf.createSession(false, true, true, true);
- ClientProducer prod1 = session.createProducer("address");
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ session.createQueue("address", "queue1", null, false);
- ClientProducer prod2 = session.createProducer("address");
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ ClientProducer prod1 = session.createProducer("address");
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- ClientProducer prod3 = session.createProducer("address");
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ ClientProducer prod2 = session.createProducer("address");
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- prod1.close();
+ ClientProducer prod3 = session.createProducer("address");
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod1.close();
- prod2.close();
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod2.close();
- prod3.close();
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(0,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
- assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
+ prod3.close();
- session.close();
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().creditsMapSize());
+ Assert.assertEquals(1,
((ClientSessionInternal)session).getProducerCreditManager().unReferencedCreditsSize());
- server.stop();
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
}
}