Adam Malter [
http://community.jboss.org/people/amalter] created the discussion
"Stalled Queue Consumer on Windows(NIO) with Large Message and ConsumerWindow"
To view the discussion, visit:
http://community.jboss.org/message/624167#624167
--------------------------------------------------------------
Hi All,
We use HornetQ as our development JMS provider, and I'm trying to get everyone
comfortable enough to upgrade it to production. Having some troubles however when we
really start to bang on it.
This is the case:
HornetQ running in our old customized JBoss instance (I won't even mention the version
number, needless to say we've re-written enough stuff that I worry my case is not
generic/easily reproducible)
Anyway - this is what I'm seeing..
Server config is:
Configuration config = new ConfigurationImpl();
config.setPersistenceEnabled(true);
config.setJournalDirectory(FileUtil.buildPath(rootData,
"journal").getPath());
config.setBindingsDirectory(FileUtil.buildPath(rootData,
"bindings").getPath());
config.setPagingDirectory(FileUtil.buildPath(rootData,
"paging").getPath());
config.setLargeMessagesDirectory(FileUtil.buildPath(rootData,
"lobs").getPath());
config.setPersistDeliveryCountBeforeDelivery(true); //required for redelivery
count to work on rollbacks
if (SystemUtils.IS_OS_WINDOWS) {
config.setJournalType(JournalType.NIO);
}
//HornetQ requires cluster user and pw or spits out warnings
config.setClustered(false);
config.setClusterUser("tcuser");
config.setClusterPassword("tradecard");
config.setSecurityEnabled(false);
//Simple Q Service for now - only in VM transport
config.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
config.setCreateJournalDir(true);
config.setCreateBindingsDir(true);
Contents of hornetq-version.properties from our jar file:
hornetq.version.versionName=HQ_2_2_5_FINAL_AS7
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=5
hornetq.version.incrementingVersion=121
hornetq.version.versionSuffix=Final
hornetq.version.versionTag=Final
hornetq.netty.version=3.2.3.Final-r${buildNumber}
hornetq.version.compatibleVersionList=121
We're running in XA mode with durable queues and no selectors or funny priorities.
Additionally, the issue only occurs when I leave the consumer window size at default. If I
change setConsumerWindowSize(0), my issue disappears.
So inside a single transaction, I send about 10 ~512kb to the same destination.
Destination receives them okay, starts to process them, and then triggers an application
level error (In this case I'm purposefully triggering it to test our resubmit
handling)
Our application logic is that we try to process the message a pre-set number of times
(lets say 3) and before moving it to dead letter. This usually happens on the very first
rollback, but not always - essentially the consumer/mdb picks up a message, rolls it back,
starts new tx, tries to pick up another one and then hangs with the following stack:
"AuditMDB[000]" - Thread t@113
java.lang.Thread.State: TIMED_WAITING
at java.lang.Object.wait(Native Method)
- waiting on <1a647ea> (a
org.hornetq.core.client.impl.LargeMessageControllerImpl)
at
org.hornetq.core.client.impl.LargeMessageControllerImpl.waitCompletion(LargeMessageControllerImpl.java:304)
at
org.hornetq.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:283)
at
org.hornetq.core.client.impl.ClientLargeMessageImpl.checkBuffer(ClientLargeMessageImpl.java:201)
at
org.hornetq.core.client.impl.ClientLargeMessageImpl.getBodyBuffer(ClientLargeMessageImpl.java:101)
at
org.hornetq.jms.client.HornetQBytesMessage.getBuffer(HornetQBytesMessage.java:451)
at
org.hornetq.jms.client.HornetQBytesMessage.readBytes(HornetQBytesMessage.java:248)
at
com.tradecard.server.util.BytesMessageInputStream.read(BytesMessageInputStream.java:68)
at
com.tradecard.framework.util.ChunkedByteBuffer.append(ChunkedByteBuffer.java:344)
at
com.tradecard.framework.util.ChunkedByteBuffer.append(ChunkedByteBuffer.java:332)
at
com.tradecard.server.mdb.HornetQContainerInvoker$HornetQServerSession.run(HornetQContainerInvoker.java:287)
at java.lang.Thread.run(Thread.java:662)
Locked ownable synchronizers:
- None
The somewhat relevant portions of our Hornet Invoker are:
xaResource = ((XAQueueSession)session).getXAResource();
txManager.begin();
tx = txManager.getTransaction();
tx.enlistResource(xaResource);
pooledSession.syncRegister();
tx.registerSynchronization(pooledSession);
//cache the session/senders for QueueSender to use
TransactionCache.put(TransactionCache.MQXA_SESSION, pooledSession);
txid = TransactionToolbox.getGlobalTransactionId();
Message requestMessage = null;
requestMessage = receiver.receive(2000);
if (requestMessage != null && !quit) {
Message message = null;
try {
BytesMessage bytesMessage = (BytesMessage)requestMessage;
BytesMessageInputStream stream = new
BytesMessageInputStream(bytesMessage);
ChunkedByteBuffer buffer = new ChunkedByteBuffer(10000, 100);
buffer.append(stream); // <--- hanging here
....
There doesn't seem to be anyone owning the lock. Eventually I get timeout warnings
from my TxManager and have to bounce the server. As soon as I bounce, it picks up another
message and processes, quickly getting the error again.
Additionally, I can see a number of files pending in my lobs directory.
All of our custom infrastructure makes it difficult to extract the case into a postable
test case. However, if anyone thinks it worthwhile and this doesn't seem like anything
straight forward, I'll make the attempt.
The idea for setting the consumer window size to zero came from the following closed bug -
https://issues.jboss.org/browse/HORNETQ-111 https://issues.jboss.org/browse/HORNETQ-111
Thanks for your time and any help!
--------------------------------------------------------------
Reply to this message by going to Community
[
http://community.jboss.org/message/624167#624167]
Start a new discussion in JBoss Messaging at Community
[
http://community.jboss.org/choose-container!input.jspa?contentType=1&...]