[hornetq-commits] JBoss hornetq SVN: r8441 - in trunk: src/main/org/hornetq/core/journal/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Nov 28 03:42:54 EST 2009


Author: timfox
Date: 2009-11-28 03:42:53 -0500 (Sat, 28 Nov 2009)
New Revision: 8441

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
Log:
mainly fixed JMSFailoverTest

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-11-28 08:42:53 UTC (rev 8441)
@@ -144,7 +144,7 @@
       sessionExecutor = executor;
 
       this.clientWindowSize = clientWindowSize;
-
+      
       this.ackBatchSize = ackBatchSize;
    }
 
@@ -585,7 +585,7 @@
       if (clientWindowSize >= 0)
       {
          creditsToSend += messageBytes;
-         
+
          if (creditsToSend >= clientWindowSize)
          {
             if (clientWindowSize == 0 && discountSlowConsumer)

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2009-11-28 08:42:53 UTC (rev 8441)
@@ -82,13 +82,13 @@
    }
 
    public synchronized void reset()
-   {
+   {      
       // Any arriving credits from before failover won't arrive, so we re-initialise
 
       semaphore.drainPermits();
 
       arriving = 0;
-
+      
       checkCredits(windowSize * 2);
    }
 
@@ -123,6 +123,7 @@
 
    private void requestCredits(final int credits)
    {
+      
       session.sendProducerCreditsMessage(credits, destination);
    }
 

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-28 08:42:53 UTC (rev 8441)
@@ -843,13 +843,13 @@
              
                conn.write(buffer, false);
 
-               int clientWindowSize = calcWindowSize(entry.getValue().getClientWindowSize());
-
+               int clientWindowSize = entry.getValue().getClientWindowSize();
+               
                if (clientWindowSize != 0)
                {
                   SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
                                                                                                  clientWindowSize);
-
+                  
                   packet.setChannelID(channel.getID());
 
                   buffer = packet.encode(channel.getConnection());
@@ -902,6 +902,9 @@
       if (resetCreditManager)
       {
          producerCreditManager.reset();
+         
+         //Also need to send more credits for consumers, otherwise the system could hand with the server
+         //not having any credits to send
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-28 08:42:53 UTC (rev 8441)
@@ -99,7 +99,7 @@
    // Public --------------------------------------------------------
 
    public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
-   {     
+   {
       this.bufferSize = size;
       this.logRates = logRates;
       if (logRates)
@@ -131,7 +131,6 @@
 
       timerThread.start();
 
-      log.info("log rates "  + logRates);
       if (logRates)
       {
          logRatesTimerTask = new LogRatesTimerTask();
@@ -263,7 +262,7 @@
             flush();
          }
       }
-      
+
       if (buffer.writerIndex() == bufferLimit)
       {
          flush();
@@ -273,47 +272,47 @@
    public void flush()
    {
       ByteBuffer bufferToFlush = null;
-      
+
       boolean useSync = false;
-      
+
       List<IOAsyncTask> callbacksToCall = null;
-      
+
       synchronized (this)
       {
          if (buffer.writerIndex() > 0)
          {
             latchTimer.up();
-   
+
             int pos = buffer.writerIndex();
-   
+
             if (logRates)
             {
                bytesFlushed += pos;
             }
-   
+
             bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
-   
+
             // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
             // Using bufferToFlush.put(buffer) would make several append calls for each byte
-   
+
             bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
 
             callbacksToCall = callbacks;
-            
+
             callbacks = new LinkedList<IOAsyncTask>();
-   
+
             useSync = pendingSync;
-            
+
             active = false;
             pendingSync = false;
-   
+
             buffer.clear();
             bufferLimit = 0;
 
             flushesDone++;
          }
       }
-      
+
       // Execute the flush outside of the lock
       // This is important for NIO performance while we are using NIO Callbacks
       if (bufferToFlush != null)
@@ -339,7 +338,7 @@
          {
             if (bufferObserver != null)
             {
-                flush();
+               flush();
             }
          }
          finally
@@ -369,7 +368,7 @@
             {
                double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
                log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
-               double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);              
+               double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
                log.info("Flush rate = " + flushRate + " flushes / sec");
             }
 
@@ -377,7 +376,7 @@
 
             bytesFlushed = 0;
 
-            flushesDone = 0;           
+            flushesDone = 0;
          }
       }
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-28 08:42:53 UTC (rev 8441)
@@ -186,7 +186,7 @@
    public HandleStatus handle(final MessageReference ref) throws Exception
    {      
       if (availableCredits != null && availableCredits.get() <= 0)
-      {         
+      {                 
          return HandleStatus.BUSY;
       }
 
@@ -416,6 +416,8 @@
 
    public void receiveCredits(final int credits) throws Exception
    {
+      
+      
       if (credits == -1)
       {
          // No flow control
@@ -433,7 +435,7 @@
                   " currentValue = " +
                   availableCredits.get());
          }
-
+         
          if (previous <= 0 && previous + credits > 0)
          {       
             promptDelivery(true);

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java	2009-11-28 03:29:29 UTC (rev 8440)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java	2009-11-28 08:42:53 UTC (rev 8441)
@@ -85,7 +85,18 @@
 
       jbcf.setBlockOnPersistentSend(true);
       jbcf.setBlockOnNonPersistentSend(true);
+      
+      //Note we set consumer window size to a value so we can verify that consumer credit re-sending
+      //works properly on failover
+      //The value is small enough that credits will have to be resent several time
+      
+      final int numMessages = 10;
+      
+      final int bodySize = 1000;
+      
+      jbcf.setConsumerWindowSize(numMessages * bodySize / 10);
 
+
       Connection conn = jbcf.createConnection();
 
       MyExceptionListener listener = new MyExceptionListener();
@@ -104,8 +115,7 @@
 
       Queue queue = sess.createQueue("myqueue");
 
-      final int numMessages = 1000;
-
+      
       MessageProducer producer = sess.createProducer(queue);
 
       producer.setDeliveryMode(DeliveryMode.PERSISTENT);



More information about the hornetq-commits mailing list