[hornetq-commits] JBoss hornetq SVN: r10735 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed May 25 17:41:07 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-05-25 17:41:06 -0400 (Wed, 25 May 2011)
New Revision: 10735
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
HORNETQ-700 /JBPAPP-6606
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2011-05-25 20:14:23 UTC (rev 10734)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java 2011-05-25 21:41:06 UTC (rev 10735)
@@ -25,6 +25,8 @@
void acquireCredits(int credits) throws InterruptedException;
void receiveCredits(int credits);
+
+ boolean isBlocked();
void reset();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-05-25 20:14:23 UTC (rev 10734)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2011-05-25 21:41:06 UTC (rev 10735)
@@ -33,14 +33,16 @@
private final int windowSize;
+ private boolean blocked;
+
private final SimpleString address;
private final ClientSessionInternal session;
private int arriving;
-
+
private int refCount;
-
+
public ClientProducerCreditsImpl(final ClientSessionInternal session,
final SimpleString address,
final int windowSize)
@@ -64,16 +66,37 @@
{
checkCredits(credits);
- semaphore.acquire(credits);
+ if (!semaphore.tryAcquire(credits))
+ {
+ this.blocked = true;
+ try
+ {
+ semaphore.acquire(credits);
+ }
+ finally
+ {
+ this.blocked = false;
+ }
+ }
}
+ public boolean isBlocked()
+ {
+ return blocked;
+ }
+
+ public int getBalance()
+ {
+ return semaphore.availablePermits();
+ }
+
public void receiveCredits(final int credits)
{
synchronized (this)
{
arriving -= credits;
}
-
+
semaphore.release(credits);
}
@@ -84,7 +107,7 @@
semaphore.drainPermits();
int beforeFailure = arriving;
-
+
arriving = 0;
// If we are waiting for more credits than what's configured, then we need to use what we tried before
@@ -98,22 +121,22 @@
semaphore.release(Integer.MAX_VALUE / 2);
}
-
+
public synchronized void incrementRefCount()
{
refCount++;
}
-
+
public synchronized int decrementRefCount()
{
return --refCount;
}
-
+
public synchronized void releaseOutstanding()
{
semaphore.drainPermits();
}
-
+
private void checkCredits(final int credits)
{
int needed = Math.max(credits, windowSize);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-05-25 20:14:23 UTC (rev 10734)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-05-25 21:41:06 UTC (rev 10735)
@@ -391,7 +391,7 @@
securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
}
- server.createQueue(address, name, filterString, durable, temporary);
+ Queue queue = server.createQueue(address, name, filterString, durable, temporary);
if (temporary)
{
@@ -401,7 +401,7 @@
// session is closed.
// It is up to the user to delete the queue when finished with it
- TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name);
+ TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name, queue);
remotingConnection.addCloseListener(cleaner);
remotingConnection.addFailureListener(cleaner);
@@ -409,18 +409,32 @@
tempQueueCleannerUppers.put(name, cleaner);
}
}
+
+
+ /**
+ * For test cases only
+ * @return
+ */
+ public RemotingConnection getRemotingConnection()
+ {
+ return remotingConnection;
+ }
private static class TempQueueCleanerUpper implements CloseListener, FailureListener
{
private final PostOffice postOffice;
private final SimpleString bindingName;
+
+ private final Queue queue;
- TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName)
+ TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName, final Queue queue)
{
this.postOffice = postOffice;
this.bindingName = bindingName;
+
+ this.queue = queue;
}
private void run()
@@ -431,6 +445,8 @@
{
postOffice.removeBinding(bindingName);
}
+
+ queue.deleteAllReferences();
}
catch (Exception e)
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-05-25 20:14:23 UTC (rev 10734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-05-25 21:41:06 UTC (rev 10735)
@@ -32,6 +32,8 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientProducerCreditsImpl;
+import org.hornetq.core.client.impl.ClientProducerImpl;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
@@ -41,6 +43,10 @@
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
@@ -562,7 +568,106 @@
locator2.close();
}
+
+ public void testBlockingWithTemporaryQueue() throws Exception
+ {
+
+ AddressSettings setting = new AddressSettings();
+ setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ setting.setMaxSizeBytes(1024 * 1024);
+
+ server.getAddressSettingsRepository().addMatch("TestAD", setting);
+
+ ClientSessionFactory consumerCF = locator.createSessionFactory();
+ ClientSession consumerSession = consumerCF.createSession(true, true);
+ consumerSession.addMetaData("consumer", "consumer");
+ consumerSession.createTemporaryQueue("TestAD", "Q1");
+ ClientConsumer consumer = consumerSession.createConsumer("Q1");
+ consumerSession.start();
+
+ final ClientProducerImpl prod = (ClientProducerImpl)session.createProducer("TestAD");
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final AtomicInteger msgs = new AtomicInteger(0);
+
+ final int TOTAL_MSG = 1000;
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ for (int i = 0 ; i < TOTAL_MSG; i++)
+ {
+ ClientMessage msg = session.createMessage(false);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ prod.send(msg);
+ msgs.incrementAndGet();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ System.out.println("done");
+ }
+ };
+
+ t.start();
+ while (msgs.get() == 0)
+ {
+ Thread.sleep(100);
+ }
+
+ while (t.isAlive() && errors.get() == 0 && !prod.getProducerCredits().isBlocked())
+ {
+ Thread.sleep(100);
+ }
+
+ assertEquals(0, errors.get());
+
+ ClientSessionFactory newConsumerCF = locator.createSessionFactory();
+ ClientSession newConsumerSession = newConsumerCF.createSession(true, true);
+ newConsumerSession.createTemporaryQueue("TestAD", "Q2");
+ ClientConsumer newConsumer = newConsumerSession.createConsumer("Q2");
+ newConsumerSession.start();
+
+ int toReceive = TOTAL_MSG - msgs.get() - 1;
+
+ for (ServerSession sessionIterator: server.getSessions())
+ {
+ if (sessionIterator.getMetaData("consumer") != null)
+ {
+ System.out.println("Failing session");
+ ServerSessionImpl impl = (ServerSessionImpl) sessionIterator;
+ impl.getRemotingConnection().fail(new HornetQException(HornetQException.DISCONNECTED, "failure e"));
+ }
+ }
+
+ int secondReceive = 0;
+
+ ClientMessage msg = null;
+ while (secondReceive < toReceive && (msg = newConsumer.receive(5000)) != null)
+ {
+ msg.acknowledge();
+ secondReceive++;
+ }
+
+ assertNull(newConsumer.receiveImmediate());
+
+ assertEquals(toReceive, secondReceive);
+
+ t.join();
+
+
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -586,6 +691,7 @@
{
ServerLocator retlocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
retlocator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
+ retlocator.setClientFailureCheckPeriod(TemporaryQueueTest.CONNECTION_TTL / 3);
return retlocator;
}
More information about the hornetq-commits mailing list