Author: timfox
Date: 2009-11-04 11:45:51 -0500 (Wed, 04 Nov 2009)
New Revision: 8210
Added:
trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
Log:
refactored serverconsumerimpl to not pre-calculate + few tweaks
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-04 16:45:51 UTC (rev 8210)
@@ -1342,7 +1342,6 @@
private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer)
{
-
HandleStatus status;
try
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-04 16:45:51 UTC (rev 8210)
@@ -29,6 +29,7 @@
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.QueueBinding;
@@ -36,10 +37,15 @@
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.*;
+import org.hornetq.core.server.HandleStatus;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerConsumer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
-import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.utils.TypedProperties;
/**
@@ -88,9 +94,7 @@
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
- // We will only be sending one largeMessage at any time, however during replication you may have
- // more than one LargeMessage pending on the replicationBuffer
- private final AtomicInteger pendingLargeMessagesCounter = new AtomicInteger(0);
+ private boolean largeMessageInDelivery;
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
@@ -116,8 +120,7 @@
private final Binding binding;
// Constructors ---------------------------------------------------------------------------------
-
-
+
public ServerConsumerImpl(final long id,
final ServerSession session,
final QueueBinding binding,
@@ -131,7 +134,7 @@
final Executor executor,
final ManagementService managementService) throws Exception
{
-
+
this.id = id;
this.filter = filter;
@@ -180,7 +183,89 @@
public HandleStatus handle(final MessageReference ref) throws Exception
{
- return doHandle(ref);
+ if (availableCredits != null && availableCredits.get() <= 0)
+ {
+ return HandleStatus.BUSY;
+ }
+
+ lock.lock();
+
+ try
+ {
+ // If the consumer is stopped then we don't accept the message, it
+ // should go back into the
+ // queue for delivery later.
+ if (!started)
+ {
+ return HandleStatus.BUSY;
+ }
+
+ // note: Since we schedule deliveries to start under replication, we use a counter of pendingLargeMessages.
+
+ // If there is a pendingLargeMessage we can't take another message
+ // This has to be checked inside the lock as the set to null is done inside the lock
+ if (largeMessageInDelivery)
+ {
+ return HandleStatus.BUSY;
+ }
+
+ final ServerMessage message = ref.getMessage();
+
+ if (filter != null && !filter.match(message))
+ {
+ return HandleStatus.NO_MATCH;
+ }
+
+ if (!browseOnly)
+ {
+ if (!preAcknowledge)
+ {
+ deliveringRefs.add(ref);
+ }
+
+ ref.handled();
+
+ ref.incrementDeliveryCount();
+
+ // If updateDeliveries = false (set by strict-update),
+ // the updateDeliveryCount would still be updated after cancel
+ if (updateDeliveries)
+ {
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
+ {
+ storageManager.updateDeliveryCount(ref);
+ }
+ }
+
+ if (preAcknowledge)
+ {
+ if (message.isLargeMessage())
+ {
+ // we must hold one reference, or the file will be deleted before it could be delivered
+ ((LargeServerMessage)message).incrementDelayDeletionCount();
+ }
+
+ // With pre-ack, we ack *before* sending to the client
+ ref.getQueue().acknowledge(ref);
+ }
+
+ }
+
+ if (message.isLargeMessage())
+ {
+ deliverLargeMessage(ref, message);
+ }
+ else
+ {
+ deliverStandardMessage(ref, message);
+ }
+
+ return HandleStatus.HANDLED;
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
public Filter getFilter()
@@ -248,16 +333,18 @@
* When the consumer receives such a "forced delivery" message, it discards it
* and knows that there are no other messages to be delivered.
*/
+
+ // TODO - why is this executed on a different thread?
public synchronized void forceDelivery(final long sequence)
- {
+ {
// The prompt delivery is called synchronously to ensure the "forced delivery" message is
// sent after any queue delivery.
executor.execute(new Runnable()
- {
+ {
public void run()
{
promptDelivery(false);
-
+
ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID());
forcedDeliveryMessage.setBody(ChannelBuffers.EMPTY_BUFFER);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
@@ -310,7 +397,7 @@
{
lock.unlock();
}
-
+
// Outside the lock
if (started)
{
@@ -332,10 +419,10 @@
if (trace)
{
trace("Received " + credits +
- " credits, previous value = " +
- previous +
- " currentValue = " +
- availableCredits.get());
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
if (previous <= 0 && previous + credits > 0)
@@ -465,106 +552,15 @@
}
}
- /**
- *
- */
private void resumeLargeMessage()
{
executor.execute(resumeLargeMessageRunnable);
}
- private HandleStatus doHandle(final MessageReference ref) throws Exception
+ private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
{
- if (availableCredits != null && availableCredits.get() <= 0)
- {
- return HandleStatus.BUSY;
- }
+ largeMessageInDelivery = true;
- lock.lock();
-
- try
- {
- // If the consumer is stopped then we don't accept the message, it
- // should go back into the
- // queue for delivery later.
- if (!started)
- {
- return HandleStatus.BUSY;
- }
-
- // note: Since we schedule deliveries to start under replication, we use a counter of pendingLargeMessages.
-
- // If there is a pendingLargeMessage we can't take another message
- // This has to be checked inside the lock as the set to null is done inside the lock
- if (pendingLargeMessagesCounter.get() > 0)
- {
- return HandleStatus.BUSY;
- }
-
- final ServerMessage message = ref.getMessage();
-
- if (filter != null && !filter.match(message))
- {
- return HandleStatus.NO_MATCH;
- }
-
- if (!browseOnly)
- {
- if (!preAcknowledge)
- {
- deliveringRefs.add(ref);
- }
-
- ref.handled();
-
- ref.incrementDeliveryCount();
-
- // If updateDeliveries = false (set by strict-update),
- // the updateDeliveryCount would still be updated after cancel
- if (updateDeliveries)
- {
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
- {
- storageManager.updateDeliveryCount(ref);
- }
- }
-
- if (preAcknowledge)
- {
- if (message.isLargeMessage())
- {
- // we must hold one reference, or the file will be deleted before it could be delivered
- ((LargeServerMessage)message).incrementDelayDeletionCount();
- }
-
- // With pre-ack, we ack *before* sending to the client
- ref.getQueue().acknowledge(ref);
- }
-
- }
-
- if (message.isLargeMessage())
- {
- deliverLargeMessage(ref, message);
- }
- else
- {
- deliverStandardMessage(ref, message);
- }
-
- return HandleStatus.HANDLED;
- }
- finally
- {
- lock.unlock();
- }
- }
-
- private void deliverLargeMessage(final MessageReference ref, final ServerMessage message)
- throws Exception
- {
- pendingLargeMessagesCounter.incrementAndGet();
-
final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
// it doesn't need lock because deliverLargeMesasge is already inside the lock()
@@ -591,7 +587,7 @@
// Inner classes
// ------------------------------------------------------------------------
- final Runnable resumeLargeMessageRunnable = new Runnable()
+ private final Runnable resumeLargeMessageRunnable = new Runnable()
{
public void run()
{
@@ -624,30 +620,29 @@
/** Internal encapsulation of the logic on sending LargeMessages.
* This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
- private class LargeMessageDeliverer
+ private final class LargeMessageDeliverer
{
private final long sizePendingLargeMessage;
/** The current message being processed */
- private final LargeServerMessage pendingLargeMessage;
+ private LargeServerMessage largeMessage;
private final MessageReference ref;
- private volatile boolean sentFirstMessage = false;
+ private volatile boolean sentInitialPacket = false;
/** The current position on the message being processed */
private volatile long positionPendingLargeMessage;
private LargeMessageEncodingContext context;
- public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref)
- throws Exception
+ public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception
{
- pendingLargeMessage = message;
+ largeMessage = message;
- pendingLargeMessage.incrementDelayDeletionCount();
+ largeMessage.incrementDelayDeletionCount();
- sizePendingLargeMessage = pendingLargeMessage.getLargeBodySize();
+ sizePendingLargeMessage = largeMessage.getLargeBodySize();
this.ref = ref;
}
@@ -658,7 +653,7 @@
try
{
- if (pendingLargeMessage == null)
+ if (largeMessage == null)
{
return true;
}
@@ -667,61 +662,47 @@
{
return false;
}
-
- SessionReceiveMessage initialMessage;
- if (sentFirstMessage)
+ if (!sentInitialPacket)
{
- initialMessage = null;
- }
- else
- {
- sentFirstMessage = true;
+ HornetQBuffer headerBuffer = ChannelBuffers.buffer(largeMessage.getHeadersAndPropertiesEncodeSize());
- HornetQBuffer headerBuffer = ChannelBuffers.buffer(pendingLargeMessage.getHeadersAndPropertiesEncodeSize());
+ largeMessage.encodeHeadersAndProperties(headerBuffer);
- pendingLargeMessage.encodeHeadersAndProperties(headerBuffer);
+ SessionReceiveMessage initialPacket = new SessionReceiveMessage(id,
+ headerBuffer.array(),
+ largeMessage.getLargeBodySize(),
+ ref.getDeliveryCount());
- initialMessage = new SessionReceiveMessage(id,
- headerBuffer.array(),
- pendingLargeMessage.getLargeBodySize(),
- ref.getDeliveryCount());
- context = pendingLargeMessage.createNewContext();
+ context = largeMessage.createNewContext();
+
context.open();
- }
- int precalculateAvailableCredits;
-
- if (availableCredits != null)
- {
- // Flow control needs to be done in advance.
- // If we take out credits as we send, the client would be sending credits back as we are delivering
- // as a result we would fire up a lot of packets over using the channel.
- precalculateAvailableCredits = preCalculateFlowControl(initialMessage);
- }
- else
- {
- precalculateAvailableCredits = 0;
- }
-
- if (initialMessage != null)
- {
- channel.send(initialMessage);
-
if (availableCredits != null)
{
- precalculateAvailableCredits -= initialMessage.getRequiredBufferSize();
+ availableCredits.addAndGet(-initialPacket.getRequiredBufferSize());
}
- }
- while (positionPendingLargeMessage < sizePendingLargeMessage)
+ sentInitialPacket = true;
+
+ channel.send(initialPacket);
+
+ // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
+ // for too long
+
+ resumeLargeMessage();
+
+ return false;
+ }
+ else
{
- if (precalculateAvailableCredits <= 0 && availableCredits != null)
+ if (availableCredits != null && availableCredits.get() <= 0)
{
if (trace)
{
trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
}
+
return false;
}
@@ -731,10 +712,7 @@
if (availableCredits != null)
{
- if ((precalculateAvailableCredits -= chunk.getRequiredBufferSize()) < 0)
- {
- log.warn("Flowcontrol logic is not working properly, too many credits were taken");
- }
+ availableCredits.addAndGet(-chunk.getRequiredBufferSize());
}
if (trace)
@@ -747,18 +725,20 @@
channel.send(chunk);
positionPendingLargeMessage += chunkLen;
- }
- if (precalculateAvailableCredits != 0)
- {
- log.warn("Flowcontrol logic is not working properly... creidts = " + precalculateAvailableCredits);
+ if (positionPendingLargeMessage < sizePendingLargeMessage)
+ {
+ resumeLargeMessage();
+
+ return false;
+ }
}
if (trace)
{
trace("Finished deliverLargeMessage");
}
- context.close();
+
finish();
return true;
@@ -774,55 +754,31 @@
*/
public void finish() throws Exception
{
- pendingLargeMessage.releaseResources();
-
- pendingLargeMessage.decrementDelayDeletionCount();
-
- if (preAcknowledge && !browseOnly)
+ lock.lock();
+ try
{
- // PreAck will have an extra reference
- pendingLargeMessage.decrementDelayDeletionCount();
- }
+ context.close();
- largeMessageDeliverer = null;
+ largeMessage.releaseResources();
- pendingLargeMessagesCounter.decrementAndGet();
- }
+ largeMessage.decrementDelayDeletionCount();
- /**
- * Credits flow control are calculated in advance.
- * @return
- */
- private int preCalculateFlowControl(SessionReceiveMessage firstPacket)
- {
- while (true)
- {
- final int currentCredit = availableCredits.get();
- int precalculatedCredits = 0;
-
- if (firstPacket != null)
+ if (preAcknowledge && !browseOnly)
{
- precalculatedCredits = firstPacket.getRequiredBufferSize();
+ // PreAck will have an extra reference
+ largeMessage.decrementDelayDeletionCount();
}
- long chunkLen = 0;
- for (long i = positionPendingLargeMessage; precalculatedCredits < currentCredit && i < sizePendingLargeMessage; i += chunkLen)
- {
- chunkLen = (int)Math.min(sizePendingLargeMessage - i, minLargeMessageSize);
- precalculatedCredits += chunkLen + SessionReceiveContinuationMessage.SESSION_RECEIVE_CONTINUATION_BASE_SIZE;
- }
+ largeMessageDeliverer = null;
- // The calculation of credits and taking credits out has to be taken atomically.
- // Since we are not sending anything to the client during this calculation, this is unlikely to happen
- if (availableCredits.compareAndSet(currentCredit, currentCredit - precalculatedCredits))
- {
- if (trace)
- {
- log.trace("Taking " + precalculatedCredits + " credits out on preCalculateFlowControl (largeMessage)");
- }
- return precalculatedCredits;
- }
+ largeMessageInDelivery = false;
+
+ largeMessage = null;
}
+ finally
+ {
+ lock.unlock();
+ }
}
private SessionReceiveContinuationMessage createChunkSend(LargeMessageEncodingContext context)
@@ -835,8 +791,8 @@
HornetQBuffer bodyBuffer = ChannelBuffers.buffer(localChunkLen);
- //pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
- pendingLargeMessage.encodeBody(bodyBuffer, context, localChunkLen);
+ // pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
+ largeMessage.encodeBody(bodyBuffer, context, localChunkLen);
chunk = new SessionReceiveContinuationMessage(id,
bodyBuffer.array(),
@@ -858,7 +814,7 @@
private final Iterator<MessageReference> iterator;
- public void run()
+ public synchronized void run()
{
// if the reference was busy during the previous iteration, handle it now
if (current != null)
@@ -866,14 +822,17 @@
try
{
HandleStatus status = handle(current);
+
if (status == HandleStatus.BUSY)
{
return;
}
+
+ current = null;
}
catch (Exception e)
{
- log.warn("Exception while browser handled from " + messageQueue + ": " + current, e);
+ log.error("Exception while browser handled from " + messageQueue + ": " + current, e);
return;
}
}
@@ -894,10 +853,11 @@
}
catch (Exception e)
{
- log.warn("Exception while browser handled from " + messageQueue + ": " + ref, e);
+ log.error("Exception while browser handled from " + messageQueue + ": " + ref, e);
break;
}
}
+
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-04 16:45:51 UTC (rev 8210)
@@ -165,7 +165,7 @@
private volatile LargeServerMessage currentLargeMessage;
private ServerSessionPacketHandler handler;
-
+
private boolean closed;
// Constructors ---------------------------------------------------------------------------------
@@ -368,17 +368,17 @@
Filter filter = FilterImpl.createFilter(filterString);;
ServerConsumer consumer = new ServerConsumerImpl(packet.getID(),
- this,
- (QueueBinding)binding,
- filter,
- started,
- browseOnly,
- storageManager,
- channel,
- preAcknowledge,
- updateDeliveries,
- executor,
- managementService);
+ this,
+ (QueueBinding)binding,
+ filter,
+ started,
+ browseOnly,
+ storageManager,
+ channel,
+ preAcknowledge,
+ updateDeliveries,
+ executor,
+ managementService);
consumers.put(consumer.getID(), consumer);
@@ -1063,9 +1063,9 @@
response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
else
- {
+ {
Transaction theTx = resourceManager.removeTransaction(xid);
-
+
if (theTx == null)
{
// checked heuristic committed transactions
@@ -1856,7 +1856,7 @@
private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception
{
boolean wasStarted = started;
-
+
List<MessageReference> toCancel = new ArrayList<MessageReference>();
for (ServerConsumer consumer : consumers.values())
@@ -1873,7 +1873,7 @@
{
ref.getQueue().cancel(theTx, ref);
}
-
+
theTx.rollback();
if (wasStarted)
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-04 16:45:51 UTC (rev 8210)
@@ -14,8 +14,6 @@
package org.hornetq.tests.integration.client;
import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -28,7 +26,6 @@
import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
@@ -286,7 +283,6 @@
server.start();
ClientSessionFactory sf = createFactory(isNetty());
-
session = sf.createSession(false, false, false);
Added: trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java 2009-11-04 16:45:51 UTC (rev 8210)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+/**
+ * A NettyProducerFlowControlTest: inherits every test from
+ * ProducerFlowControlTest and overrides isNetty() to return true.
+ *
+ * @author tim
+ *
+ */
+public class NettyProducerFlowControlTest extends ProducerFlowControlTest
+{
+ @Override
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-11-04 16:45:51 UTC (rev 8210)
@@ -44,125 +44,104 @@
{
private static final Logger log = Logger.getLogger(ProducerFlowControlTest.class);
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
// TODO need to test crashing a producer with unused credits returns them to the pool
public void testFlowControlSingleConsumer() throws Exception
{
- testFlowControl(false, 1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+ testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
}
public void testFlowControlAnon() throws Exception
{
- testFlowControl(false, 1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, true);
+ testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, true);
}
public void testFlowControlSingleConsumerLargeMaxSize() throws Exception
{
- testFlowControl(false, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+ testFlowControl(1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
}
public void testFlowControlMultipleConsumers() throws Exception
{
- testFlowControl(false, 1000, 500, 10 * 1024, 1024, 1024, 1024, 5, 1, 0, false);
+ testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 5, 1, 0, false);
}
public void testFlowControlZeroConsumerWindowSize() throws Exception
{
- testFlowControl(false, 1000, 500, 10 * 1024, 1024, 0, 1024, 1, 1, 0, false);
+ testFlowControl(1000, 500, 10 * 1024, 1024, 0, 1024, 1, 1, 0, false);
}
public void testFlowControlZeroAckBatchSize() throws Exception
{
- testFlowControl(false, 1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
+ testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
}
public void testFlowControlSingleConsumerSlowConsumer() throws Exception
{
- testFlowControl(false, 100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
+ testFlowControl(100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
}
public void testFlowControlSmallMessages() throws Exception
{
- testFlowControl(false, 1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+ testFlowControl(1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
}
public void testFlowControlLargerMessagesSmallWindowSize() throws Exception
{
- testFlowControl(false, 1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+ testFlowControl(1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
}
public void testFlowControlMultipleProducers() throws Exception
{
- testFlowControl(false, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, false);
+ testFlowControl(1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, false);
}
public void testFlowControlMultipleProducersAndConsumers() throws Exception
{
- testFlowControl(false, 500, 500, 100 * 1024, 1024, 1024, 1024, 1, 3, 3, false);
+ testFlowControl(500, 500, 100 * 1024, 1024, 1024, 1024, 1, 3, 3, false);
}
public void testFlowControlMultipleProducersAnon() throws Exception
{
- testFlowControl(false, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, true);
+ testFlowControl(1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, true);
}
- public void testFlowControlSingleConsumerNetty() throws Exception
+ public void testFlowControlLargeMessages2() throws Exception
{
- testFlowControl(true, 1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+ testFlowControl(1000, 10000, -1, 1024, 0, 0, 1, 1, 0, false, 1000, true);
}
- public void testFlowControlSingleConsumerLargeMaxSizeNetty() throws Exception
+ public void testFlowControlLargeMessages3() throws Exception
{
- testFlowControl(true, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
+ testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 0, 1, 1, 0, false, 1000, true);
}
- public void testFlowControlMultipleConsumersNetty() throws Exception
+ public void testFlowControlLargeMessages4() throws Exception
{
- testFlowControl(true, 1000, 500, 10 * 1024, 1024, 1024, 1024, 5, 1, 0, false);
+ testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 1, 1, 0, false, 1000, true);
}
- public void testFlowControlZeroConsumerWindowSizeNetty() throws Exception
+ public void testFlowControlLargeMessages5() throws Exception
{
- testFlowControl(true, 1000, 500, 10 * 1024, 1024, 0, 1024, 1, 1, 0, false);
+ testFlowControl(1000, 10000, 100 * 1024, 1024, -1, 1024, 1, 1, 0, false, 1000, true);
}
- public void testFlowControlZeroAckBatchSizeNetty() throws Exception
+ public void testFlowControlLargeMessages6() throws Exception
{
- testFlowControl(true, 1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
+ testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 1, 1, 0, true, 1000, true);
}
-
- public void testFlowControlSingleConsumerSlowConsumerNetty() throws Exception
+
+ public void testFlowControlLargeMessages7() throws Exception
{
- testFlowControl(true, 100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
+ testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 2, 2, 0, true, 1000, true);
}
- public void testFlowControlSmallMessagesNetty() throws Exception
- {
- testFlowControl(true, 1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
- }
-
- public void testFlowControlLargerMessagesSmallWindowSizeNetty() throws Exception
- {
- testFlowControl(true, 1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
- }
-
- public void testFlowControlMultipleProducersNetty() throws Exception
- {
- testFlowControl(true, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, false);
- }
-
- public void testFlowControlMultipleProducersAndConsumersNetty() throws Exception
- {
- testFlowControl(false, 500, 500, 100 * 1024, 1024, 1024, 1024, 1, 3, 3, false);
- }
-
- public void testFlowControlMultipleProducersAnonNetty() throws Exception
- {
- testFlowControl(true, 1000, 500, 1024 * 1024, 1024, 1024, 1024, 1, 5, 0, true);
- }
-
- private void testFlowControl(final boolean netty,
- final int numMessages,
+ private void testFlowControl(final int numMessages,
final int messageSize,
final int maxSize,
final int producerWindowSize,
@@ -173,8 +152,7 @@
final long consumerDelay,
final boolean anon) throws Exception
{
- testFlowControl(netty,
- numMessages,
+ testFlowControl(numMessages,
messageSize,
maxSize,
producerWindowSize,
@@ -187,19 +165,8 @@
-1,
false);
}
-
-// public void testFlowControlLargeMessages() throws Exception
-// {
-// testFlowControl(true, 1000, 10000, 100 * 1024, 1024, 1024, 0, 1, 1, 0, false, 1000, true);
-// }
-//
-// public void testFlowControlLargeMessages2() throws Exception
-// {
-// testFlowControl(true, 1000, 10000, -1, 1024, 0, 0, 1, 1, 0, false, 1000, true);
-// }
- private void testFlowControl(final boolean netty,
- final int numMessages,
+ private void testFlowControl(final int numMessages,
final int messageSize,
final int maxSize,
final int producerWindowSize,
@@ -214,7 +181,7 @@
{
final SimpleString address = new SimpleString("testaddress");
- Configuration config = super.createDefaultConfig(netty);
+ Configuration config = super.createDefaultConfig(isNetty());
HornetQServer server = createServer(realFiles, config);
@@ -229,7 +196,7 @@
ClientSessionFactory sf;
- if (netty)
+ if (isNetty())
{
sf = createNettyFactory();
}
@@ -248,7 +215,7 @@
}
ClientSession session = sf.createSession(false, true, true, true);
-
+
session.start();
final String queueName = "testqueue";
@@ -258,7 +225,6 @@
session.createQueue(address, new SimpleString(queueName + i), null, false);
}
-
class MyHandler implements MessageHandler
{
int count = 0;
@@ -271,16 +237,16 @@
{
try
{
- log.info("got message " + count);
-
+ // log.info("got message " + count);
+
int availBytes = message.getBody().readableBytes();
-
+
assertEquals(messageSize, availBytes);
-
+
byte[] bytes = new byte[availBytes];
-
+
message.getBody().readBytes(bytes);
-
+
message.acknowledge();
if (++count == numMessages * numProducers)
@@ -309,7 +275,7 @@
handlers[i] = new MyHandler();
log.info("created consumer");
-
+
ClientConsumer consumer = session.createConsumer(new SimpleString(queueName + i));
consumer.setMessageHandler(handlers[i]);
@@ -346,16 +312,15 @@
else
{
producers[j].send(message);
-
- //log.info("sent message " + i);
+
+ // log.info("sent message " + i);
}
}
}
-
- log.info("sent messages");
-
-
+
+ // log.info("sent messages");
+
for (int i = 0; i < numConsumers; i++)
{
handlers[i].latch.await();
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-04 16:20:14 UTC (rev 8209)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-04 16:45:51 UTC (rev 8210)
@@ -214,7 +214,7 @@
}
sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
+
if (isXA)
{
session.end(xid, XAResource.TMSUCCESS);
@@ -270,9 +270,6 @@
for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
{
-
- log.debug("Iteration: " + iteration);
-
session.stop();
// first time with a browser
@@ -289,11 +286,8 @@
public void onMessage(final ClientMessage message)
{
-
try
{
- log.debug("Message on consumer: " + msgCounter);
-
if (delayDelivery > 0)
{
long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
@@ -307,7 +301,7 @@
}
assertNotNull(message);
-
+
if (delayDelivery <= 0)
{
// right now there is no guarantee of ordered delivered on multiple scheduledMessages with
@@ -365,7 +359,7 @@
log.debug("Read " + b + " bytes");
}
- assertEquals("byte pos" + b + " is incorrect", getSamplebyte(b), buffer.readByte());
+ assertEquals(getSamplebyte(b), buffer.readByte());
}
}
}
@@ -389,7 +383,6 @@
assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
assertEquals(0, errors.get());
-
}
else
{
@@ -498,8 +491,8 @@
session.start(xid, XAResource.TMNOFLAGS);
}
else
- {
- session.rollback();
+ {
+ session.rollback();
}
}
else