Author: clebert.suconic(a)jboss.com
Date: 2011-05-31 14:19:22 -0400 (Tue, 31 May 2011)
New Revision: 10753
Modified:
branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Log:
HORNETQ-708 - Fixing receiveImmediate on paging as part of my current work on paging
Modified:
branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java
===================================================================
---
branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java 2011-05-28
16:34:41 UTC (rev 10752)
+++
branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java 2011-05-31
18:19:22 UTC (rev 10753)
@@ -87,7 +87,7 @@
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Step 13. Send the message for about 1K, which should be over the memory limit
imposed by the server
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 10000; i++)
{
messageProducer.send(message);
}
@@ -106,7 +106,7 @@
// paging
// until messages are ACKed
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 10000; i++)
{
message = (BytesMessage)messageConsumer.receive(3000);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-05-28
16:34:41 UTC (rev 10752)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-05-31
18:19:22 UTC (rev 10753)
@@ -17,7 +17,6 @@
import java.util.Iterator;
import java.util.concurrent.Executor;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
@@ -48,6 +47,8 @@
//
------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
private static final boolean trace = ClientConsumerImpl.log.isTraceEnabled();
@@ -223,6 +224,10 @@
// we only force delivery once per call to receive
if (!deliveryForced)
{
+ if (isTrace)
+ {
+ log.trace("Forcing delivery");
+ }
session.forceDelivery(id, forceDeliveryCount++);
deliveryForced = true;
@@ -258,15 +263,26 @@
{
long seq =
m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);
- if (forcingDelivery && seq == forceDeliveryCount - 1)
+ // Need to check if forceDelivery was called at this call
+ // As we could be receiving a message that came from a previous call
+ if (forcingDelivery && deliveryForced && seq ==
forceDeliveryCount - 1)
{
// forced delivery messages are discarded, nothing has been
delivered by the queue
resetIfSlowConsumer();
+
+ if (isTrace)
+ {
+ log.trace("There was nothing on the queue, leaving it now::
returning null");
+ }
return null;
}
else
{
+ if (isTrace)
+ {
+ log.trace("Ignored force delivery answer as it belonged to
another call");
+ }
// Ignore the message
continue;
}
@@ -301,11 +317,20 @@
{
largeMessageReceived = m;
}
+
+ if (isTrace)
+ {
+ log.trace("Returning " + m);
+ }
return m;
}
else
{
+ if (isTrace)
+ {
+ log.trace("Returning null");
+ }
resetIfSlowConsumer();
return null;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-05-28
16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-05-31
18:19:22 UTC (rev 10753)
@@ -71,6 +71,9 @@
void cancel(MessageReference reference, long timeBase) throws Exception;
void deliverAsync();
+
+ /** This method will make sure that any pending message (including paged message) will
be delivered */
+ void forceDelivery();
long getMessageCount();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-28
16:34:41 UTC (rev 10752)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-05-31
18:19:22 UTC (rev 10753)
@@ -403,6 +403,25 @@
executor.execute(concurrentPoller);
}
+ public void forceDelivery()
+ {
+ if (pageSubscription != null && pageSubscription.isPaging())
+ {
+ if (isTrace)
+ {
+ log.trace("Force delivery scheduling depage");
+ }
+ scheduleDepage();
+ }
+
+ if (isTrace)
+ {
+ log.trace("Force delivery deliverying async");
+ }
+
+ deliverAsync();
+ }
+
public void deliverAsync()
{
getExecutor().execute(deliverRunner);
@@ -1670,6 +1689,11 @@
if (isTrace)
{
+ if (depaged == 0 && queueMemorySize.get() >= maxSize)
+ {
+ log.trace("Couldn't depage any message as the maxSize on the queue
was achieved. There are too many pending messages to be acked in reference to the page
configuration");
+ }
+
log.trace("Queue Memory Size after depage on queue="+this.getName() +
" is " + queueMemorySize.get() + " with maxSize = " + maxSize +
". Depaged " + depaged + " messages");
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-28
16:34:41 UTC (rev 10752)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-05-31
18:19:22 UTC (rev 10753)
@@ -673,7 +673,7 @@
}
else
{
- messageQueue.deliverAsync();
+ messageQueue.forceDelivery();
}
}
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-28
16:34:41 UTC (rev 10752)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-31
18:19:22 UTC (rev 10753)
@@ -354,6 +354,149 @@
}
+ public void testReceiveImmediate() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+ session.commit();
+ session.close();
+
+ session = null;
+
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+
+ Queue queue = server.locateQueue(ADDRESS);
+
+ assertEquals(numberOfMessages, queue.getMessageCount());
+
+ LinkedList<Xid> xids = new LinkedList<Xid>();
+
+ int msgReceived = 0;
+ ClientSession sessionConsumer = sf.createSession(false, false, false);
+ sessionConsumer.start();
+ ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+ for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+ {
+ log.info("Received " + msgCount);
+ msgReceived++;
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ log.info("It's null. leaving now");
+ sessionConsumer.commit();
+ fail("Didn't receive a message");
+ }
+ msg.acknowledge();
+
+ if (msgCount % 5 == 0)
+ {
+ log.info("commit");
+ sessionConsumer.commit();
+ }
+ }
+
+ sessionConsumer.commit();
+
+ sessionConsumer.close();
+
+ sf.close();
+
+ locator.close();
+
+ assertEquals(0, queue.getMessageCount());
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() &&
queue.getPageSubscription().getPagingStore().isPaging())
+ {
+ Thread.sleep(100);
+ }
+ assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testMissingTXEverythingAcked() throws Exception
{
clearData();
@@ -3642,11 +3785,18 @@
}
});
- if (!message.waitOutputStreamCompletion(5000))
+ try
{
- log.info(threadDump("dump"));
- fail("Couldn't finish large message sending");
+ if (!message.waitOutputStreamCompletion(5000))
+ {
+ log.info(threadDump("dump"));
+ fail("Couldn't finish large message receiving");
+ }
}
+ catch (Throwable e)
+ {
+ fail("Couldn't finish large message receiving for id=" +
message.getStringProperty("id") + " with messageID=" +
message.getMessageID());
+ }
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-05-28
16:34:41 UTC (rev 10752)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-05-31
18:19:22 UTC (rev 10753)
@@ -656,4 +656,13 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#forceDelivery()
+ */
+ public void forceDelivery()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-05-28
16:34:41 UTC (rev 10752)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-05-31
18:19:22 UTC (rev 10753)
@@ -650,6 +650,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#forceDelivery()
+ */
+ public void forceDelivery()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
}