[hornetq-commits] JBoss hornetq SVN: r10735 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed May 25 17:41:07 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-25 17:41:06 -0400 (Wed, 25 May 2011)
New Revision: 10735

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
HORNETQ-700 /JBPAPP-6606

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java	2011-05-25 20:14:23 UTC (rev 10734)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCredits.java	2011-05-25 21:41:06 UTC (rev 10735)
@@ -25,6 +25,8 @@
    void acquireCredits(int credits) throws InterruptedException;
 
    void receiveCredits(int credits);
+   
+   boolean isBlocked();
 
    void reset();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2011-05-25 20:14:23 UTC (rev 10734)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2011-05-25 21:41:06 UTC (rev 10735)
@@ -33,14 +33,16 @@
 
    private final int windowSize;
 
+      private boolean blocked;
+
    private final SimpleString address;
 
    private final ClientSessionInternal session;
 
    private int arriving;
-   
+
    private int refCount;
-   
+
    public ClientProducerCreditsImpl(final ClientSessionInternal session,
                                     final SimpleString address,
                                     final int windowSize)
@@ -64,16 +66,37 @@
    {
       checkCredits(credits);
 
-      semaphore.acquire(credits);
+      if (!semaphore.tryAcquire(credits))
+      {
+         this.blocked = true;
+         try
+         {
+            semaphore.acquire(credits);
+         }
+         finally
+         {
+            this.blocked = false;
+         }
+      }
    }
 
+   public boolean isBlocked()
+   {
+      return blocked;
+   }
+
+   public int getBalance()
+   {
+      return semaphore.availablePermits();
+   }
+
    public void receiveCredits(final int credits)
    {
       synchronized (this)
       {
          arriving -= credits;
       }
-      
+
       semaphore.release(credits);
    }
 
@@ -84,7 +107,7 @@
       semaphore.drainPermits();
 
       int beforeFailure = arriving;
-      
+
       arriving = 0;
 
       // If we are waiting for more credits than what's configured, then we need to use what we tried before
@@ -98,22 +121,22 @@
 
       semaphore.release(Integer.MAX_VALUE / 2);
    }
-    
+
    public synchronized void incrementRefCount()
    {
       refCount++;
    }
-   
+
    public synchronized int decrementRefCount()
    {
       return --refCount;
    }
-   
+
    public synchronized void releaseOutstanding()
    {
       semaphore.drainPermits();
    }
-   
+
    private void checkCredits(final int credits)
    {
       int needed = Math.max(credits, windowSize);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-05-25 20:14:23 UTC (rev 10734)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-05-25 21:41:06 UTC (rev 10735)
@@ -391,7 +391,7 @@
          securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
       }
 
-      server.createQueue(address, name, filterString, durable, temporary);
+      Queue queue = server.createQueue(address, name, filterString, durable, temporary);
 
       if (temporary)
       {
@@ -401,7 +401,7 @@
          // session is closed.
          // It is up to the user to delete the queue when finished with it
         
-         TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name);
+         TempQueueCleanerUpper cleaner = new TempQueueCleanerUpper(postOffice, name, queue);
 
          remotingConnection.addCloseListener(cleaner);
          remotingConnection.addFailureListener(cleaner);
@@ -409,18 +409,32 @@
          tempQueueCleannerUppers.put(name, cleaner);
       }
    }
+   
+   
+   /**
+    * For test cases only
+    * @return
+    */
+   public RemotingConnection getRemotingConnection()
+   {
+      return remotingConnection;
+   }
 
    private static class TempQueueCleanerUpper implements CloseListener, FailureListener
    {
       private final PostOffice postOffice;
 
       private final SimpleString bindingName;
+      
+      private final Queue queue;
 
-      TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName)
+      TempQueueCleanerUpper(final PostOffice postOffice, final SimpleString bindingName, final Queue queue)
       {
          this.postOffice = postOffice;
 
          this.bindingName = bindingName;
+         
+         this.queue = queue;
       }
 
       private void run()
