JBoss hornetq SVN: r11544 - in branches/Branch_2_2_AS7: src/main/org/hornetq/core/protocol/core/impl/wireformat and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-14 13:03:44 -0400 (Fri, 14 Oct 2011)
New Revision: 11544
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7389 - Large Message Flow Control
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -574,7 +574,7 @@
largeMessageCache.deleteOnExit();
}
- currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
+ currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 5, largeMessageCache);
if (currentChunkMessage.isCompressed())
{
@@ -596,7 +596,18 @@
{
return;
}
- currentLargeMessageController.addPacket(chunk);
+ if (currentLargeMessageController == null)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Sending back credits for largeController = null " + chunk.getPacketSize());
+ }
+ flowControl(chunk.getPacketSize(), false);
+ }
+ else
+ {
+ currentLargeMessageController.addPacket(chunk);
+ }
}
public void clear(boolean waitForOnMessage) throws HornetQException
@@ -609,12 +620,39 @@
while (iter.hasNext())
{
- ClientMessageInternal message = iter.next();
+ try
+ {
+ ClientMessageInternal message = iter.next();
+
+ if (message.isLargeMessage())
+ {
+ ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
+ largeMessage.getLargeMessageController().cancel();
+ }
- flowControlBeforeConsumption(message);
+ flowControlBeforeConsumption(message);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
clearBuffer();
+
+ try
+ {
+ if (currentLargeMessageController != null)
+ {
+ currentLargeMessageController.cancel();
+ currentLargeMessageController = null;
+ }
+ }
+ catch (Throwable e)
+ {
+ // nothing that could be done here
+ log.warn(e.getMessage(), e);
+ }
}
// Need to send credits for the messages in the buffer
@@ -680,6 +718,11 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
+ }
if (creditsToSend >= clientWindowSize)
{
@@ -687,7 +730,7 @@
{
if (ClientConsumerImpl.trace)
{
- ClientConsumerImpl.log.trace("Sending " + creditsToSend + " -1, for slow consumer");
+ ClientConsumerImpl.log.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
}
// sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UTF8Util;
@@ -141,7 +142,7 @@
{
checkForPacket(totalSize - 1);
}
- catch (Exception ignored)
+ catch (Throwable ignored)
{
}
}
@@ -227,6 +228,24 @@
public synchronized void cancel()
{
+
+ int totalSize = 0;
+ Packet polledPacket = null;
+ while ((polledPacket = packets.poll()) != null)
+ {
+ totalSize += polledPacket.getPacketSize();
+ }
+
+ try
+ {
+ consumerInternal.flowControl(totalSize, false);
+ }
+ catch (Exception ignored)
+ {
+ // what else can we do here?
+ log.warn(ignored.getMessage(), ignored);
+ }
+
packets.offer(new SessionReceiveContinuationMessage());
streamEnded = true;
streamClosed = true;
@@ -279,6 +298,11 @@
public synchronized void saveBuffer(final OutputStream output) throws HornetQException
{
+ if (streamClosed)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "The large message lost connection with its session, either because of a rollback or a closed session");
+ }
setOutputStream(output);
waitCompletion(0);
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -64,7 +64,14 @@
*/
public byte[] getBody()
{
- return body;
+ if (size <= 0)
+ {
+ return new byte[0];
+ }
+ else
+ {
+ return body;
+ }
}
/**
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -77,6 +77,22 @@
super.encodeRest(buffer);
buffer.writeLong(consumerID);
}
+ @Override
+ public int getPacketSize()
+ {
+ if (size == -1)
+ {
+ // This packet was created by the LargeMessageController
+ // TODO: Get rid of this scenario
+ return 0;
+ }
+ else
+ {
+ return size;
+ }
+ }
+
+
@Override
public void decodeRest(final HornetQBuffer buffer)
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -17,7 +17,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -61,7 +61,7 @@
// Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-
+
private static boolean isTrace = log.isTraceEnabled();
// Static ---------------------------------------------------------------------------------------
@@ -85,7 +85,7 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
+
public String debug()
{
return toString() + "::Delivering " + this.deliveringRefs.size();
@@ -115,7 +115,7 @@
private final Binding binding;
private boolean transferring = false;
-
+
/* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
@@ -163,11 +163,11 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-
+
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis();
-
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -185,7 +185,7 @@
{
return id;
}
-
+
public boolean isBrowseOnly()
{
return browseOnly;
@@ -195,12 +195,12 @@
{
return creationTime;
}
-
+
public String getConnectionID()
{
return this.session.getConnectionID().toString();
}
-
+
public String getSessionID()
{
return this.session.getName();
@@ -210,20 +210,23 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (log.isDebugEnabled() )
+ if (log.isDebugEnabled())
{
- log.debug(this + " is busy for the lack of credits!!!");
+ log.debug(this + " is busy for the lack of credits. Current credits = " +
+ availableCredits +
+ " Can't receive reference " +
+ ref);
}
-
+
return HandleStatus.BUSY;
}
-
-// TODO - https://jira.jboss.org/browse/HORNETQ-533
-// if (!writeReady.get())
-// {
-// return HandleStatus.BUSY;
-// }
-
+
+ // TODO - https://jira.jboss.org/browse/HORNETQ-533
+ // if (!writeReady.get())
+ // {
+ // return HandleStatus.BUSY;
+ // }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -233,11 +236,18 @@
{
return HandleStatus.BUSY;
}
-
+
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
if (largeMessageDeliverer != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " is busy delivering large message " +
+ largeMessageDeliverer +
+ ", can't deliver reference " +
+ ref);
+ }
return HandleStatus.BUSY;
}
@@ -268,7 +278,9 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged())
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue() && !ref.isPaged())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+ !ref.getQueue().isInternalQueue() &&
+ !ref.isPaged())
{
storageManager.updateDeliveryCount(ref);
}
@@ -309,7 +321,7 @@
public void close(final boolean failed) throws Exception
{
callback.removeReadyListener(this);
-
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -355,8 +367,8 @@
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null
- : filter.getFilterString());
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+ filter == null ? null : filter.getFilterString());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -406,10 +418,28 @@
}
}
- public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+ public LinkedList<MessageReference> cancelRefs(final boolean failed,
+ final boolean lastConsumedAsDelivered,
+ final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
+ try
+ {
+ if (largeMessageDeliverer != null)
+ {
+ largeMessageDeliverer.finish();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on resetting large message deliver - " + largeMessageDeliverer, e);
+ }
+ finally
+ {
+ largeMessageDeliverer = null;
+ }
+
LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
if (!deliveringRefs.isEmpty())
@@ -430,8 +460,9 @@
{
if (!failed)
{
- //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
- //before failure
+ // We don't decrement delivery count if the client failed, since there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for them
+ // before failure
ref.decrementDeliveryCount();
}
@@ -492,18 +523,23 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + ":: FlowControl::Received disable flow control message");
+ }
// No flow control
availableCredits = null;
-
- //There may be messages already in the queue
+
+ // There may be messages already in the queue
promptDelivery();
}
else if (credits == 0)
{
- //reset, used on slow consumers
+ // reset, used on slow consumers
+ log.debug(this + ":: FlowControl::Received reset flow control message");
availableCredits.set(0);
}
else
@@ -512,16 +548,17 @@
if (log.isDebugEnabled())
{
- log.debug(this + "::Received " + credits +
- " credits, previous value = " +
- previous +
- " currentValue = " +
- availableCredits.get());
+ log.debug(this + "::FlowControl::Received " +
+ credits +
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
if (previous <= 0 && previous + credits > 0)
{
- if (log.isTraceEnabled() )
+ if (log.isTraceEnabled())
{
log.trace(this + "::calling promptDelivery from receiving credits");
}
@@ -541,7 +578,7 @@
{
return;
}
-
+
// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
// acknowledged
@@ -573,21 +610,21 @@
}
while (ref.getMessage().getMessageID() != messageID);
}
-
+
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
MessageReference ref = removeReferenceByID(messageID);
-
+
if (ref == null)
{
throw new IllegalStateException("Cannot find ref to ack " + messageID);
}
-
+
if (autoCommitAcks)
{
ref.getQueue().acknowledge(ref);
@@ -627,13 +664,13 @@
return ref;
}
-
+
public void readyForWriting(final boolean ready)
{
if (ready)
{
writeReady.set(true);
-
+
promptDelivery();
}
else
@@ -701,10 +738,17 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery standard taking " +
+ packetSize +
+ " from credits, available now is " +
+ availableCredits);
+ }
}
}
-
// Inner classes
// ------------------------------------------------------------------------
@@ -766,6 +810,12 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+ availableCredits);
+ }
+
return false;
}
@@ -774,7 +824,7 @@
context = largeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize();
-
+
context.open();
sentInitialPacket = true;
@@ -787,6 +837,15 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::" +
+ " deliver initialpackage with " +
+ packetSize +
+ " delivered, available now = " +
+ availableCredits);
+ }
}
// Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -802,7 +861,8 @@
{
if (ServerConsumerImpl.isTrace)
{
- log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+ log.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+ availableCredits);
}
return false;
@@ -825,16 +885,17 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.isTrace)
- {
- log.trace("deliverLargeMessage: Sending " + packetSize +
- " availableCredits now is " +
- availableCredits);
- }
-
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+ packetSize +
+ " available now=" +
+ availableCredits);
+ }
}
positionPendingLargeMessage += chunkLen;
@@ -898,7 +959,7 @@
}
private final LinkedListIterator<MessageReference> iterator;
-
+
public synchronized void close()
{
iterator.close();
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -37,6 +37,7 @@
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -911,6 +912,123 @@
internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
}
+ public void testFlowControl() throws Exception
+ {
+ internalTestFlowControlOnRollback(false);
+ }
+
+ public void testFlowControlLargeMessage() throws Exception
+ {
+ internalTestFlowControlOnRollback(true);
+ }
+
+ private void internalTestFlowControlOnRollback(final boolean isLargeMessage) throws Exception
+ {
+
+ HornetQServer server = createServer(false, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ settings.setMaxDeliveryAttempts(-1);
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ ClientSession session = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ server.start();
+
+ locator.setConsumerWindowSize(300000);
+
+ if (isLargeMessage)
+ {
+ // something to ensure we are using large messages
+ locator.setMinLargeMessageSize(100);
+ }
+ else
+ {
+ // To make sure large messages won't kick in, we set anything large
+ locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+ }
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("count", i);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ producer.send(msg);
+ }
+
+ session.commit();
+
+ ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int repeat = 0; repeat < 100; repeat ++)
+ {
+ System.out.println("Repeat " + repeat);
+ long timeout = System.currentTimeMillis() + 2000;
+ // At least 10 messages on the buffer
+ while (timeout > System.currentTimeMillis() && consumer.getBufferSize() <= 10)
+ {
+ Thread.sleep(10);
+ }
+ assertTrue(consumer.getBufferSize() >= 10);
+
+ ClientMessage msg = consumer.receive(500);
+ msg.getBodyBuffer().readByte();
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+ }
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ System.out.println("msg " + msg);
+ msg.getBodyBuffer().readByte();
+ msg.acknowledge();
+ session.commit();
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+
+
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
{
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -408,6 +408,7 @@
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
clientFile.putIntProperty("txid", 2);
+ clientFile.putIntProperty("i", i);
producer.send(clientFile);
}
session.end(xid2, XAResource.TMSUCCESS);
@@ -422,6 +423,7 @@
for (int start = 0 ; start < 2; start++)
{
+ System.out.println("Start " + start);
sf = locator.createSessionFactory();
@@ -437,6 +439,7 @@
session.start();
for (int i = 0 ; i < 10; i++)
{
+ log.info("I = " + i);
ClientMessage msg = cons1.receive(5000);
assertNotNull(msg);
assertEquals(1, msg.getIntProperty("txid").intValue());
Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 17:02:14 UTC (rev 11543)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 17:03:44 UTC (rev 11544)
@@ -15,6 +15,9 @@
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -70,7 +73,136 @@
{
return false;
}
+
+ public void testRollbackPartiallyConsumedBuffer() throws Exception
+ {
+ for (int i = 0 ; i < 1; i++)
+ {
+ log.info("#test " + i);
+ internalTestRollbackPartiallyConsumedBuffer(false);
+ tearDown();
+ setUp();
+
+ }
+
+ }
+
+ public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
+ {
+ internalTestRollbackPartiallyConsumedBuffer(true);
+ }
+
+
+ private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
+ {
+ final int messageSize = 100 * 1024;
+
+ final ClientSession session;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ if (redeliveryDelay)
+ {
+ settings.setRedeliveryDelay(1000);
+ if (locator.isCompressLargeMessage())
+ {
+ locator.setConsumerWindowSize(0);
+ }
+ }
+ settings.setMaxDeliveryAttempts(-1);
+
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0 ; i < 20; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.putIntProperty("value", i);
+
+ producer.send(clientFile);
+ }
+
+ session.commit();
+
+ session.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ int counter = 0;
+ public void onMessage(ClientMessage message)
+ {
+ message.getBodyBuffer().readByte();
+ System.out.println("message:" + message);
+ try
+ {
+ if (counter ++ < 20)
+ {
+ Thread.sleep(100);
+ System.out.println("Rollback");
+ message.acknowledge();
+ session.rollback();
+ }
+ else
+ {
+ message.acknowledge();
+ session.commit();
+ }
+
+ if (counter == 40)
+ {
+ latch.countDown();
+ }
+ }
+ catch (Exception e)
+ {
+ latch.countDown();
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
+
+ assertTrue(latch.await(40, TimeUnit.SECONDS));
+
+ consumer.close();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -124,7 +256,7 @@
{
try
{
- server.stop();
+ session.close();
}
catch (Throwable ignored)
{
@@ -132,7 +264,7 @@
try
{
- session.close();
+ server.stop();
}
catch (Throwable ignored)
{
@@ -500,16 +632,17 @@
ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
ClientMessage msg1 = consumerExpiry.receive(5000);
+ assertTrue(msg1.isLargeMessage());
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -521,13 +654,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -638,13 +771,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -655,13 +788,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -1892,6 +2025,7 @@
ClientConsumer consumer = session.createConsumer(queue[1]);
ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ msg.getBodyBuffer().readByte();
Assert.assertNull(consumer.receiveImmediate());
Assert.assertNotNull(msg);
13 years, 3 months
JBoss hornetq SVN: r11543 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/protocol/core/impl/wireformat and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-14 13:02:14 -0400 (Fri, 14 Oct 2011)
New Revision: 11543
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7389 - flow control on large message
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 16:59:20 UTC (rev 11542)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
@@ -574,7 +574,7 @@
largeMessageCache.deleteOnExit();
}
- currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
+ currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 5, largeMessageCache);
if (currentChunkMessage.isCompressed())
{
@@ -596,7 +596,18 @@
{
return;
}
- currentLargeMessageController.addPacket(chunk);
+ if (currentLargeMessageController == null)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Sending back credits for largeController = null " + chunk.getPacketSize());
+ }
+ flowControl(chunk.getPacketSize(), false);
+ }
+ else
+ {
+ currentLargeMessageController.addPacket(chunk);
+ }
}
public void clear(boolean waitForOnMessage) throws HornetQException
@@ -609,12 +620,39 @@
while (iter.hasNext())
{
- ClientMessageInternal message = iter.next();
+ try
+ {
+ ClientMessageInternal message = iter.next();
+
+ if (message.isLargeMessage())
+ {
+ ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
+ largeMessage.getLargeMessageController().cancel();
+ }
- flowControlBeforeConsumption(message);
+ flowControlBeforeConsumption(message);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
clearBuffer();
+
+ try
+ {
+ if (currentLargeMessageController != null)
+ {
+ currentLargeMessageController.cancel();
+ currentLargeMessageController = null;
+ }
+ }
+ catch (Throwable e)
+ {
+ // nothing that could be done here
+ log.warn(e.getMessage(), e);
+ }
}
// Need to send credits for the messages in the buffer
@@ -680,6 +718,11 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
+ }
if (creditsToSend >= clientWindowSize)
{
@@ -687,7 +730,7 @@
{
if (ClientConsumerImpl.trace)
{
- ClientConsumerImpl.log.trace("Sending " + creditsToSend + " -1, for slow consumer");
+ ClientConsumerImpl.log.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
}
// sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 16:59:20 UTC (rev 11542)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UTF8Util;
@@ -141,7 +142,7 @@
{
checkForPacket(totalSize - 1);
}
- catch (Exception ignored)
+ catch (Throwable ignored)
{
}
}
@@ -227,6 +228,24 @@
public synchronized void cancel()
{
+
+ int totalSize = 0;
+ Packet polledPacket = null;
+ while ((polledPacket = packets.poll()) != null)
+ {
+ totalSize += polledPacket.getPacketSize();
+ }
+
+ try
+ {
+ consumerInternal.flowControl(totalSize, false);
+ }
+ catch (Exception ignored)
+ {
+ // what else can we do here?
+ log.warn(ignored.getMessage(), ignored);
+ }
+
packets.offer(new SessionReceiveContinuationMessage());
streamEnded = true;
streamClosed = true;
@@ -279,6 +298,11 @@
public synchronized void saveBuffer(final OutputStream output) throws HornetQException
{
+ if (streamClosed)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "The large message lost connection with its session, either because of a rollback or a closed session");
+ }
setOutputStream(output);
waitCompletion(0);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 16:59:20 UTC (rev 11542)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 17:02:14 UTC (rev 11543)
@@ -64,7 +64,14 @@
*/
public byte[] getBody()
{
- return body;
+ if (size <= 0)
+ {
+ return new byte[0];
+ }
+ else
+ {
+ return body;
+ }
}
/**
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 16:59:20 UTC (rev 11542)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 17:02:14 UTC (rev 11543)
@@ -77,6 +77,22 @@
super.encodeRest(buffer);
buffer.writeLong(consumerID);
}
+ @Override
+ public int getPacketSize()
+ {
+ if (size == -1)
+ {
+ // This packet was created by the LargeMessageController
+ // TODO: Get rid of this scenario
+ return 0;
+ }
+ else
+ {
+ return size;
+ }
+ }
+
+
@Override
public void decodeRest(final HornetQBuffer buffer)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-10-14 16:59:20 UTC (rev 11542)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
@@ -17,7 +17,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 16:59:20 UTC (rev 11542)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 17:02:14 UTC (rev 11543)
@@ -61,7 +61,7 @@
// Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-
+
private static boolean isTrace = log.isTraceEnabled();
// Static ---------------------------------------------------------------------------------------
@@ -85,7 +85,7 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
+
public String debug()
{
return toString() + "::Delivering " + this.deliveringRefs.size();
@@ -115,7 +115,7 @@
private final Binding binding;
private boolean transferring = false;
-
+
/* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
@@ -163,11 +163,11 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-
+
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis();
-
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -185,7 +185,7 @@
{
return id;
}
-
+
public boolean isBrowseOnly()
{
return browseOnly;
@@ -195,12 +195,12 @@
{
return creationTime;
}
-
+
public String getConnectionID()
{
return this.session.getConnectionID().toString();
}
-
+
public String getSessionID()
{
return this.session.getName();
@@ -210,20 +210,23 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (log.isDebugEnabled() )
+ if (log.isDebugEnabled())
{
- log.debug(this + " is busy for the lack of credits!!!");
+ log.debug(this + " is busy for the lack of credits. Current credits = " +
+ availableCredits +
+ " Can't receive reference " +
+ ref);
}
-
+
return HandleStatus.BUSY;
}
-
-// TODO - https://jira.jboss.org/browse/HORNETQ-533
-// if (!writeReady.get())
-// {
-// return HandleStatus.BUSY;
-// }
-
+
+ // TODO - https://jira.jboss.org/browse/HORNETQ-533
+ // if (!writeReady.get())
+ // {
+ // return HandleStatus.BUSY;
+ // }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -233,11 +236,18 @@
{
return HandleStatus.BUSY;
}
-
+
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
if (largeMessageDeliverer != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " is busy delivering large message " +
+ largeMessageDeliverer +
+ ", can't deliver reference " +
+ ref);
+ }
return HandleStatus.BUSY;
}
@@ -268,7 +278,9 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged())
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue() && !ref.isPaged())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+ !ref.getQueue().isInternalQueue() &&
+ !ref.isPaged())
{
storageManager.updateDeliveryCount(ref);
}
@@ -309,7 +321,7 @@
public void close(final boolean failed) throws Exception
{
callback.removeReadyListener(this);
-
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -355,8 +367,8 @@
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null
- : filter.getFilterString());
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+ filter == null ? null : filter.getFilterString());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -406,10 +418,28 @@
}
}
- public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+ public LinkedList<MessageReference> cancelRefs(final boolean failed,
+ final boolean lastConsumedAsDelivered,
+ final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
+ try
+ {
+ if (largeMessageDeliverer != null)
+ {
+ largeMessageDeliverer.finish();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on resetting large message deliver - " + largeMessageDeliverer, e);
+ }
+ finally
+ {
+ largeMessageDeliverer = null;
+ }
+
LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
if (!deliveringRefs.isEmpty())
@@ -430,8 +460,9 @@
{
if (!failed)
{
- //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
- //before failure
+ // We don't decrement delivery count if the client failed, since there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for them
+ // before failure
ref.decrementDeliveryCount();
}
@@ -492,18 +523,23 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + ":: FlowControl::Received disable flow control message");
+ }
// No flow control
availableCredits = null;
-
- //There may be messages already in the queue
+
+ // There may be messages already in the queue
promptDelivery();
}
else if (credits == 0)
{
- //reset, used on slow consumers
+ // reset, used on slow consumers
+ log.debug(this + ":: FlowControl::Received reset flow control message");
availableCredits.set(0);
}
else
@@ -512,16 +548,17 @@
if (log.isDebugEnabled())
{
- log.debug(this + "::Received " + credits +
- " credits, previous value = " +
- previous +
- " currentValue = " +
- availableCredits.get());
+ log.debug(this + "::FlowControl::Received " +
+ credits +
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
if (previous <= 0 && previous + credits > 0)
{
- if (log.isTraceEnabled() )
+ if (log.isTraceEnabled())
{
log.trace(this + "::calling promptDelivery from receiving credits");
}
@@ -541,7 +578,7 @@
{
return;
}
-
+
// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
// acknowledged
@@ -573,21 +610,21 @@
}
while (ref.getMessage().getMessageID() != messageID);
}
-
+
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
MessageReference ref = removeReferenceByID(messageID);
-
+
if (ref == null)
{
throw new IllegalStateException("Cannot find ref to ack " + messageID);
}
-
+
if (autoCommitAcks)
{
ref.getQueue().acknowledge(ref);
@@ -627,13 +664,13 @@
return ref;
}
-
+
public void readyForWriting(final boolean ready)
{
if (ready)
{
writeReady.set(true);
-
+
promptDelivery();
}
else
@@ -701,10 +738,17 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery standard taking " +
+ packetSize +
+ " from credits, available now is " +
+ availableCredits);
+ }
}
}
-
// Inner classes
// ------------------------------------------------------------------------
@@ -766,6 +810,12 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+ availableCredits);
+ }
+
return false;
}
@@ -774,7 +824,7 @@
context = largeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize();
-
+
context.open();
sentInitialPacket = true;
@@ -787,6 +837,15 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::" +
+ " deliver initialpackage with " +
+ packetSize +
+ " delivered, available now = " +
+ availableCredits);
+ }
}
// Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -802,7 +861,8 @@
{
if (ServerConsumerImpl.isTrace)
{
- log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+ log.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+ availableCredits);
}
return false;
@@ -825,16 +885,17 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.isTrace)
- {
- log.trace("deliverLargeMessage: Sending " + packetSize +
- " availableCredits now is " +
- availableCredits);
- }
-
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+ packetSize +
+ " available now=" +
+ availableCredits);
+ }
}
positionPendingLargeMessage += chunkLen;
@@ -898,7 +959,7 @@
}
private final LinkedListIterator<MessageReference> iterator;
-
+
public synchronized void close()
{
iterator.close();
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 16:59:20 UTC (rev 11542)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 17:02:14 UTC (rev 11543)
@@ -37,6 +37,7 @@
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -911,6 +912,123 @@
internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
}
+ public void testFlowControl() throws Exception
+ {
+ internalTestFlowControlOnRollback(false);
+ }
+
+ public void testFlowControlLargeMessage() throws Exception
+ {
+ internalTestFlowControlOnRollback(true);
+ }
+
+ private void internalTestFlowControlOnRollback(final boolean isLargeMessage) throws Exception
+ {
+
+ HornetQServer server = createServer(false, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ settings.setMaxDeliveryAttempts(-1);
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ ClientSession session = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ server.start();
+
+ locator.setConsumerWindowSize(300000);
+
+ if (isLargeMessage)
+ {
+ // something to ensure we are using large messages
+ locator.setMinLargeMessageSize(100);
+ }
+ else
+ {
+ // To make sure large messages won't kick in, we set anything large
+ locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+ }
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("count", i);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ producer.send(msg);
+ }
+
+ session.commit();
+
+ ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int repeat = 0; repeat < 100; repeat ++)
+ {
+ System.out.println("Repeat " + repeat);
+ long timeout = System.currentTimeMillis() + 2000;
+ // At least 10 messages on the buffer
+ while (timeout > System.currentTimeMillis() && consumer.getBufferSize() <= 10)
+ {
+ Thread.sleep(10);
+ }
+ assertTrue(consumer.getBufferSize() >= 10);
+
+ ClientMessage msg = consumer.receive(500);
+ msg.getBodyBuffer().readByte();
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+ }
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ System.out.println("msg " + msg);
+ msg.getBodyBuffer().readByte();
+ msg.acknowledge();
+ session.commit();
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+
+
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 16:59:20 UTC (rev 11542)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 17:02:14 UTC (rev 11543)
@@ -408,6 +408,7 @@
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
clientFile.putIntProperty("txid", 2);
+ clientFile.putIntProperty("i", i);
producer.send(clientFile);
}
session.end(xid2, XAResource.TMSUCCESS);
@@ -422,6 +423,7 @@
for (int start = 0 ; start < 2; start++)
{
+ System.out.println("Start " + start);
sf = locator.createSessionFactory();
@@ -437,6 +439,7 @@
session.start();
for (int i = 0 ; i < 10; i++)
{
+ log.info("I = " + i);
ClientMessage msg = cons1.receive(5000);
assertNotNull(msg);
assertEquals(1, msg.getIntProperty("txid").intValue());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 16:59:20 UTC (rev 11542)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 17:02:14 UTC (rev 11543)
@@ -15,6 +15,9 @@
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -70,7 +73,136 @@
{
return false;
}
+
+ public void testRollbackPartiallyConsumedBuffer() throws Exception
+ {
+ for (int i = 0 ; i < 1; i++)
+ {
+ log.info("#test " + i);
+ internalTestRollbackPartiallyConsumedBuffer(false);
+ tearDown();
+ setUp();
+
+ }
+
+ }
+
+ public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
+ {
+ internalTestRollbackPartiallyConsumedBuffer(true);
+ }
+
+
+ private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
+ {
+ final int messageSize = 100 * 1024;
+
+ final ClientSession session;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ if (redeliveryDelay)
+ {
+ settings.setRedeliveryDelay(1000);
+ if (locator.isCompressLargeMessage())
+ {
+ locator.setConsumerWindowSize(0);
+ }
+ }
+ settings.setMaxDeliveryAttempts(-1);
+
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0 ; i < 20; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.putIntProperty("value", i);
+
+ producer.send(clientFile);
+ }
+
+ session.commit();
+
+ session.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ int counter = 0;
+ public void onMessage(ClientMessage message)
+ {
+ message.getBodyBuffer().readByte();
+ System.out.println("message:" + message);
+ try
+ {
+ if (counter ++ < 20)
+ {
+ Thread.sleep(100);
+ System.out.println("Rollback");
+ message.acknowledge();
+ session.rollback();
+ }
+ else
+ {
+ message.acknowledge();
+ session.commit();
+ }
+
+ if (counter == 40)
+ {
+ latch.countDown();
+ }
+ }
+ catch (Exception e)
+ {
+ latch.countDown();
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
+
+ assertTrue(latch.await(40, TimeUnit.SECONDS));
+
+ consumer.close();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -124,7 +256,7 @@
{
try
{
- server.stop();
+ session.close();
}
catch (Throwable ignored)
{
@@ -132,7 +264,7 @@
try
{
- session.close();
+ server.stop();
}
catch (Throwable ignored)
{
@@ -500,16 +632,17 @@
ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
ClientMessage msg1 = consumerExpiry.receive(5000);
+ assertTrue(msg1.isLargeMessage());
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -521,13 +654,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -638,13 +771,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -655,13 +788,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -1892,6 +2025,7 @@
ClientConsumer consumer = session.createConsumer(queue[1]);
ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ msg.getBodyBuffer().readByte();
Assert.assertNull(consumer.receiveImmediate());
Assert.assertNotNull(msg);
13 years, 3 months
JBoss hornetq SVN: r11542 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-10-14 12:59:20 -0400 (Fri, 14 Oct 2011)
New Revision: 11542
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
removed unwanted locators0 variable
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-10-14 16:54:56 UTC (rev 11541)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-10-14 16:59:20 UTC (rev 11542)
@@ -121,8 +121,6 @@
protected NodeManager[] nodeManagers;
- protected ServerLocator[] locators0;
-
protected ClientSessionFactory[] sfs;
protected long[] timeStarts;
@@ -184,7 +182,6 @@
}
UnitTestCase.checkFreePort(ClusterTestBase.PORTS);
- locators0 = null;
servers = null;
sfs = null;
@@ -592,15 +589,15 @@
protected void closeAllServerLocatorsFactories() throws Exception
{
- for (int i = 0; i < locators0.length; i++)
+ for (int i = 0; i < locators.length; i++)
{
- ServerLocator sf = locators0[i];
+ ServerLocator sf = locators[i];
if (sf != null)
{
sf.close();
- locators0[i] = null;
+ locators[i] = null;
}
}
}
@@ -1400,16 +1397,16 @@
if (ha)
{
- locators0[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
+ locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
}
else
{
- locators0[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+ locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
}
- locators0[node].setBlockOnNonDurableSend(true);
- locators0[node].setBlockOnDurableSend(true);
- ClientSessionFactory sf = locators0[node].createSessionFactory();
+ locators[node].setBlockOnNonDurableSend(true);
+ locators[node].setBlockOnDurableSend(true);
+ ClientSessionFactory sf = locators[node].createSessionFactory();
ClientSession session = sf.createSession();
session.close();
@@ -1436,12 +1433,12 @@
serverTotc = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params);
}
- locators0[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+ locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
- locators0[node].setBlockOnNonDurableSend(true);
- locators0[node].setBlockOnDurableSend(true);
- locators0[node].setReconnectAttempts(reconnectAttempts);
- ClientSessionFactory sf = locators0[node].createSessionFactory();
+ locators[node].setBlockOnNonDurableSend(true);
+ locators[node].setBlockOnDurableSend(true);
+ locators[node].setReconnectAttempts(reconnectAttempts);
+ ClientSessionFactory sf = locators[node].createSessionFactory();
sfs[node] = sf;
}
@@ -1466,14 +1463,14 @@
serverTotc = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params);
}
- locators0[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
- locators0[node].setRetryInterval(100);
- locators0[node].setRetryIntervalMultiplier(1d);
- locators0[node].setReconnectAttempts(-1);
- locators0[node].setBlockOnNonDurableSend(blocking);
- locators0[node].setBlockOnDurableSend(blocking);
+ locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
+ locators[node].setRetryInterval(100);
+ locators[node].setRetryIntervalMultiplier(1d);
+ locators[node].setReconnectAttempts(-1);
+ locators[node].setBlockOnNonDurableSend(blocking);
+ locators[node].setBlockOnDurableSend(blocking);
- ClientSessionFactory sf = locators0[node].createSessionFactory();
+ ClientSessionFactory sf = locators[node].createSessionFactory();
sfs[node] = sf;
}
13 years, 3 months
JBoss hornetq SVN: r11541 - in trunk/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl/wireformat and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-14 12:54:56 -0400 (Fri, 14 Oct 2011)
New Revision: 11541
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
JBPAPP-7389 - flow control on large messages
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -572,7 +572,7 @@
largeMessageCache.deleteOnExit();
}
- currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
+ currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 5, largeMessageCache);
if (currentChunkMessage.isCompressed())
{
@@ -594,7 +594,18 @@
{
return;
}
- currentLargeMessageController.addPacket(chunk);
+ if (currentLargeMessageController == null)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Sending back credits for largeController = null " + chunk.getPacketSize());
+ }
+ flowControl(chunk.getPacketSize(), false);
+ }
+ else
+ {
+ currentLargeMessageController.addPacket(chunk);
+ }
}
public void clear(boolean waitForOnMessage) throws HornetQException
@@ -607,12 +618,39 @@
while (iter.hasNext())
{
- ClientMessageInternal message = iter.next();
+ try
+ {
+ ClientMessageInternal message = iter.next();
+
+ if (message.isLargeMessage())
+ {
+ ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
+ largeMessage.getLargeMessageController().cancel();
+ }
- flowControlBeforeConsumption(message);
+ flowControlBeforeConsumption(message);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
clearBuffer();
+
+ try
+ {
+ if (currentLargeMessageController != null)
+ {
+ currentLargeMessageController.cancel();
+ currentLargeMessageController = null;
+ }
+ }
+ catch (Throwable e)
+ {
+ // nothing that could be done here
+ log.warn(e.getMessage(), e);
+ }
}
// Need to send credits for the messages in the buffer
@@ -678,6 +716,11 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
+ }
if (creditsToSend >= clientWindowSize)
{
@@ -685,7 +728,7 @@
{
if (ClientConsumerImpl.isTrace)
{
- ClientConsumerImpl.log.trace("Sending " + creditsToSend + " -1, for slow consumer");
+ ClientConsumerImpl.log.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
}
// sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UTF8Util;
@@ -141,7 +142,7 @@
{
checkForPacket(totalSize - 1);
}
- catch (Exception ignored)
+ catch (Throwable ignored)
{
}
}
@@ -227,6 +228,24 @@
public synchronized void cancel()
{
+
+ int totalSize = 0;
+ Packet polledPacket = null;
+ while ((polledPacket = packets.poll()) != null)
+ {
+ totalSize += polledPacket.getPacketSize();
+ }
+
+ try
+ {
+ consumerInternal.flowControl(totalSize, false);
+ }
+ catch (Exception ignored)
+ {
+ // what else can we do here?
+ log.warn(ignored.getMessage(), ignored);
+ }
+
packets.offer(new SessionReceiveContinuationMessage());
streamEnded = true;
streamClosed = true;
@@ -279,6 +298,11 @@
public synchronized void saveBuffer(final OutputStream output) throws HornetQException
{
+ if (streamClosed)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "The large message lost connection with its session, either because of a rollback or a closed session");
+ }
setOutputStream(output);
waitCompletion(0);
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -64,7 +64,14 @@
*/
public byte[] getBody()
{
- return body;
+ if (size <= 0)
+ {
+ return new byte[0];
+ }
+ else
+ {
+ return body;
+ }
}
/**
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -77,6 +77,21 @@
super.encodeRest(buffer);
buffer.writeLong(consumerID);
}
+ @Override
+ public int getPacketSize()
+ {
+ if (size == -1)
+ {
+ // This packet was created by the LargeMessageController
+ return 0;
+ }
+ else
+ {
+ return size;
+ }
+ }
+
+
@Override
public void decodeRest(final HornetQBuffer buffer)
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 16:54:56 UTC (rev 11541)
@@ -61,7 +61,7 @@
// Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-
+
private static boolean isTrace = log.isTraceEnabled();
// Static ---------------------------------------------------------------------------------------
@@ -85,7 +85,7 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
+
public String debug()
{
return toString() + "::Delivering " + this.deliveringRefs.size();
@@ -115,7 +115,7 @@
private final Binding binding;
private boolean transferring = false;
-
+
/* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
@@ -163,11 +163,11 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-
+
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis();
-
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -185,7 +185,7 @@
{
return id;
}
-
+
public boolean isBrowseOnly()
{
return browseOnly;
@@ -195,12 +195,12 @@
{
return creationTime;
}
-
+
public String getConnectionID()
{
return this.session.getConnectionID().toString();
}
-
+
public String getSessionID()
{
return this.session.getName();
@@ -210,20 +210,23 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (log.isDebugEnabled() )
+ if (log.isDebugEnabled())
{
- log.debug(this + " is busy for the lack of credits!!!");
+ log.debug(this + " is busy for the lack of credits. Current credits = " +
+ availableCredits +
+ " Can't receive reference " +
+ ref);
}
-
+
return HandleStatus.BUSY;
}
-
-// TODO - https://jira.jboss.org/browse/HORNETQ-533
-// if (!writeReady.get())
-// {
-// return HandleStatus.BUSY;
-// }
-
+
+ // TODO - https://jira.jboss.org/browse/HORNETQ-533
+ // if (!writeReady.get())
+ // {
+ // return HandleStatus.BUSY;
+ // }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -233,11 +236,18 @@
{
return HandleStatus.BUSY;
}
-
+
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
if (largeMessageDeliverer != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " is busy delivering large message " +
+ largeMessageDeliverer +
+ ", can't deliver reference " +
+ ref);
+ }
return HandleStatus.BUSY;
}
@@ -268,7 +278,9 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged())
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue() && !ref.isPaged())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+ !ref.getQueue().isInternalQueue() &&
+ !ref.isPaged())
{
storageManager.updateDeliveryCount(ref);
}
@@ -309,7 +321,7 @@
public void close(final boolean failed) throws Exception
{
callback.removeReadyListener(this);
-
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -355,8 +367,8 @@
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null
- : filter.getFilterString());
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+ filter == null ? null : filter.getFilterString());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -406,10 +418,28 @@
}
}
- public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+ public LinkedList<MessageReference> cancelRefs(final boolean failed,
+ final boolean lastConsumedAsDelivered,
+ final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
+ try
+ {
+ if (largeMessageDeliverer != null)
+ {
+ largeMessageDeliverer.finish();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on resetting large message deliver - " + largeMessageDeliverer, e);
+ }
+ finally
+ {
+ largeMessageDeliverer = null;
+ }
+
LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
if (!deliveringRefs.isEmpty())
@@ -430,8 +460,9 @@
{
if (!failed)
{
- //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
- //before failure
+ // We don't decrement delivery count if the client failed, since there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for them
+ // before failure
ref.decrementDeliveryCount();
}
@@ -492,18 +523,23 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + ":: FlowControl::Received disable flow control message");
+ }
// No flow control
availableCredits = null;
-
- //There may be messages already in the queue
+
+ // There may be messages already in the queue
promptDelivery();
}
else if (credits == 0)
{
- //reset, used on slow consumers
+ // reset, used on slow consumers
+ log.debug(this + ":: FlowControl::Received reset flow control message");
availableCredits.set(0);
}
else
@@ -512,16 +548,17 @@
if (log.isDebugEnabled())
{
- log.debug(this + "::Received " + credits +
- " credits, previous value = " +
- previous +
- " currentValue = " +
- availableCredits.get());
+ log.debug(this + "::FlowControl::Received " +
+ credits +
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
if (previous <= 0 && previous + credits > 0)
{
- if (log.isTraceEnabled() )
+ if (log.isTraceEnabled())
{
log.trace(this + "::calling promptDelivery from receiving credits");
}
@@ -541,7 +578,7 @@
{
return;
}
-
+
// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
// acknowledged
@@ -573,21 +610,21 @@
}
while (ref.getMessage().getMessageID() != messageID);
}
-
+
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
MessageReference ref = removeReferenceByID(messageID);
-
+
if (ref == null)
{
throw new IllegalStateException("Cannot find ref to ack " + messageID);
}
-
+
if (autoCommitAcks)
{
ref.getQueue().acknowledge(ref);
@@ -627,13 +664,13 @@
return ref;
}
-
+
public void readyForWriting(final boolean ready)
{
if (ready)
{
writeReady.set(true);
-
+
promptDelivery();
}
else
@@ -701,10 +738,17 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery standard taking " +
+ packetSize +
+ " from credits, available now is " +
+ availableCredits);
+ }
}
}
-
// Inner classes
// ------------------------------------------------------------------------
@@ -766,6 +810,12 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+ availableCredits);
+ }
+
return false;
}
@@ -774,7 +824,7 @@
context = largeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize();
-
+
context.open();
sentInitialPacket = true;
@@ -787,6 +837,15 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::" +
+ " deliver initialpackage with " +
+ packetSize +
+ " delivered, available now = " +
+ availableCredits);
+ }
}
// Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -802,7 +861,8 @@
{
if (ServerConsumerImpl.isTrace)
{
- log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+ log.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+ availableCredits);
}
return false;
@@ -825,16 +885,17 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.isTrace)
- {
- log.trace("deliverLargeMessage: Sending " + packetSize +
- " availableCredits now is " +
- availableCredits);
- }
-
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+ packetSize +
+ " available now=" +
+ availableCredits);
+ }
}
positionPendingLargeMessage += chunkLen;
@@ -898,7 +959,7 @@
}
private final LinkedListIterator<MessageReference> iterator;
-
+
public synchronized void close()
{
iterator.close();
13 years, 3 months
JBoss hornetq SVN: r11540 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-14 12:54:26 -0400 (Fri, 14 Oct 2011)
New Revision: 11540
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
JBPAPP-7389 - flow control on large messages
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 14:36:55 UTC (rev 11539)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 16:54:26 UTC (rev 11540)
@@ -37,6 +37,7 @@
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -911,6 +912,123 @@
internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
}
+ public void testFlowControl() throws Exception
+ {
+ internalTestFlowControlOnRollback(false);
+ }
+
+ public void testFlowControlLargeMessage() throws Exception
+ {
+ internalTestFlowControlOnRollback(true);
+ }
+
+ private void internalTestFlowControlOnRollback(final boolean isLargeMessage) throws Exception
+ {
+
+ HornetQServer server = createServer(false, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ settings.setMaxDeliveryAttempts(-1);
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ ClientSession session = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ server.start();
+
+ locator.setConsumerWindowSize(300000);
+
+ if (isLargeMessage)
+ {
+ // something to ensure we are using large messages
+ locator.setMinLargeMessageSize(100);
+ }
+ else
+ {
+ // To make sure large messages won't kick in, we set anything large
+ locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+ }
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("count", i);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ producer.send(msg);
+ }
+
+ session.commit();
+
+ ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int repeat = 0; repeat < 100; repeat ++)
+ {
+ System.out.println("Repeat " + repeat);
+ long timeout = System.currentTimeMillis() + 2000;
+ // At least 10 messages on the buffer
+ while (timeout > System.currentTimeMillis() && consumer.getBufferSize() <= 10)
+ {
+ Thread.sleep(10);
+ }
+ assertTrue(consumer.getBufferSize() >= 10);
+
+ ClientMessage msg = consumer.receive(500);
+ msg.getBodyBuffer().readByte();
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+ }
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ System.out.println("msg " + msg);
+ msg.getBodyBuffer().readByte();
+ msg.acknowledge();
+ session.commit();
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+
+
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
{
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 14:36:55 UTC (rev 11539)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-10-14 16:54:26 UTC (rev 11540)
@@ -408,6 +408,7 @@
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
clientFile.putIntProperty("txid", 2);
+ clientFile.putIntProperty("i", i);
producer.send(clientFile);
}
session.end(xid2, XAResource.TMSUCCESS);
@@ -422,6 +423,7 @@
for (int start = 0 ; start < 2; start++)
{
+ System.out.println("Start " + start);
sf = locator.createSessionFactory();
@@ -437,6 +439,7 @@
session.start();
for (int i = 0 ; i < 10; i++)
{
+ log.info("I = " + i);
ClientMessage msg = cons1.receive(5000);
assertNotNull(msg);
assertEquals(1, msg.getIntProperty("txid").intValue());
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 14:36:55 UTC (rev 11539)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 16:54:26 UTC (rev 11540)
@@ -15,6 +15,9 @@
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -70,7 +73,136 @@
{
return false;
}
+
+ public void testRollbackPartiallyConsumedBuffer() throws Exception
+ {
+ for (int i = 0 ; i < 1; i++)
+ {
+ log.info("#test " + i);
+ internalTestRollbackPartiallyConsumedBuffer(false);
+ tearDown();
+ setUp();
+
+ }
+
+ }
+
+ public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
+ {
+ internalTestRollbackPartiallyConsumedBuffer(true);
+ }
+
+
+ private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
+ {
+ final int messageSize = 100 * 1024;
+
+ final ClientSession session;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ if (redeliveryDelay)
+ {
+ settings.setRedeliveryDelay(1000);
+ if (locator.isCompressLargeMessage())
+ {
+ locator.setConsumerWindowSize(0);
+ }
+ }
+ settings.setMaxDeliveryAttempts(-1);
+
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0 ; i < 20; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.putIntProperty("value", i);
+
+ producer.send(clientFile);
+ }
+
+ session.commit();
+
+ session.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ int counter = 0;
+ public void onMessage(ClientMessage message)
+ {
+ message.getBodyBuffer().readByte();
+ System.out.println("message:" + message);
+ try
+ {
+ if (counter ++ < 20)
+ {
+ Thread.sleep(100);
+ System.out.println("Rollback");
+ message.acknowledge();
+ session.rollback();
+ }
+ else
+ {
+ message.acknowledge();
+ session.commit();
+ }
+
+ if (counter == 40)
+ {
+ latch.countDown();
+ }
+ }
+ catch (Exception e)
+ {
+ latch.countDown();
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
+
+ assertTrue(latch.await(40, TimeUnit.SECONDS));
+
+ consumer.close();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -124,7 +256,7 @@
{
try
{
- server.stop();
+ session.close();
}
catch (Throwable ignored)
{
@@ -132,7 +264,7 @@
try
{
- session.close();
+ server.stop();
}
catch (Throwable ignored)
{
@@ -500,16 +632,17 @@
ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
ClientMessage msg1 = consumerExpiry.receive(5000);
+ assertTrue(msg1.isLargeMessage());
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -521,13 +654,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -638,13 +771,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -655,13 +788,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -1892,6 +2025,7 @@
ClientConsumer consumer = session.createConsumer(queue[1]);
ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ msg.getBodyBuffer().readByte();
Assert.assertNull(consumer.receiveImmediate());
Assert.assertNotNull(msg);
13 years, 3 months
JBoss hornetq SVN: r11539 - trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-10-14 10:36:55 -0400 (Fri, 14 Oct 2011)
New Revision: 11539
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
Log:
remove debug log
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-10-14 14:18:06 UTC (rev 11538)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-10-14 14:36:55 UTC (rev 11539)
@@ -417,7 +417,6 @@
error.setDisconnect(true);
throw error;
}
- log.error("------------------ negotiated version is " + this.version);
}
this.frameHandler = VersionedStompFrameHandler.getHandler(this, this.version);
13 years, 3 months
JBoss hornetq SVN: r11537 - trunk/hornetq-core/src/main/java/org/hornetq/spi/core/security.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-10-14 09:50:48 -0400 (Fri, 14 Oct 2011)
New Revision: 11537
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java
Log:
remove debug logs
Modified: trunk/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java 2011-10-14 09:54:02 UTC (rev 11536)
+++ trunk/hornetq-core/src/main/java/org/hornetq/spi/core/security/HornetQSecurityManagerImpl.java 2011-10-14 13:50:48 UTC (rev 11537)
@@ -119,7 +119,6 @@
public void addUser(final String user, final String password)
{
- log.error("-------------------------------adding user: " + user + " password " + password);
if (user == null)
{
throw new IllegalArgumentException("User cannot be null");
13 years, 3 months
JBoss hornetq SVN: r11536 - trunk/hornetq-core.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-14 05:54:02 -0400 (Fri, 14 Oct 2011)
New Revision: 11536
Modified:
trunk/hornetq-core/pom.xml
Log:
remove unnecessary version declaration.
Modified: trunk/hornetq-core/pom.xml
===================================================================
--- trunk/hornetq-core/pom.xml 2011-10-14 07:07:12 UTC (rev 11535)
+++ trunk/hornetq-core/pom.xml 2011-10-14 09:54:02 UTC (rev 11536)
@@ -90,7 +90,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
- <version>2.3.2</version>
<executions>
<execution>
<phase>test</phase>
13 years, 3 months
JBoss hornetq SVN: r11535 - trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-10-14 03:07:12 -0400 (Fri, 14 Oct 2011)
New Revision: 11535
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
check for null on cc for backup server locator but remember to create it first
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-10-13 17:29:08 UTC (rev 11534)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-10-14 07:07:12 UTC (rev 11535)
@@ -241,9 +241,10 @@
clusterConnector = new StaticClusterConnector(tcConfigs);
+ backupServerLocator = clusterConnector.createServerLocator(false);
+
if (backupServerLocator != null)
{
- backupServerLocator = clusterConnector.createServerLocator(false);
backupServerLocator.setReconnectAttempts(-1);
backupServerLocator.setInitialConnectAttempts(-1);
}
@@ -347,9 +348,10 @@
clusterConnector = new DiscoveryClusterConnector(dg);
+ backupServerLocator = clusterConnector.createServerLocator(false);
+
if (backupServerLocator != null)
{
- backupServerLocator = clusterConnector.createServerLocator(false);
backupServerLocator.setReconnectAttempts(-1);
backupServerLocator.setInitialConnectAttempts(-1);
}
13 years, 3 months
JBoss hornetq SVN: r11534 - in trunk/hornetq-core/src/main/java/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2011-10-13 13:29:08 -0400 (Thu, 13 Oct 2011)
New Revision: 11534
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
check for null on cc for backup server locator
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-10-13 14:07:24 UTC (rev 11533)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-10-13 17:29:08 UTC (rev 11534)
@@ -1522,9 +1522,9 @@
// cause reconnect loop
public void run()
{
- CLOSE_RUNNABLES.add(this);
try
{
+ CLOSE_RUNNABLES.add(this);
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
"The connection was disconnected because of server shutdown"));
} finally
@@ -1536,6 +1536,7 @@
public ClientSessionFactoryImpl stop()
{
+ CLOSE_RUNNABLES.remove(this);
causeExit();
return ClientSessionFactoryImpl.this;
}
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-10-13 14:07:24 UTC (rev 11533)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-10-13 17:29:08 UTC (rev 11534)
@@ -241,9 +241,12 @@
clusterConnector = new StaticClusterConnector(tcConfigs);
- backupServerLocator = clusterConnector.createServerLocator(false);
- backupServerLocator.setReconnectAttempts(-1);
- backupServerLocator.setInitialConnectAttempts(-1);
+ if (backupServerLocator != null)
+ {
+ backupServerLocator = clusterConnector.createServerLocator(false);
+ backupServerLocator.setReconnectAttempts(-1);
+ backupServerLocator.setInitialConnectAttempts(-1);
+ }
if (tcConfigs != null && tcConfigs.length > 0)
{
@@ -344,9 +347,12 @@
clusterConnector = new DiscoveryClusterConnector(dg);
- backupServerLocator = clusterConnector.createServerLocator(false);
- backupServerLocator.setReconnectAttempts(-1);
- backupServerLocator.setInitialConnectAttempts(-1);
+ if (backupServerLocator != null)
+ {
+ backupServerLocator = clusterConnector.createServerLocator(false);
+ backupServerLocator.setReconnectAttempts(-1);
+ backupServerLocator.setInitialConnectAttempts(-1);
+ }
this.manager = manager;
}
13 years, 3 months