Author: timfox
Date: 2010-01-31 06:49:12 -0500 (Sun, 31 Jan 2010)
New Revision: 8860
Added:
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-284
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-01-29 18:51:42 UTC (rev 8859)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-01-31 11:49:12 UTC (rev 8860)
@@ -191,7 +191,7 @@
// Effectively infinite
timeout = Long.MAX_VALUE;
}
-
+
boolean deliveryForced = false;
long start = -1;
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java 2010-01-29 18:51:42 UTC (rev 8859)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java 2010-01-31 11:49:12 UTC (rev 8860)
@@ -123,17 +123,17 @@
public Message receive() throws JMSException
{
- return getMessage(0);
+ return getMessage(0, false);
}
public Message receive(final long timeout) throws JMSException
{
- return getMessage(timeout);
+ return getMessage(timeout, false);
}
public Message receiveNoWait() throws JMSException
{
- return getMessage(-1);
+ return getMessage(0, true);
}
public void close() throws JMSException
@@ -197,11 +197,20 @@
}
}
- private HornetQMessage getMessage(final long timeout) throws JMSException
+ private HornetQMessage getMessage(final long timeout, final boolean noWait) throws JMSException
{
try
{
- ClientMessage message = consumer.receive(timeout);
+ ClientMessage message;
+
+ if (noWait)
+ {
+ message = consumer.receiveImmediate();
+ }
+ else
+ {
+ message = consumer.receive(timeout);
+ }
HornetQMessage msg = null;
Added: trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ReceiveNoWaitTest.java 2010-01-31 11:49:12 UTC (rev 8860)
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.client;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A ReceiveNoWaitTest
+ *
+ * @author tim
+ *
+ *
+ */
+public class ReceiveNoWaitTest extends JMSTestBase
+{
+ private static final Logger log = Logger.getLogger(ReceiveNoWaitTest.class);
+
+ private Queue queue;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ queue = createQueue("TestQueue");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ jmsServer.destroyQueue("TestQueue");
+
+ super.tearDown();
+ }
+
+
+ /*
+ * Test that after sending persistent messages to a queue (these will be sent blocking)
+ * that all messages are available for consumption by receiveNoWait()
+ *
+ * https://jira.jboss.org/jira/browse/HORNETQ-284
+ */
+ public void testReceiveNoWait() throws Exception
+ {
+ assertNotNull(queue);
+
+ for (int i = 0; i < 10; i++)
+ {
+ log.info("Iteration " + i);
+
+ Connection connection = cf.createConnection();
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int j = 0; j < 100; j++)
+ {
+ String text = "Message" + j;
+
+ TextMessage message = session.createTextMessage();
+
+ message.setText(text);
+
+ producer.send(message);
+ }
+
+ connection.start();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int j = 0; j < 100; j++)
+ {
+ TextMessage m = (TextMessage)consumer.receiveNoWait();
+
+ if (m == null)
+ {
+ throw new IllegalStateException("msg null");
+ }
+
+ assertEquals("Message" + j, m.getText());
+
+ m.acknowledge();
+ }
+
+ connection.close();
+ }
+ }
+}