@@ -431,6 +445,8 @@
             {
                postOffice.removeBinding(bindingName);
             }
+            
+            queue.deleteAllReferences();
          }
          catch (Exception e)
          {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-05-25 20:14:23 UTC (rev 10734)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-05-25 21:41:06 UTC (rev 10735)
@@ -32,6 +32,8 @@
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientProducerCreditsImpl;
+import org.hornetq.core.client.impl.ClientProducerImpl;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
@@ -41,6 +43,10 @@
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
@@ -562,7 +568,106 @@
 
       locator2.close();
    }
+   
+   public void testBlockingWithTemporaryQueue() throws Exception
+   {
+      
+      AddressSettings setting = new AddressSettings();
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      setting.setMaxSizeBytes(1024 * 1024);
+      
+      server.getAddressSettingsRepository().addMatch("TestAD", setting);
+      
+      ClientSessionFactory consumerCF = locator.createSessionFactory();
+      ClientSession consumerSession = consumerCF.createSession(true, true);
+      consumerSession.addMetaData("consumer", "consumer");
+      consumerSession.createTemporaryQueue("TestAD", "Q1");
+      ClientConsumer consumer = consumerSession.createConsumer("Q1");
+      consumerSession.start();
+      
+      final ClientProducerImpl prod = (ClientProducerImpl)session.createProducer("TestAD");
+      
+      final AtomicInteger errors = new AtomicInteger(0);
+      
+      final AtomicInteger msgs = new AtomicInteger(0);
+      
+      final int TOTAL_MSG = 1000;
+      
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               for (int i = 0 ; i < TOTAL_MSG; i++)
+               {
+                  ClientMessage msg = session.createMessage(false);
+                  msg.getBodyBuffer().writeBytes(new byte[1024]);
+                  prod.send(msg);
+                  msgs.incrementAndGet();
+               }
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+            
+            System.out.println("done");
+         }
+      };
+      
+      t.start();
 
+      while (msgs.get() == 0)
+      {
+         Thread.sleep(100);
+      }
+      
+      while (t.isAlive() && errors.get() == 0 && !prod.getProducerCredits().isBlocked())
+      {
+         Thread.sleep(100);
+      }
+      
+      assertEquals(0, errors.get());
+
+      ClientSessionFactory newConsumerCF = locator.createSessionFactory();
+      ClientSession newConsumerSession = newConsumerCF.createSession(true, true);
+      newConsumerSession.createTemporaryQueue("TestAD", "Q2");
+      ClientConsumer newConsumer = newConsumerSession.createConsumer("Q2");
+      newConsumerSession.start();
+      
+      int toReceive = TOTAL_MSG - msgs.get() - 1;
+
+      for (ServerSession sessionIterator: server.getSessions())
+      {
+         if (sessionIterator.getMetaData("consumer") != null)
+         {
+            System.out.println("Failing session");
+            ServerSessionImpl impl = (ServerSessionImpl) sessionIterator;
+            impl.getRemotingConnection().fail(new HornetQException(HornetQException.DISCONNECTED, "failure e"));
+         }
+      }
+      
+      int secondReceive = 0;
+      
+      ClientMessage msg = null;
+      while (secondReceive < toReceive && (msg = newConsumer.receive(5000)) != null)
+      {
+         msg.acknowledge();
+         secondReceive++;
+      }
+      
+      assertNull(newConsumer.receiveImmediate());
+      
+      assertEquals(toReceive, secondReceive);
+      
+      t.join();
+      
+      
+      
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -586,6 +691,7 @@
    {
       ServerLocator retlocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
       retlocator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
+      retlocator.setClientFailureCheckPeriod(TemporaryQueueTest.CONNECTION_TTL / 3);
       return retlocator;
    }
 



More information about the hornetq-commits mailing list