[hornetq-commits] JBoss hornetq SVN: r8962 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 25 05:11:44 EDT 2010


Author: timfox
Date: 2010-03-25 05:11:43 -0400 (Thu, 25 Mar 2010)
New Revision: 8962

Modified:
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
optimisation - remove locking in producer server side credit management

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-03-25 05:56:22 UTC (rev 8961)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-03-25 09:11:43 UTC (rev 8962)
@@ -627,7 +627,7 @@
 
    }
 
-   private Queue<Runnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<Runnable>();
+   private Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<OurRunnable>();
 
    private class MemoryFreedRunnablesExecutor implements Runnable
    {
@@ -644,20 +644,49 @@
 
    private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor();
 
-   private final Object runnableLock = new Object();
-
+   class OurRunnable implements Runnable
+   {
+      boolean ran;
+      
+      final Runnable runnable;
+      
+      OurRunnable(final Runnable runnable)
+      {
+         this.runnable = runnable;
+      }
+      
+      public synchronized void run()
+      {
+         if (!ran)
+         {
+            runnable.run();
+            
+            ran = true;
+         }
+      }
+   }
+   
    public void executeRunnableWhenMemoryAvailable(final Runnable runnable)
    {
       if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1)
       {
-         synchronized (runnableLock)
+         if (sizeInBytes.get() > maxSize)
          {
-            if (sizeInBytes.get() > maxSize)
+            OurRunnable ourRunnable = new OurRunnable(runnable);
+            
+            onMemoryFreedRunnables.add(ourRunnable);
+            
+            //We check again to avoid a race condition where the size can come down just after the element
+            //has been added, but the check to execute was done before the element was added
+            //NOTE! We do not fix this race by locking the whole thing, doing this check provides
+            //MUCH better performance in a highly concurrent environment
+            if (sizeInBytes.get() <= maxSize)
             {
-               onMemoryFreedRunnables.add(runnable);
+               //run it now
+               ourRunnable.run();
+            }
 
-               return;
-            }
+            return;
          }
       }
       runnable.run();
@@ -669,16 +698,13 @@
       {
          if (maxSize != -1)
          {
-            synchronized (runnableLock)
+            long newSize = sizeInBytes.addAndGet(size);
+
+            if (newSize <= maxSize)
             {
-               long newSize = sizeInBytes.addAndGet(size);
-
-               if (newSize <= maxSize)
+               if (!onMemoryFreedRunnables.isEmpty())
                {
-                  if (!onMemoryFreedRunnables.isEmpty())
-                  {
-                     executor.execute(memoryFreedRunnablesExecutor);
-                  }
+                  executor.execute(memoryFreedRunnablesExecutor);
                }
             }
          }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2010-03-25 05:56:22 UTC (rev 8961)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2010-03-25 09:11:43 UTC (rev 8962)
@@ -661,76 +661,6 @@
       server.stop();
    }
 
-   public void testBlockingIssue() throws Exception
-   {
-      // HornetQServer server = createServer(true, true);
-      //
-      // AddressSettings addressSettings = new AddressSettings();
-      // addressSettings.setMaxSizeBytes(300000);
-      // addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
-      //
-      // HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
-      // repos.addMatch("bar".toString(), addressSettings);
-      //      
-      // server.start();
-
-      // ClientSessionFactory sf = createFactory(true);
-
-      TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName());
-
-      ClientSessionFactory sf = HornetQClient.createClientSessionFactory(tc);
-
-//      ClientSession sess = sf.createSession();
-//
-//      sess.createQueue("bar", "bar");
-
-      int count = 0;
-      while (true)
-      {
-         log.info("*** ITERATION " + count++ + "\n\n\n\n");
-         ClientSession session = sf.createTransactedSession();
-
-         ClientProducer producer = session.createProducer("bar");
-
-         for (int i = 0; i < 1000; i++)
-         {
-
-            ClientMessage message = session.createMessage(true);
-
-            message.getBodyBuffer().writeString("Hello");
-
-            producer.send(message);
-
-            // log.info("sent " + i);
-
-         }
-
-         session.commit();
-
-         session.close();
-
-         session = sf.createSession();
-
-         session.start();
-
-         ClientConsumer consumer = session.createConsumer("bar");
-
-         for (int i = 0; i < 1000; i++)
-         {
-
-            ClientMessage msgReceived = consumer.receive();
-
-            msgReceived.acknowledge();
-
-            // log.info("read " + i);
-
-         }
-
-         session.close();
-
-      }
-   }
-
    public void testProducerCreditsCaching5() throws Exception
    {
       HornetQServer server = createServer(false, isNetty());



More information about the hornetq-commits mailing list