[hornetq-commits] JBoss hornetq SVN: r8210 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 4 11:45:51 EST 2009


Author: timfox
Date: 2009-11-04 11:45:51 -0500 (Wed, 04 Nov 2009)
New Revision: 8210

Added:
   trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java
Modified:
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
refactored serverconsumerimpl to not pre-calculate + few tweaks

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-04 16:45:51 UTC (rev 8210)
@@ -1342,7 +1342,6 @@
 
    private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer)
    {
-
       HandleStatus status;
       try
       {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-04 16:45:51 UTC (rev 8210)
@@ -29,6 +29,7 @@
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.Notification;
 import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.QueueBinding;
@@ -36,10 +37,15 @@
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.*;
+import org.hornetq.core.server.HandleStatus;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerConsumer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.core.message.LargeMessageEncodingContext;
 import org.hornetq.utils.TypedProperties;
 
 /**
@@ -88,9 +94,7 @@
 
    private volatile LargeMessageDeliverer largeMessageDeliverer = null;
 
-   // We will only be sending one largeMessage at any time, however during replication you may have
-   // more than one LargeMessage pending on the replicationBuffer
-   private final AtomicInteger pendingLargeMessagesCounter = new AtomicInteger(0);
+   private boolean largeMessageInDelivery;
 
    /**
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
@@ -116,8 +120,7 @@
    private final Binding binding;
 
    // Constructors ---------------------------------------------------------------------------------
-   
-   
+
    public ServerConsumerImpl(final long id,
                              final ServerSession session,
                              final QueueBinding binding,
@@ -131,7 +134,7 @@
                              final Executor executor,
                              final ManagementService managementService) throws Exception
    {
-      
+
       this.id = id;
 
       this.filter = filter;
@@ -180,7 +183,89 @@
 
    public HandleStatus handle(final MessageReference ref) throws Exception
    {
-      return doHandle(ref);
+      if (availableCredits != null && availableCredits.get() <= 0)
+      {
+         return HandleStatus.BUSY;
+      }
+
+      lock.lock();
+
+      try
+      {
+         // If the consumer is stopped then we don't accept the message, it
+         // should go back into the
+         // queue for delivery later.
+         if (!started)
+         {
+            return HandleStatus.BUSY;
+         }
+
+         // note: Since we schedule deliveries to start under replication, we use a counter of pendingLargeMessages.
+
+         // If there is a pendingLargeMessage we can't take another message
+         // This has to be checked inside the lock as the set to null is done inside the lock
+         if (largeMessageInDelivery)
+         {
+            return HandleStatus.BUSY;
+         }
+
+         final ServerMessage message = ref.getMessage();
+
+         if (filter != null && !filter.match(message))
+         {
+            return HandleStatus.NO_MATCH;
+         }
+
+         if (!browseOnly)
+         {
+            if (!preAcknowledge)
+            {
+               deliveringRefs.add(ref);
+            }
+
+            ref.handled();
+
+            ref.incrementDeliveryCount();
+
+            // If updateDeliveries = false (set by strict-update),
+            // the updateDeliveryCount would still be updated after cancel
+            if (updateDeliveries)
+            {
+               if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+               {
+                  storageManager.updateDeliveryCount(ref);
+               }
+            }
+
+            if (preAcknowledge)
+            {
+               if (message.isLargeMessage())
+               {
+                  // we must hold one reference, or the file will be deleted before it could be delivered
+                  ((LargeServerMessage)message).incrementDelayDeletionCount();
+               }
+
+               // With pre-ack, we ack *before* sending to the client
+               ref.getQueue().acknowledge(ref);
+            }
+
+         }
+
+         if (message.isLargeMessage())
+         {
+            deliverLargeMessage(ref, message);
+         }
+         else
+         {
+            deliverStandardMessage(ref, message);
+         }
+
+         return HandleStatus.HANDLED;
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    public Filter getFilter()
@@ -248,16 +333,18 @@
     * When the consumer receives such a "forced delivery" message, it discards it
     * and knows that there are no other messages to be delivered.
     */
+
+   // TODO - why is this executed on a different thread?
    public synchronized void forceDelivery(final long sequence)
-   {        
+   {
       // The prompt delivery is called synchronously to ensure the "forced delivery" message is
       // sent after any queue delivery.
       executor.execute(new Runnable()
-      {         
+      {
          public void run()
          {
             promptDelivery(false);
-            
+
             ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID());
             forcedDeliveryMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
             forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
@@ -310,7 +397,7 @@
       {
          lock.unlock();
       }
-      
+
       // Outside the lock
       if (started)
       {
@@ -332,10 +419,10 @@
          if (trace)
          {
             trace("Received " + credits +
-                      " credits, previous value = " +
-                      previous +
-                      " currentValue = " +
-                      availableCredits.get());
+                  " credits, previous value = " +
+                  previous +
+                  " currentValue = " +
+                  availableCredits.get());
          }
 
          if (previous <= 0 && previous + credits > 0)
@@ -465,106 +552,15 @@
       }
    }
 
-   /**
-    * 
-    */
    private void resumeLargeMessage()
    {
       executor.execute(resumeLargeMessageRunnable);
    }
 
-   private HandleStatus doHandle(final MessageReference ref) throws Exception
+   private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
    {
-      if (availableCredits != null && availableCredits.get() <= 0)
-      {         
-         return HandleStatus.BUSY;
-      }
+      largeMessageInDelivery = true;
 
-      lock.lock();
-
-      try
-      {
-         // If the consumer is stopped then we don't accept the message, it
-         // should go back into the
-         // queue for delivery later.
-         if (!started)
-         {
-            return HandleStatus.BUSY;
-         }
-
-         // note: Since we schedule deliveries to start under replication, we use a counter of pendingLargeMessages.
-
-         // If there is a pendingLargeMessage we can't take another message
-         // This has to be checked inside the lock as the set to null is done inside the lock
-         if (pendingLargeMessagesCounter.get() > 0)
-         {
-            return HandleStatus.BUSY;
-         }
-
-         final ServerMessage message = ref.getMessage();
-         
-         if (filter != null && !filter.match(message))
-         {
-            return HandleStatus.NO_MATCH;
-         }
-
-         if (!browseOnly)
-         {
-            if (!preAcknowledge)
-            {
-               deliveringRefs.add(ref);
-            }
-
-            ref.handled();
-
-            ref.incrementDeliveryCount();
-
-            // If updateDeliveries = false (set by strict-update),
-            // the updateDeliveryCount would still be updated after cancel
-            if (updateDeliveries)
-            {
-               if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
-               {
-                  storageManager.updateDeliveryCount(ref);
-               }
-            }
-
-            if (preAcknowledge)
-            {
-               if (message.isLargeMessage())
-               {
-                  // we must hold one reference, or the file will be deleted before it could be delivered
-                  ((LargeServerMessage)message).incrementDelayDeletionCount();
-               }
-
-               // With pre-ack, we ack *before* sending to the client
-               ref.getQueue().acknowledge(ref);
-            }
-
-         }
-
-         if (message.isLargeMessage())
-         {
-            deliverLargeMessage(ref, message);
-         }
-         else
-         {
-            deliverStandardMessage(ref, message);
-         }
-
-         return HandleStatus.HANDLED;
-      }
-      finally
-      {
-         lock.unlock();
-      }
-   }
-
-   private void deliverLargeMessage(final MessageReference ref, final ServerMessage message)
-      throws Exception
-   {
-      pendingLargeMessagesCounter.incrementAndGet();
-
       final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
 
       // it doesn't need lock because deliverLargeMesasge is already inside the lock()
@@ -591,7 +587,7 @@
    // Inner classes
    // ------------------------------------------------------------------------
 
-   final Runnable resumeLargeMessageRunnable = new Runnable()
+   private final Runnable resumeLargeMessageRunnable = new Runnable()
    {
       public void run()
       {
@@ -624,30 +620,29 @@
 
    /** Internal encapsulation of the logic on sending LargeMessages.
     *  This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
-   private class LargeMessageDeliverer
+   private final class LargeMessageDeliverer
    {
       private final long sizePendingLargeMessage;
 
       /** The current message being processed */
-      private final LargeServerMessage pendingLargeMessage;
+      private LargeServerMessage largeMessage;
 
       private final MessageReference ref;
 
-      private volatile boolean sentFirstMessage = false;
+      private volatile boolean sentInitialPacket = false;
 
       /** The current position on the message being processed */
       private volatile long positionPendingLargeMessage;
 
       private LargeMessageEncodingContext context;
 
-      public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref)
-         throws Exception
+      public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception
       {
-         pendingLargeMessage = message;
+         largeMessage = message;
 
-         pendingLargeMessage.incrementDelayDeletionCount();
+         largeMessage.incrementDelayDeletionCount();
 
-         sizePendingLargeMessage = pendingLargeMessage.getLargeBodySize();
+         sizePendingLargeMessage = largeMessage.getLargeBodySize();
 
          this.ref = ref;
       }
@@ -658,7 +653,7 @@
 
          try
          {
-            if (pendingLargeMessage == null)
+            if (largeMessage == null)
             {
                return true;
             }
@@ -667,61 +662,47 @@
             {
                return false;
             }
-            
-            SessionReceiveMessage initialMessage;
 
-            if (sentFirstMessage)
+            if (!sentInitialPacket)
             {
-               initialMessage = null;
-            }
-            else
-            {
-               sentFirstMessage = true;
+               HornetQBuffer headerBuffer = ChannelBuffers.buffer(largeMessage.getHeadersAndPropertiesEncodeSize());
 
-               HornetQBuffer headerBuffer = ChannelBuffers.buffer(pendingLargeMessage.getHeadersAndPropertiesEncodeSize());
+               largeMessage.encodeHeadersAndProperties(headerBuffer);
 
-               pendingLargeMessage.encodeHeadersAndProperties(headerBuffer);
+               SessionReceiveMessage initialPacket = new SessionReceiveMessage(id,
+                                                                               headerBuffer.array(),
+                                                                               largeMessage.getLargeBodySize(),
+                                                                               ref.getDeliveryCount());
 
-               initialMessage = new SessionReceiveMessage(id,
-                                                          headerBuffer.array(),
-                                                          pendingLargeMessage.getLargeBodySize(),
-                                                          ref.getDeliveryCount());
-               context = pendingLargeMessage.createNewContext();
+               context = largeMessage.createNewContext();
+
                context.open();
-            }
 
-            int precalculateAvailableCredits;
-
-            if (availableCredits != null)
-            {
-               // Flow control needs to be done in advance.
-               // If we take out credits as we send, the client would be sending credits back as we are delivering
-               // as a result we would fire up a lot of packets over using the channel.
-               precalculateAvailableCredits = preCalculateFlowControl(initialMessage);
-            }
-            else
-            {
-               precalculateAvailableCredits = 0;
-            }
-
-            if (initialMessage != null)
-            {
-               channel.send(initialMessage);
-
                if (availableCredits != null)
                {
-                  precalculateAvailableCredits -= initialMessage.getRequiredBufferSize();
+                  availableCredits.addAndGet(-initialPacket.getRequiredBufferSize());
                }
-            }
 
-            while (positionPendingLargeMessage < sizePendingLargeMessage)
+               sentInitialPacket = true;
+
+               channel.send(initialPacket);
+
+               // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
+               // for too long
+
+               resumeLargeMessage();
+
+               return false;
+            }
+            else
             {
-               if (precalculateAvailableCredits <= 0 && availableCredits != null)
+               if (availableCredits != null && availableCredits.get() <= 0)
                {
                   if (trace)
                   {
                      trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
                   }
+
                   return false;
                }
 
@@ -731,10 +712,7 @@
 
                if (availableCredits != null)
                {
-                  if ((precalculateAvailableCredits -= chunk.getRequiredBufferSize()) < 0)
-                  {
-                     log.warn("Flowcontrol logic is not working properly, too many credits were taken");
-                  }
+                  availableCredits.addAndGet(-chunk.getRequiredBufferSize());
                }
 
                if (trace)
@@ -747,18 +725,20 @@
                channel.send(chunk);
 
                positionPendingLargeMessage += chunkLen;
-            }
 
-            if (precalculateAvailableCredits != 0)
-            {
-               log.warn("Flowcontrol logic is not working properly... creidts = " + precalculateAvailableCredits);
+               if (positionPendingLargeMessage < sizePendingLargeMessage)
+               {
+                  resumeLargeMessage();
+
+                  return false;
+               }
             }
 
             if (trace)
             {
                trace("Finished deliverLargeMessage");
             }
-            context.close();
+
             finish();
 
             return true;
@@ -774,55 +754,31 @@
        */
       public void finish() throws Exception
       {
-         pendingLargeMessage.releaseResources();
-         
-         pendingLargeMessage.decrementDelayDeletionCount();
-
-         if (preAcknowledge && !browseOnly)
+         lock.lock();
+         try
          {
-            // PreAck will have an extra reference
-            pendingLargeMessage.decrementDelayDeletionCount();
-         }
+            context.close();
 
-         largeMessageDeliverer = null;
+            largeMessage.releaseResources();
 
-         pendingLargeMessagesCounter.decrementAndGet();
-      }
+            largeMessage.decrementDelayDeletionCount();
 
-      /**
-       * Credits flow control are calculated in advance.
-       * @return
-       */
-      private int preCalculateFlowControl(SessionReceiveMessage firstPacket)
-      {
-         while (true)
-         {
-            final int currentCredit = availableCredits.get();
-            int precalculatedCredits = 0;
-
-            if (firstPacket != null)
+            if (preAcknowledge && !browseOnly)
             {
-               precalculatedCredits = firstPacket.getRequiredBufferSize();
+               // PreAck will have an extra reference
+               largeMessage.decrementDelayDeletionCount();
             }
 
-            long chunkLen = 0;
-            for (long i = positionPendingLargeMessage; precalculatedCredits < currentCredit && i < sizePendingLargeMessage; i += chunkLen)
-            {
-               chunkLen = (int)Math.min(sizePendingLargeMessage - i, minLargeMessageSize);
-               precalculatedCredits += chunkLen + SessionReceiveContinuationMessage.SESSION_RECEIVE_CONTINUATION_BASE_SIZE;
-            }
+            largeMessageDeliverer = null;
 
-            // The calculation of credits and taking credits out has to be taken atomically.
-            // Since we are not sending anything to the client during this calculation, this is unlikely to happen
-            if (availableCredits.compareAndSet(currentCredit, currentCredit - precalculatedCredits))
-            {
-               if (trace)
-               {
-                  log.trace("Taking " + precalculatedCredits + " credits out on preCalculateFlowControl (largeMessage)");
-               }
-               return precalculatedCredits;
-            }
+            largeMessageInDelivery = false;
+
+            largeMessage = null;
          }
+         finally
+         {
+            lock.unlock();
+         }
       }
 
       private SessionReceiveContinuationMessage createChunkSend(LargeMessageEncodingContext context)
@@ -835,8 +791,8 @@
 
          HornetQBuffer bodyBuffer = ChannelBuffers.buffer(localChunkLen);
 
-         //pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
-         pendingLargeMessage.encodeBody(bodyBuffer, context, localChunkLen);
+         // pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
+         largeMessage.encodeBody(bodyBuffer, context, localChunkLen);
 
          chunk = new SessionReceiveContinuationMessage(id,
                                                        bodyBuffer.array(),
@@ -858,7 +814,7 @@
 
       private final Iterator<MessageReference> iterator;
 
-      public void run()
+      public synchronized void run()
       {
          // if the reference was busy during the previous iteration, handle it now
          if (current != null)
@@ -866,14 +822,17 @@
             try
             {
                HandleStatus status = handle(current);
+
                if (status == HandleStatus.BUSY)
                {
                   return;
                }
+
+               current = null;
             }
             catch (Exception e)
             {
-               log.warn("Exception while browser handled from " + messageQueue + ": " + current, e);
+               log.error("Exception while browser handled from " + messageQueue + ": " + current, e);
                return;
             }
          }
@@ -894,10 +853,11 @@
             }
             catch (Exception e)
             {
-               log.warn("Exception while browser handled from " + messageQueue + ": " + ref, e);
+               log.error("Exception while browser handled from " + messageQueue + ": " + ref, e);
                break;
             }
          }
+
       }
 
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-04 16:45:51 UTC (rev 8210)
@@ -165,7 +165,7 @@
    private volatile LargeServerMessage currentLargeMessage;
 
    private ServerSessionPacketHandler handler;
-   
+
    private boolean closed;
 
    // Constructors ---------------------------------------------------------------------------------
@@ -368,17 +368,17 @@
          Filter filter = FilterImpl.createFilter(filterString);;
 
          ServerConsumer consumer = new ServerConsumerImpl(packet.getID(),
-                                                          this,
-                                                          (QueueBinding)binding,
-                                                          filter,
-                                                          started,
-                                                          browseOnly,
-                                                          storageManager,
-                                                          channel,
-                                                          preAcknowledge,
-                                                          updateDeliveries,
-                                                          executor,
-                                                          managementService);
+                                                           this,
+                                                           (QueueBinding)binding,
+                                                           filter,
+                                                           started,
+                                                           browseOnly,
+                                                           storageManager,
+                                                           channel,
+                                                           preAcknowledge,
+                                                           updateDeliveries,
+                                                           executor,
+                                                           managementService);
 
          consumers.put(consumer.getID(), consumer);
 
@@ -1063,9 +1063,9 @@
             response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
          }
          else
-         {                     
+         {
             Transaction theTx = resourceManager.removeTransaction(xid);
-            
+
             if (theTx == null)
             {
                // checked heuristic committed transactions
@@ -1856,7 +1856,7 @@
    private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception
    {
       boolean wasStarted = started;
-      
+
       List<MessageReference> toCancel = new ArrayList<MessageReference>();
 
       for (ServerConsumer consumer : consumers.values())
@@ -1873,7 +1873,7 @@
       {
          ref.getQueue().cancel(theTx, ref);
       }
-            
+
       theTx.rollback();
 
       if (wasStarted)

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-04 16:45:51 UTC (rev 8210)
@@ -14,8 +14,6 @@
 package org.hornetq.tests.integration.client;
 
 import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
@@ -28,7 +26,6 @@
 import org.hornetq.core.client.ClientProducer;
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.MessageHandler;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.config.Configuration;
@@ -286,7 +283,6 @@
          server.start();
 
          ClientSessionFactory sf = createFactory(isNetty());
-         
 
          session = sf.createSession(false, false, false);
 

Added: trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java	2009-11-04 16:45:51 UTC (rev 8210)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+/**
+ * A NettyProducerFlowControlTest
+ *
+ * @author tim
+ *
+ *
+ */
+public class NettyProducerFlowControlTest extends ProducerFlowControlTest
+{
+   @Override
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2009-11-04 16:45:51 UTC (rev 8210)
@@ -44,125 +44,104 @@
 {
    private static final Logger log = Logger.getLogger(ProducerFlowControlTest.class);
 
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
    // TODO need to test crashing a producer with unused credits returns them to the pool
 
    public void testFlowControlSingleConsumer() throws Exception
    {
-      testFlowControl(false, 1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+      testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
    }
 
    public void testFlowControlAnon() throws Exception
    {
-      testFlowControl(false, 1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, true);
+      testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, true);
    }
 
    public void testFlowControlSingleConsumerLargeMaxSize() throws Exception
    {
-      testFlowControl(false, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+      testFlowControl(1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
    }
 
    public void testFlowControlMultipleConsumers() throws Exception
    {
-      testFlowControl(false, 1000, 500, 10 * 1024, 1024, 1024, 1024, 5, 1, 0, false);
+      testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 5, 1, 0, false);
    }
 
    public void testFlowControlZeroConsumerWindowSize() throws Exception
    {
-      testFlowControl(false, 1000, 500, 10 * 1024, 1024, 0, 1024, 1, 1, 0, false);
+      testFlowControl(1000, 500, 10 * 1024, 1024, 0, 1024, 1, 1, 0, false);
    }
 
    public void testFlowControlZeroAckBatchSize() throws Exception
    {
-      testFlowControl(false, 1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
+      testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
    }
 
    public void testFlowControlSingleConsumerSlowConsumer() throws Exception
    {
-      testFlowControl(false, 100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
+      testFlowControl(100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
    }
 
    public void testFlowControlSmallMessages() throws Exception
    {
-      testFlowControl(false, 1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+      testFlowControl(1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
    }
 
    public void testFlowControlLargerMessagesSmallWindowSize() throws Exception
    {
-      testFlowControl(false, 1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+      testFlowControl(1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
    }
 
    public void testFlowControlMultipleProducers() throws Exception
    {
-      testFlowControl(false, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, false);
+      testFlowControl(1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, false);
    }
 
    public void testFlowControlMultipleProducersAndConsumers() throws Exception
    {
-      testFlowControl(false, 500, 500, 100 * 1024, 1024, 1024, 1024, 1, 3, 3, false);
+      testFlowControl(500, 500, 100 * 1024, 1024, 1024, 1024, 1, 3, 3, false);
    }
 
    public void testFlowControlMultipleProducersAnon() throws Exception
    {
-      testFlowControl(false, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, true);
+      testFlowControl(1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, true);
    }
 
-   public void testFlowControlSingleConsumerNetty() throws Exception
+   public void testFlowControlLargeMessages2() throws Exception
    {
-      testFlowControl(true, 1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+      testFlowControl(1000, 10000, -1, 1024, 0, 0, 1, 1, 0, false, 1000, true);
    }
 
-   public void testFlowControlSingleConsumerLargeMaxSizeNetty() throws Exception
+   public void testFlowControlLargeMessages3() throws Exception
    {
-      testFlowControl(true, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+      testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 0, 1, 1, 0, false, 1000, true);
    }
 
-   public void testFlowControlMultipleConsumersNetty() throws Exception
+   public void testFlowControlLargeMessages4() throws Exception
    {
-      testFlowControl(true, 1000, 500, 10 * 1024, 1024, 1024, 1024, 5, 1, 0, false);
+      testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 1, 1, 0, false, 1000, true);
    }
 
-   public void testFlowControlZeroConsumerWindowSizeNetty() throws Exception
+   public void testFlowControlLargeMessages5() throws Exception
    {
-      testFlowControl(true, 1000, 500, 10 * 1024, 1024, 0, 1024, 1, 1, 0, false);
+      testFlowControl(1000, 10000, 100 * 1024, 1024, -1, 1024, 1, 1, 0, false, 1000, true);
    }
 
-   public void testFlowControlZeroAckBatchSizeNetty() throws Exception
+   public void testFlowControlLargeMessages6() throws Exception
    {
-      testFlowControl(true, 1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
+      testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 1, 1, 0, true, 1000, true);
    }
-
-   public void testFlowControlSingleConsumerSlowConsumerNetty() throws Exception
+   
+   public void testFlowControlLargeMessages7() throws Exception
    {
-      testFlowControl(true, 100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
+      testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 2, 2, 0, true, 1000, true);
    }
 
-   public void testFlowControlSmallMessagesNetty() throws Exception
-   {
-      testFlowControl(true, 1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
-   }
-
-   public void testFlowControlLargerMessagesSmallWindowSizeNetty() throws Exception
-   {
-      testFlowControl(true, 1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
-   }
-
-   public void testFlowControlMultipleProducersNetty() throws Exception
-   {
-      testFlowControl(true, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, false);
-   }
-
-   public void testFlowControlMultipleProducersAndConsumersNetty() throws Exception
-   {
-      testFlowControl(false, 500, 500, 100 * 1024, 1024, 1024, 1024, 1, 3, 3, false);
-   }
-
-   public void testFlowControlMultipleProducersAnonNetty() throws Exception
-   {
-      testFlowControl(true, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, true);
-   }
-
-   private void testFlowControl(final boolean netty,
-                                final int numMessages,
+   private void testFlowControl(final int numMessages,
                                 final int messageSize,
                                 final int maxSize,
                                 final int producerWindowSize,
@@ -173,8 +152,7 @@
                                 final long consumerDelay,
                                 final boolean anon) throws Exception
    {
-      testFlowControl(netty,
-                      numMessages,
+      testFlowControl(numMessages,
                       messageSize,
                       maxSize,
                       producerWindowSize,
@@ -187,19 +165,8 @@
                       -1,
                       false);
    }
-   
-//   public void testFlowControlLargeMessages() throws Exception
-//   {
-//      testFlowControl(true, 1000, 10000, 100 * 1024, 1024, 1024, 0, 1, 1, 0, false, 1000, true);
-//   }
-//   
-//   public void testFlowControlLargeMessages2() throws Exception
-//   {
-//      testFlowControl(true, 1000, 10000, -1, 1024, 0, 0, 1, 1, 0, false, 1000, true);
-//   }
 
-   private void testFlowControl(final boolean netty,
-                                final int numMessages,
+   private void testFlowControl(final int numMessages,
                                 final int messageSize,
                                 final int maxSize,
                                 final int producerWindowSize,
@@ -214,7 +181,7 @@
    {
       final SimpleString address = new SimpleString("testaddress");
 
-      Configuration config = super.createDefaultConfig(netty);
+      Configuration config = super.createDefaultConfig(isNetty());
 
       HornetQServer server = createServer(realFiles, config);
 
@@ -229,7 +196,7 @@
 
       ClientSessionFactory sf;
 
-      if (netty)
+      if (isNetty())
       {
          sf = createNettyFactory();
       }
@@ -248,7 +215,7 @@
       }
 
       ClientSession session = sf.createSession(false, true, true, true);
-      
+
       session.start();
 
       final String queueName = "testqueue";
@@ -258,7 +225,6 @@
          session.createQueue(address, new SimpleString(queueName + i), null, false);
       }
 
-      
       class MyHandler implements MessageHandler
       {
          int count = 0;
@@ -271,16 +237,16 @@
          {
             try
             {
-               log.info("got message " + count);
-               
+               // log.info("got message " + count);
+
                int availBytes = message.getBody().readableBytes();
-               
+
                assertEquals(messageSize, availBytes);
-               
+
                byte[] bytes = new byte[availBytes];
-               
+
                message.getBody().readBytes(bytes);
-               
+
                message.acknowledge();
 
                if (++count == numMessages * numProducers)
@@ -309,7 +275,7 @@
          handlers[i] = new MyHandler();
 
          log.info("created consumer");
-         
+
          ClientConsumer consumer = session.createConsumer(new SimpleString(queueName + i));
 
          consumer.setMessageHandler(handlers[i]);
@@ -346,16 +312,15 @@
             else
             {
                producers[j].send(message);
-               
-               //log.info("sent message " + i);
+
+               // log.info("sent message " + i);
             }
 
          }
       }
-      
-      log.info("sent messages");
-      
-      
+
+      // log.info("sent messages");
+
       for (int i = 0; i < numConsumers; i++)
       {
          handlers[i].latch.await();

Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java	2009-11-04 16:45:51 UTC (rev 8210)
@@ -214,7 +214,7 @@
          }
 
          sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
+         
          if (isXA)
          {
             session.end(xid, XAResource.TMSUCCESS);
@@ -270,9 +270,6 @@
 
          for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
          {
-
-            log.debug("Iteration: " + iteration);
-
             session.stop();
 
             // first time with a browser
@@ -289,11 +286,8 @@
 
                   public void onMessage(final ClientMessage message)
                   {
-
                      try
                      {
-                        log.debug("Message on consumer: " + msgCounter);
-
                         if (delayDelivery > 0)
                         {
                            long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
@@ -307,7 +301,7 @@
                         }
 
                         assertNotNull(message);
-
+                        
                         if (delayDelivery <= 0)
                         {
                            // right now there is no guarantee of ordered delivered on multiple scheduledMessages with
@@ -365,7 +359,7 @@
                                  log.debug("Read " + b + " bytes");
                               }
 
-                              assertEquals("byte pos" + b + " is incorrect", getSamplebyte(b), buffer.readByte());
+                              assertEquals(getSamplebyte(b), buffer.readByte());
                            }
                         }
                      }
@@ -389,7 +383,6 @@
 
                assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
                assertEquals(0, errors.get());
-
             }
             else
             {
@@ -498,8 +491,8 @@
                   session.start(xid, XAResource.TMNOFLAGS);
                }
                else
-               {
-                  session.rollback();
+               {    
+                  session.rollback();                 
                }
             }
             else



More information about the hornetq-commits mailing list