Author: clebert.suconic(a)jboss.com
Date: 2011-10-14 13:03:44 -0400 (Fri, 14 Oct 2011)
New Revision: 11544
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7389 - Large Message Flow Control
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -574,7 +574,7 @@
largeMessageCache.deleteOnExit();
}
- currentLargeMessageController = new LargeMessageControllerImpl(this,
packet.getLargeMessageSize(), 60, largeMessageCache);
+ currentLargeMessageController = new LargeMessageControllerImpl(this,
packet.getLargeMessageSize(), 5, largeMessageCache);
if (currentChunkMessage.isCompressed())
{
@@ -596,7 +596,18 @@
{
return;
}
- currentLargeMessageController.addPacket(chunk);
+ if (currentLargeMessageController == null)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Sending back credits for largeController = null " +
chunk.getPacketSize());
+ }
+ flowControl(chunk.getPacketSize(), false);
+ }
+ else
+ {
+ currentLargeMessageController.addPacket(chunk);
+ }
}
public void clear(boolean waitForOnMessage) throws HornetQException
@@ -609,12 +620,39 @@
while (iter.hasNext())
{
- ClientMessageInternal message = iter.next();
+ try
+ {
+ ClientMessageInternal message = iter.next();
+
+ if (message.isLargeMessage())
+ {
+ ClientLargeMessageInternal largeMessage =
(ClientLargeMessageInternal)message;
+ largeMessage.getLargeMessageController().cancel();
+ }
- flowControlBeforeConsumption(message);
+ flowControlBeforeConsumption(message);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
clearBuffer();
+
+ try
+ {
+ if (currentLargeMessageController != null)
+ {
+ currentLargeMessageController.cancel();
+ currentLargeMessageController = null;
+ }
+ }
+ catch (Throwable e)
+ {
+ // nothing that could be done here
+ log.warn(e.getMessage(), e);
+ }
}
// Need to send credits for the messages in the buffer
@@ -680,6 +718,11 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend +
", clientWindowSize = " + clientWindowSize + " messageBytes = " +
messageBytes);
+ }
if (creditsToSend >= clientWindowSize)
{
@@ -687,7 +730,7 @@
{
if (ClientConsumerImpl.trace)
{
- ClientConsumerImpl.log.trace("Sending " + creditsToSend +
" -1, for slow consumer");
+ ClientConsumerImpl.log.trace("FlowControl::Sending " +
creditsToSend + " -1, for slow consumer");
}
// sending the credits - 1 initially send to fire the slow consumer, or
the slow consumer would be
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UTF8Util;
@@ -141,7 +142,7 @@
{
checkForPacket(totalSize - 1);
}
- catch (Exception ignored)
+ catch (Throwable ignored)
{
}
}
@@ -227,6 +228,24 @@
public synchronized void cancel()
{
+
+ int totalSize = 0;
+ Packet polledPacket = null;
+ while ((polledPacket = packets.poll()) != null)
+ {
+ totalSize += polledPacket.getPacketSize();
+ }
+
+ try
+ {
+ consumerInternal.flowControl(totalSize, false);
+ }
+ catch (Exception ignored)
+ {
+ // what else can we do here?
+ log.warn(ignored.getMessage(), ignored);
+ }
+
packets.offer(new SessionReceiveContinuationMessage());
streamEnded = true;
streamClosed = true;
@@ -279,6 +298,11 @@
public synchronized void saveBuffer(final OutputStream output) throws
HornetQException
{
+ if (streamClosed)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "The large message lost connection with its
session, either because of a rollback or a closed session");
+ }
setOutputStream(output);
waitCompletion(0);
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -64,7 +64,14 @@
*/
public byte[] getBody()
{
- return body;
+ if (size <= 0)
+ {
+ return new byte[0];
+ }
+ else
+ {
+ return body;
+ }
}
/**
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -77,6 +77,22 @@
super.encodeRest(buffer);
buffer.writeLong(consumerID);
}
+ @Override
+ public int getPacketSize()
+ {
+ if (size == -1)
+ {
+ // This packet was created by the LargeMessageController
+ // TODO: Get rid of this scenario
+ return 0;
+ }
+ else
+ {
+ return size;
+ }
+ }
+
+
@Override
public void decodeRest(final HornetQBuffer buffer)
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -17,7 +17,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -61,7 +61,7 @@
// Constants
------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-
+
private static boolean isTrace = log.isTraceEnabled();
// Static
---------------------------------------------------------------------------------------
@@ -85,7 +85,7 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
+
public String debug()
{
return toString() + "::Delivering " + this.deliveringRefs.size();
@@ -115,7 +115,7 @@
private final Binding binding;
private boolean transferring = false;
-
+
/* As well as consumer credit based flow control, we also tap into TCP flow control
(assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want
to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
@@ -163,11 +163,11 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-
+
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis();
-
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -185,7 +185,7 @@
{
return id;
}
-
+
public boolean isBrowseOnly()
{
return browseOnly;
@@ -195,12 +195,12 @@
{
return creationTime;
}
-
+
public String getConnectionID()
{
return this.session.getConnectionID().toString();
}
-
+
public String getSessionID()
{
return this.session.getName();
@@ -210,20 +210,23 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (log.isDebugEnabled() )
+ if (log.isDebugEnabled())
{
- log.debug(this + " is busy for the lack of credits!!!");
+ log.debug(this + " is busy for the lack of credits. Current credits =
" +
+ availableCredits +
+ " Can't receive reference " +
+ ref);
}
-
+
return HandleStatus.BUSY;
}
-
-// TODO -
https://jira.jboss.org/browse/HORNETQ-533
-// if (!writeReady.get())
-// {
-// return HandleStatus.BUSY;
-// }
-
+
+ // TODO -
https://jira.jboss.org/browse/HORNETQ-533
+ // if (!writeReady.get())
+ // {
+ // return HandleStatus.BUSY;
+ // }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -233,11 +236,18 @@
{
return HandleStatus.BUSY;
}
-
+
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the
lock
if (largeMessageDeliverer != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " is busy delivering large message " +
+ largeMessageDeliverer +
+ ", can't deliver reference " +
+ ref);
+ }
return HandleStatus.BUSY;
}
@@ -268,7 +278,9 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged())
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable()
&& !ref.getQueue().isInternalQueue() && !ref.isPaged())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable()
&&
+ !ref.getQueue().isInternalQueue() &&
+ !ref.isPaged())
{
storageManager.updateDeliveryCount(ref);
}
@@ -309,7 +321,7 @@
public void close(final boolean failed) throws Exception
{
callback.removeReadyListener(this);
-
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -355,8 +367,8 @@
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME,
binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null
? null
- :
filter.getFilterString());
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+ filter == null ? null :
filter.getFilterString());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -406,10 +418,28 @@
}
}
- public LinkedList<MessageReference> cancelRefs(final boolean failed, final
boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+ public LinkedList<MessageReference> cancelRefs(final boolean failed,
+ final boolean lastConsumedAsDelivered,
+ final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
+ try
+ {
+ if (largeMessageDeliverer != null)
+ {
+ largeMessageDeliverer.finish();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on resetting large message deliver - " +
largeMessageDeliverer, e);
+ }
+ finally
+ {
+ largeMessageDeliverer = null;
+ }
+
LinkedList<MessageReference> refs = new
LinkedList<MessageReference>();
if (!deliveringRefs.isEmpty())
@@ -430,8 +460,9 @@
{
if (!failed)
{
- //We don't decrement delivery count if the client failed, since
there's a possibility that refs were actually delivered but we just didn't get any
acks for them
- //before failure
+ // We don't decrement delivery count if the client failed, since
there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for
them
+ // before failure
ref.decrementDeliveryCount();
}
@@ -492,18 +523,23 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + ":: FlowControl::Received disable flow control
message");
+ }
// No flow control
availableCredits = null;
-
- //There may be messages already in the queue
+
+ // There may be messages already in the queue
promptDelivery();
}
else if (credits == 0)
{
- //reset, used on slow consumers
+ // reset, used on slow consumers
+ log.debug(this + ":: FlowControl::Received reset flow control
message");
availableCredits.set(0);
}
else
@@ -512,16 +548,17 @@
if (log.isDebugEnabled())
{
- log.debug(this + "::Received " + credits +
- " credits, previous value = " +
- previous +
- " currentValue = " +
- availableCredits.get());
+ log.debug(this + "::FlowControl::Received " +
+ credits +
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
if (previous <= 0 && previous + credits > 0)
{
- if (log.isTraceEnabled() )
+ if (log.isTraceEnabled())
{
log.trace(this + "::calling promptDelivery from receiving
credits");
}
@@ -541,7 +578,7 @@
{
return;
}
-
+
// Acknowledge acknowledges all refs delivered by the consumer up to and including
the one explicitly
// acknowledged
@@ -573,21 +610,21 @@
}
while (ref.getMessage().getMessageID() != messageID);
}
-
+
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx,
final long messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
MessageReference ref = removeReferenceByID(messageID);
-
+
if (ref == null)
{
throw new IllegalStateException("Cannot find ref to ack " +
messageID);
}
-
+
if (autoCommitAcks)
{
ref.getQueue().acknowledge(ref);
@@ -627,13 +664,13 @@
return ref;
}
-
+
public void readyForWriting(final boolean ready)
{
if (ready)
{
writeReady.set(true);
-
+
promptDelivery();
}
else
@@ -701,10 +738,17 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery standard taking " +
+ packetSize +
+ " from credits, available now is " +
+ availableCredits);
+ }
}
}
-
// Inner classes
// ------------------------------------------------------------------------
@@ -766,6 +810,12 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery largeMessage
interrupting as there are no more credits, available=" +
+ availableCredits);
+ }
+
return false;
}
@@ -774,7 +824,7 @@
context = largeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize();
-
+
context.open();
sentInitialPacket = true;
@@ -787,6 +837,15 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::" +
+ " deliver initialpackage with " +
+ packetSize +
+ " delivered, available now = " +
+ availableCredits);
+ }
}
// Execute the rest of the large message on a different thread so as not
to tie up the delivery thread
@@ -802,7 +861,8 @@
{
if (ServerConsumerImpl.isTrace)
{
- log.trace("deliverLargeMessage: Leaving loop of send
LargeMessage because of credits");
+ log.trace(this + "::FlowControl::deliverLargeMessage Leaving
loop of send LargeMessage because of credits, available=" +
+ availableCredits);
}
return false;
@@ -825,16 +885,17 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.isTrace)
- {
- log.trace("deliverLargeMessage: Sending " + packetSize +
- " availableCredits now is " +
- availableCredits);
- }
-
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::largeMessage deliver
continuation, packetSize=" +
+ packetSize +
+ " available now=" +
+ availableCredits);
+ }
}
positionPendingLargeMessage += chunkLen;
@@ -898,7 +959,7 @@
}
private final LinkedListIterator<MessageReference> iterator;
-
+
public synchronized void close()
{
iterator.close();
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -37,6 +37,7 @@
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -911,6 +912,123 @@
internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
}
+ public void testFlowControl() throws Exception
+ {
+ internalTestFlowControlOnRollback(false);
+ }
+
+ public void testFlowControlLargeMessage() throws Exception
+ {
+ internalTestFlowControlOnRollback(true);
+ }
+
+ private void internalTestFlowControlOnRollback(final boolean isLargeMessage) throws
Exception
+ {
+
+ HornetQServer server = createServer(false, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ settings.setMaxDeliveryAttempts(-1);
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ ClientSession session = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ server.start();
+
+ locator.setConsumerWindowSize(300000);
+
+ if (isLargeMessage)
+ {
+ // something to ensure we are using large messages
+ locator.setMinLargeMessageSize(100);
+ }
+ else
+ {
+ // To make sure large messages won't kick in, we set anything large
+ locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+ }
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("count", i);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ producer.send(msg);
+ }
+
+ session.commit();
+
+ ClientConsumerInternal consumer =
(ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int repeat = 0; repeat < 100; repeat ++)
+ {
+ System.out.println("Repeat " + repeat);
+ long timeout = System.currentTimeMillis() + 2000;
+ // At least 10 messages on the buffer
+ while (timeout > System.currentTimeMillis() &&
consumer.getBufferSize() <= 10)
+ {
+ Thread.sleep(10);
+ }
+ assertTrue(consumer.getBufferSize() >= 10);
+
+ ClientMessage msg = consumer.receive(500);
+ msg.getBodyBuffer().readByte();
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+ }
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ System.out.println("msg " + msg);
+ msg.getBodyBuffer().readByte();
+ msg.acknowledge();
+ session.commit();
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+
+
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean
largeMessages) throws Exception
{
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -408,6 +408,7 @@
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
clientFile.putIntProperty("txid", 2);
+ clientFile.putIntProperty("i", i);
producer.send(clientFile);
}
session.end(xid2, XAResource.TMSUCCESS);
@@ -422,6 +423,7 @@
for (int start = 0 ; start < 2; start++)
{
+ System.out.println("Start " + start);
sf = locator.createSessionFactory();
@@ -437,6 +439,7 @@
session.start();
for (int i = 0 ; i < 10; i++)
{
+ log.info("I = " + i);
ClientMessage msg = cons1.receive(5000);
assertNotNull(msg);
assertEquals(1, msg.getIntProperty("txid").intValue());
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -15,6 +15,9 @@
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -70,7 +73,136 @@
{
return false;
}
+
+ public void testRollbackPartiallyConsumedBuffer() throws Exception
+ {
+ for (int i = 0 ; i < 1; i++)
+ {
+ log.info("#test " + i);
+ internalTestRollbackPartiallyConsumedBuffer(false);
+ tearDown();
+ setUp();
+
+ }
+
+ }
+
+ public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
+ {
+ internalTestRollbackPartiallyConsumedBuffer(true);
+ }
+
+
+ private void internalTestRollbackPartiallyConsumedBuffer(final boolean
redeliveryDelay) throws Exception
+ {
+ final int messageSize = 100 * 1024;
+
+ final ClientSession session;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ if (redeliveryDelay)
+ {
+ settings.setRedeliveryDelay(1000);
+ if (locator.isCompressLargeMessage())
+ {
+ locator.setConsumerWindowSize(0);
+ }
+ }
+ settings.setMaxDeliveryAttempts(-1);
+
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0 ; i < 20; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.putIntProperty("value", i);
+
+ producer.send(clientFile);
+ }
+
+ session.commit();
+
+ session.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ int counter = 0;
+ public void onMessage(ClientMessage message)
+ {
+ message.getBodyBuffer().readByte();
+ System.out.println("message:" + message);
+ try
+ {
+ if (counter ++ < 20)
+ {
+ Thread.sleep(100);
+ System.out.println("Rollback");
+ message.acknowledge();
+ session.rollback();
+ }
+ else
+ {
+ message.acknowledge();
+ session.commit();
+ }
+
+ if (counter == 40)
+ {
+ latch.countDown();
+ }
+ }
+ catch (Exception e)
+ {
+ latch.countDown();
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
+
+ assertTrue(latch.await(40, TimeUnit.SECONDS));
+
+ consumer.close();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -124,7 +256,7 @@
{
try
{
- server.stop();
+ session.close();
}
catch (Throwable ignored)
{
@@ -132,7 +264,7 @@
try
{
- session.close();
+ server.stop();
}
catch (Throwable ignored)
{
@@ -500,16 +632,17 @@
ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
ClientMessage msg1 = consumerExpiry.receive(5000);
+ assertTrue(msg1.isLargeMessage());
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j),
msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -521,13 +654,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j),
msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -638,13 +771,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j),
msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -655,13 +788,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j),
msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -1892,6 +2025,7 @@
ClientConsumer consumer = session.createConsumer(queue[1]);
ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ msg.getBodyBuffer().readByte();
Assert.assertNull(consumer.receiveImmediate());
Assert.assertNotNull(msg);