[hornetq-commits] JBoss hornetq SVN: r7912 - in trunk: src/main/org/hornetq/core/server/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Aug 25 11:29:32 EDT 2009
Author: timfox
Date: 2009-08-25 11:29:32 -0400 (Tue, 25 Aug 2009)
New Revision: 7912
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-111
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-08-25 15:29:32 UTC (rev 7912)
@@ -71,8 +71,6 @@
private final Runner runner = new Runner();
- private ClientMessageInternal currentChunkMessage;
-
private LargeMessageBufferImpl currentLargeMessageBuffer;
// When receiving LargeMessages, the user may choose to not read the body, on this case we need to discard the body
@@ -407,7 +405,7 @@
// Flow control for the first packet, we will have others
flowControl(packet.getPacketSize(), false);
- currentChunkMessage = new ClientMessageImpl(packet.getDeliveryCount());
+ ClientMessageInternal currentChunkMessage = new ClientMessageImpl(packet.getDeliveryCount());
currentChunkMessage.decodeProperties(ChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
@@ -441,12 +439,21 @@
currentLargeMessageBuffer.addPacket(chunk);
}
- public void clear()
+ public void clear() throws HornetQException
{
synchronized (this)
{
+ //Need to send credits for the messages in the buffer
+
+ for (ClientMessageInternal message: this.buffer)
+ {
+ flowControlBeforeConsumption(message);
+ }
+
buffer.clear();
}
+
+ //Need to send credits for the messages in the buffer
waitForOnMessageToComplete();
}
@@ -484,7 +491,7 @@
}
/**
- * flow control is synchornized because of LargeMessage and streaming.
+ *
* LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
* So, this operation needs to be atomic.
*
@@ -498,7 +505,6 @@
if (creditsToSend >= clientWindowSize)
{
-
if (clientWindowSize == 0 && discountSlowConsumer)
{
if (trace)
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2009-08-25 15:29:32 UTC (rev 7912)
@@ -38,7 +38,7 @@
void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException;
- void clear();
+ void clear() throws HornetQException;
int getClientWindowSize();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-08-25 15:29:32 UTC (rev 7912)
@@ -1204,6 +1204,8 @@
if (windowSize != 0)
{
+ log.info("Sending " + windowSize + " initial credits");
+
channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-08-25 15:29:32 UTC (rev 7912)
@@ -558,6 +558,7 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
+ // log.info("busy - available credits is " + availableCredits.get());
return HandleStatus.BUSY;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java 2009-08-25 11:24:46 UTC (rev 7911)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerTest.java 2009-08-25 15:29:32 UTC (rev 7912)
@@ -15,6 +15,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.jms.ObjectMessage;
+
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -64,8 +66,6 @@
super.tearDown();
}
-
-
public void testConsumerAckImmediateAutoCommitTrue() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -93,10 +93,8 @@
assertEquals("m" + i, message2.getBody().readString());
}
// assert that all the messages are there and none have been acked
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
}
@@ -129,10 +127,8 @@
assertEquals("m" + i, message2.getBody().readString());
}
// assert that all the messages are there and none have been acked
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
}
@@ -169,10 +165,8 @@
}
}
// assert that all the messages are there and none have been acked
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
}
@@ -209,17 +203,13 @@
}
}
// assert that all the messages are there and none have been acked
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
session.close();
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
- assertEquals(0,
- ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
}
public void testAcksWithSmallSendWindow() throws Exception
@@ -246,7 +236,7 @@
{
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
- if(packet.getType() == PacketImpl.SESS_ACKNOWLEDGE)
+ if (packet.getType() == PacketImpl.SESS_ACKNOWLEDGE)
{
latch.countDown();
}
@@ -276,7 +266,7 @@
assertTrue(latch.await(5, TimeUnit.SECONDS));
sessionRec.close();
}
-
+
public void testClearListener() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -286,7 +276,7 @@
session.createQueue(QUEUE, QUEUE, null, false);
ClientConsumer consumer = session.createConsumer(QUEUE);
-
+
consumer.setMessageHandler(new MessageHandler()
{
public void onMessage(ClientMessage msg)
@@ -297,7 +287,7 @@
consumer.setMessageHandler(null);
consumer.receiveImmediate();
}
-
+
public void testNoReceiveWithListener() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -307,7 +297,7 @@
session.createQueue(QUEUE, QUEUE, null, false);
ClientConsumer consumer = session.createConsumer(QUEUE);
-
+
consumer.setMessageHandler(new MessageHandler()
{
public void onMessage(ClientMessage msg)
@@ -324,7 +314,7 @@
{
if (me.getCode() == HornetQException.ILLEGAL_STATE)
{
- //Ok
+ // Ok
}
else
{
@@ -333,4 +323,123 @@
}
}
+ // https://jira.jboss.org/jira/browse/HORNETQ-111
+ // Test that, on rollback credits are released for messages cleared in the buffer
+ public void testConsumerCreditsOnRollback() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setConsumerWindowSize(10000);
+
+ ClientSession session = sf.createTransactedSession();
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ final byte[] bytes = new byte[1000];
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.getBody().writeBytes(bytes);
+
+ message.putIntProperty("count", i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ session.start();
+
+ int commited = 0;
+ int rollbacked = 0;
+ for (int i = 0; i < 110; i++)
+ {
+ ClientMessage message = (ClientMessage)consumer.receive();
+
+ int count = (Integer)message.getProperty("count");
+
+ boolean redelivered = message.getDeliveryCount() > 1;
+
+ if (count % 2 == 0 && !redelivered)
+ {
+ session.rollback();
+ rollbacked++;
+ }
+ else
+ {
+ session.commit();
+ commited++;
+ }
+ }
+
+ session.close();
+ }
+
+ // https://jira.jboss.org/jira/browse/HORNETQ-111
+ // Test that, on rollback credits are released for messages cleared in the buffer
+ public void testConsumerCreditsOnRollbackLargeMessages() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setConsumerWindowSize(10000);
+ sf.setMinLargeMessageSize(1000);
+
+ ClientSession session = sf.createTransactedSession();
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ final byte[] bytes = new byte[10000];
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(false);
+
+ message.getBody().writeBytes(bytes);
+
+ message.putIntProperty("count", i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ ClientConsumer consumer = session.createConsumer(QUEUE);
+ session.start();
+
+ int commited = 0;
+ int rollbacked = 0;
+ for (int i = 0; i < 110; i++)
+ {
+ ClientMessage message = (ClientMessage)consumer.receive();
+
+ int count = (Integer)message.getProperty("count");
+
+ boolean redelivered = message.getDeliveryCount() > 1;
+
+ if (count % 2 == 0 && !redelivered)
+ {
+ session.rollback();
+ rollbacked++;
+ }
+ else
+ {
+ session.commit();
+ commited++;
+ }
+ }
+
+ session.close();
+ }
+
}
More information about the hornetq-commits mailing list