Author: clebert.suconic(a)jboss.com
Date: 2011-12-08 00:13:35 -0500 (Thu, 08 Dec 2011)
New Revision: 11875
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7673 - Change the behaviour of MessageConsumer to do
a round trip when there is no message in the client buffer, so that it verifies the consumer is
still connected before the message consumer returns null.
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-08
03:05:11 UTC (rev 11874)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-08
05:13:35 UTC (rev 11875)
@@ -104,6 +104,8 @@
private volatile boolean closed;
private volatile int creditsToSend;
+
+ private volatile boolean failedOver;
private volatile Exception lastException;
@@ -165,7 +167,7 @@
// ClientConsumer implementation
// -----------------------------------------------------------------
- private ClientMessage receive(long timeout, final boolean forcingDelivery) throws
HornetQException
+ private ClientMessage receive(final long timeout, final boolean forcingDelivery)
throws HornetQException
{
checkClosed();
@@ -194,17 +196,14 @@
receiverThread = Thread.currentThread();
- if (timeout == 0)
- {
- // Effectively infinite
- timeout = Long.MAX_VALUE;
- }
-
+ // Tracks whether forceDelivery was already called during this receive
boolean deliveryForced = false;
+ // Set when forceDelivery must be called once the wait loop has been left
+ boolean callForceDelivery = false;
long start = -1;
- long toWait = timeout;
+ long toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
try
{
@@ -231,13 +230,8 @@
// we only force delivery once per call to receive
if (!deliveryForced)
{
- if (isTrace)
- {
- log.trace("Forcing delivery");
- }
- session.forceDelivery(id, forceDeliveryCount++);
-
- deliveryForced = true;
+ callForceDelivery = true;
+ break;
}
}
@@ -262,6 +256,35 @@
}
}
+ if (failedOver)
+ {
+ if (m == null)
+ {
+ // if failed over and the buffer is null, we reset the state and try it
again
+ failedOver = false;
+ deliveryForced = false;
+ toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
+ continue;
+ }
+ else
+ {
+ failedOver = false;
+ }
+ }
+
+ if (callForceDelivery)
+ {
+ if (isTrace)
+ {
+ log.trace("Forcing delivery");
+ }
+ // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid
distributed deadlocks
+ session.forceDelivery(id, forceDeliveryCount++);
+ callForceDelivery = false;
+ deliveryForced = true;
+ continue;
+ }
+
if (m != null)
{
session.workDone();
@@ -351,19 +374,14 @@
public ClientMessage receive(final long timeout) throws HornetQException
{
- if (isBrowseOnly())
+ ClientMessage msg = receive(timeout, false);
+
+ if (msg == null && !closed)
{
- ClientMessage msg = receive(timeout, false);
- if (msg == null)
- {
- msg = receive(0, true);
- }
- return msg;
+ msg = receive(0, true);
}
- else
- {
- return receive(timeout, false);
- }
+
+ return msg;
}
public ClientMessage receive() throws HornetQException
@@ -465,6 +483,8 @@
lastAckedMessage = null;
creditsToSend = 0;
+
+ failedOver = true;
ackIndividually = false;
}
@@ -887,6 +907,8 @@
{
rateLimiter.limit();
}
+
+ failedOver = false;
synchronized (this)
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-08
03:05:11 UTC (rev 11874)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-08
05:13:35 UTC (rev 11875)
@@ -398,15 +398,8 @@
{
checkClosed();
- // JBPAPP-6030 - Using the executor to avoid distributed dead locks
- executor.execute(new Runnable()
- {
- public void run()
- {
- SessionForceConsumerDelivery request = new
SessionForceConsumerDelivery(consumerID, sequence);
- channel.send(request);
- }
- });
+ SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID,
sequence);
+ channel.send(request);
}
public ClientConsumer createConsumer(final SimpleString queueName) throws
HornetQException
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2011-12-08
03:05:11 UTC (rev 11874)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2011-12-08
05:13:35 UTC (rev 11875)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -127,16 +128,38 @@
{
public boolean intercept(final Packet packet, final RemotingConnection connection)
throws HornetQException
{
+ if (isForceDeliveryResponse(packet))
+ {
+ return true;
+ }
+
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
{
return false;
}
-
+
return true;
}
}
+ /**
+ * @param packet
+ */
+ private boolean isForceDeliveryResponse(final Packet packet)
+ {
+ if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+ {
+ SessionReceiveMessage msg = (SessionReceiveMessage) packet;
+ if
(msg.getMessage().containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
private class MyInterceptor5 implements Interceptor
{
private final String key;
@@ -224,6 +247,12 @@
public boolean intercept(final Packet packet, final RemotingConnection connection)
throws HornetQException
{
+
+ if (isForceDeliveryResponse(packet))
+ {
+ return true;
+ }
+
if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
{
SessionReceiveMessage p = (SessionReceiveMessage)packet;
@@ -262,6 +291,8 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(false);
+
+ message.putIntProperty("count", i);
message.putStringProperty(InterceptorTest.key, "apple");
@@ -275,7 +306,11 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer.receive(1000);
-
+
+ assertNotNull(message);
+
+ assertEquals(i, message.getIntProperty("count").intValue());
+
Assert.assertEquals("orange",
message.getStringProperty(InterceptorTest.key));
}
@@ -413,7 +448,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(false);
-
+
producer.send(message);
}
@@ -422,7 +457,7 @@
session.start();
ClientMessage message = consumer.receive(100);
-
+
Assert.assertNull(message);
session.close();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-08
03:05:11 UTC (rev 11874)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-08
05:13:35 UTC (rev 11875)
@@ -195,7 +195,7 @@
ClientMessage m = consumer.receive(1000);
assertNotNull(m);
System.out.println("received message " + i);
- // assertEquals(i, m.getIntProperty("counter").intValue());
+ // assertEquals(i, m.getIntProperty("counter").intValue());
}
}
@@ -271,10 +271,125 @@
crash(session);
endLatch.await(60, TimeUnit.SECONDS);
assertTrue("received only " + received.size(), received.size() == 500);
-
+
session.close();
}
+
+ public void testTimeoutOnFailoverConsumeBlocked() throws Exception
+ {
+ locator.setCallTimeout(5000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setConsumerWindowSize(0);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setBlockOnAcknowledge(true);
+ locator.setReconnectAttempts(-1);
+ locator.setRetryInterval(500);
+ locator.setAckBatchSize(0);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+ message.putBooleanProperty("end", i == 499);
+ producer.send(message);
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch endLatch = new CountDownLatch(1);
+
+ final ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+
+ final Map<Integer, ClientMessage> received = new HashMap<Integer,
ClientMessage>();
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ ClientMessage message = null;
+ try
+ {
+ while ((message = getMessage()) != null)
+ {
+ Integer counter = message.getIntProperty("counter");
+ received.put(counter, message);
+ try
+ {
+ log.info("acking message = id = " + message.getMessageID()
+
+ ", counter = " +
+ message.getIntProperty("counter"));
+ message.acknowledge();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ continue;
+ }
+ log.info("Acked counter = " + counter);
+ if (counter.equals(10))
+ {
+ latch.countDown();
+ }
+ if (received.size() == 500)
+ {
+ endLatch.countDown();
+ }
+
+ if (message.getBooleanProperty("end"))
+ {
+ break;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
+ private ClientMessage getMessage()
+ {
+ while (true)
+ {
+ try
+ {
+ ClientMessage msg = consumer.receive(20000);
+ if (msg == null)
+ {
+ log.info("Returning null message on consuming");
+ }
+ return msg;
+ }
+ catch (Exception ignored)
+ {
+ // retry
+ ignored.printStackTrace();
+ }
+ }
+ }
+ };
+ t.start();
+ latch.await(10, TimeUnit.SECONDS);
+ log.info("crashing session");
+ crash(session);
+ endLatch.await(60, TimeUnit.SECONDS);
+ t.join();
+ assertTrue("received only " + received.size(), received.size() == 500);
+
+ session.close();
+ }
+
//
https://issues.jboss.org/browse/HORNETQ-685
public void testTimeoutOnFailoverTransactionCommit() throws Exception
{