[hornetq-commits] JBoss hornetq SVN: r8962 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Mar 25 05:11:44 EDT 2010
Author: timfox
Date: 2010-03-25 05:11:43 -0400 (Thu, 25 Mar 2010)
New Revision: 8962
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
optimisation - remove locking in producer server side credit management
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-03-25 05:56:22 UTC (rev 8961)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-03-25 09:11:43 UTC (rev 8962)
@@ -627,7 +627,7 @@
}
- private Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<Runnable>();
+ private Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<OurRunnable>();
private class MemoryFreedRunnablesExecutor implements Runnable
{
@@ -644,20 +644,49 @@
private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor();
- private final Object runnableLock = new Object();
-
+ class OurRunnable implements Runnable
+ {
+ boolean ran;
+
+ final Runnable runnable;
+
+ OurRunnable(final Runnable runnable)
+ {
+ this.runnable = runnable;
+ }
+
+ public synchronized void run()
+ {
+ if (!ran)
+ {
+ runnable.run();
+
+ ran = true;
+ }
+ }
+ }
+
public void executeRunnableWhenMemoryAvailable(final Runnable runnable)
{
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1)
{
- synchronized (runnableLock)
+ if (sizeInBytes.get() > maxSize)
{
- if (sizeInBytes.get() > maxSize)
+ OurRunnable ourRunnable = new OurRunnable(runnable);
+
+ onMemoryFreedRunnables.add(ourRunnable);
+
+ //We check again to avoid a race condition where the size can come down just after the element
+ //has been added, but the check to execute was done before the element was added
+ //NOTE! We do not fix this race by locking the whole thing, doing this check provides
+ //MUCH better performance in a highly concurrent environment
+ if (sizeInBytes.get() <= maxSize)
{
- onMemoryFreedRunnables.add(runnable);
+ //run it now
+ ourRunnable.run();
+ }
- return;
- }
+ return;
}
}
runnable.run();
@@ -669,16 +698,13 @@
{
if (maxSize != -1)
{
- synchronized (runnableLock)
+ long newSize = sizeInBytes.addAndGet(size);
+
+ if (newSize <= maxSize)
{
- long newSize = sizeInBytes.addAndGet(size);
-
- if (newSize <= maxSize)
+ if (!onMemoryFreedRunnables.isEmpty())
{
- if (!onMemoryFreedRunnables.isEmpty())
- {
- executor.execute(memoryFreedRunnablesExecutor);
- }
+ executor.execute(memoryFreedRunnablesExecutor);
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-03-25 05:56:22 UTC (rev 8961)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-03-25 09:11:43 UTC (rev 8962)
@@ -661,76 +661,6 @@
server.stop();
}
- public void testBlockingIssue() throws Exception
- {
- // HornetQServer server = createServer(true, true);
- //
- // AddressSettings addressSettings = new AddressSettings();
- // addressSettings.setMaxSizeBytes(300000);
- // addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
- //
- // HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
- // repos.addMatch("bar".toString(), addressSettings);
- //
- // server.start();
-
- // ClientSessionFactory sf = createFactory(true);
-
- TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName());
-
- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tc);
-
-// ClientSession sess = sf.createSession();
-//
-// sess.createQueue("bar", "bar");
-
- int count = 0;
- while (true)
- {
- log.info("*** ITERATION " + count++ + "\n\n\n\n");
- ClientSession session = sf.createTransactedSession();
-
- ClientProducer producer = session.createProducer("bar");
-
- for (int i = 0; i < 1000; i++)
- {
-
- ClientMessage message = session.createMessage(true);
-
- message.getBodyBuffer().writeString("Hello");
-
- producer.send(message);
-
- // log.info("sent " + i);
-
- }
-
- session.commit();
-
- session.close();
-
- session = sf.createSession();
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer("bar");
-
- for (int i = 0; i < 1000; i++)
- {
-
- ClientMessage msgReceived = consumer.receive();
-
- msgReceived.acknowledge();
-
- // log.info("read " + i);
-
- }
-
- session.close();
-
- }
- }
-
public void testProducerCreditsCaching5() throws Exception
{
HornetQServer server = createServer(false, isNetty());
More information about the hornetq-commits
mailing list