Author: clebert.suconic(a)jboss.com
Date: 2011-12-02 16:35:46 -0500 (Fri, 02 Dec 2011)
New Revision: 11820
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
Log:
https://issues.jboss.org/browse/JBPAPP-6543,
https://issues.jboss.org/browse/HORNETQ-685
(Fix provided by Andy Taylor.. I'm just doing the commit after a review)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-02
21:23:10 UTC (rev 11819)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-02
21:35:46 UTC (rev 11820)
@@ -921,6 +921,17 @@
*/
private void reconnectSessions(final CoreRemotingConnection oldConnection, final int
reconnectAttempts)
{
+ HashSet<ClientSessionInternal> sessionsToFailover;
+ synchronized (sessions)
+ {
+ sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
+ }
+
+ for (ClientSessionInternal session : sessionsToFailover)
+ {
+ session.preHandleFailover(connection);
+ }
+
getConnectionWithRetry(reconnectAttempts);
if (connection == null)
@@ -946,12 +957,6 @@
connection.setFailureListeners(newListeners);
- HashSet<ClientSessionInternal> sessionsToFailover;
- synchronized (sessions)
- {
- sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
- }
-
for (ClientSessionInternal session : sessionsToFailover)
{
session.handleFailover(connection);
@@ -968,7 +973,7 @@
" multiplier = " +
retryIntervalMultiplier, new
Exception("trace"));
}
-
+
long interval = retryInterval;
int count = 0;
@@ -1042,6 +1047,10 @@
}
else
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Reconnection successfull");
+ }
return;
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-02
21:23:10 UTC (rev 11819)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-02
21:35:46 UTC (rev 11820)
@@ -757,7 +757,10 @@
}
checkClosed();
-
+ if (log.isDebugEnabled())
+ {
+ log.debug("client ack messageID = " + messageID);
+ }
SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID,
messageID, blockOnAcknowledge);
if (blockOnAcknowledge)
@@ -935,6 +938,14 @@
sendAckHandler = handler;
}
+ public void preHandleFailover(CoreRemotingConnection connection)
+ {
+ // We lock the channel to prevent any packets to be added to the resend
+ // cache during the failover process
+ //we also do this before the connection fails over to give the session a chance to
block for failover
+ channel.lock();
+ }
+
// Needs to be synchronized to prevent issues with occurring concurrently with
close()
public void handleFailover(final CoreRemotingConnection backupConnection)
@@ -948,9 +959,6 @@
boolean resetCreditManager = false;
- // We lock the channel to prevent any packets to be added to the resend
- // cache during the failover process
- channel.lock();
try
{
channel.transferConnection(backupConnection);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2011-12-02
21:23:10 UTC (rev 11819)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2011-12-02
21:35:46 UTC (rev 11820)
@@ -59,6 +59,8 @@
void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage
continuation) throws Exception;
+ void preHandleFailover(CoreRemotingConnection connection);
+
void handleFailover(CoreRemotingConnection backupConnection);
RemotingConnection getConnection();
@@ -92,5 +94,4 @@
void setPacketSize(int packetSize);
void resetIfNeeded() throws HornetQException;
-
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-12-02
21:23:10 UTC (rev 11819)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-12-02
21:35:46 UTC (rev 11820)
@@ -373,6 +373,11 @@
return session.getXAResource();
}
+ public void preHandleFailover(CoreRemotingConnection connection)
+ {
+ session.preHandleFailover(connection);
+ }
+
public void handleFailover(final CoreRemotingConnection backupConnection)
{
session.handleFailover(backupConnection);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-02
21:23:10 UTC (rev 11819)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-12-02
21:35:46 UTC (rev 11820)
@@ -44,6 +44,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.RandomUtil;
@@ -129,6 +130,246 @@
return sf.createSession(xa, autoCommitSends, autoCommitAcks);
}
+ //
https://issues.jboss.org/browse/HORNETQ-685
+ public void testTimeoutOnFailover() throws Exception
+ {
+ locator.setCallTimeout(5000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setReconnectAttempts(-1);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ final CountDownLatch latch = new CountDownLatch(10);
+
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+ try
+ {
+ System.out.println("sending message: " + i);
+ producer.send(message);
+ if (i < 10)
+ {
+ latch.countDown();
+ }
+ }
+ catch (HornetQException e)
+ {
+ // this is our retry
+ try
+ {
+ producer.send(message);
+ }
+ catch (HornetQException e1)
+ {
+ e1.printStackTrace();
+ }
+ }
+ }
+ }
+ };
+ Thread t = new Thread(r);
+ t.start();
+ latch.await(10, TimeUnit.SECONDS);
+ log.info("crashing session");
+ crash(session);
+ t.join(5000);
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage m = consumer.receive(1000);
+ assertNotNull(m);
+ System.out.println("received message " + i);
+ // assertEquals(i, m.getIntProperty("counter").intValue());
+ }
+ }
+
+ //
https://issues.jboss.org/browse/HORNETQ-685
+ public void testTimeoutOnFailoverConsume() throws Exception
+ {
+ locator.setCallTimeout(5000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setBlockOnAcknowledge(true);
+ locator.setReconnectAttempts(-1);
+ locator.setRetryInterval(500);
+ locator.setAckBatchSize(0);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, true);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+ producer.send(message);
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch endLatch = new CountDownLatch(1);
+
+ final ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+
+ final Map<Integer, ClientMessage> received = new HashMap<Integer,
ClientMessage>();
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+
+ public void onMessage(ClientMessage message)
+ {
+ Integer counter = message.getIntProperty("counter");
+ received.put(counter, message);
+ try
+ {
+ log.info("acking message = id = " + message.getMessageID() +
+ ", counter = " +
+ message.getIntProperty("counter"));
+ message.acknowledge();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ return;
+ }
+ log.info("Acked counter = " + counter);
+ if (counter.equals(10))
+ {
+ latch.countDown();
+ }
+ if (received.size() == 500)
+ {
+ endLatch.countDown();
+ }
+ }
+
+ });
+ latch.await(10, TimeUnit.SECONDS);
+ log.info("crashing session");
+ crash(session);
+ endLatch.await(60, TimeUnit.SECONDS);
+ assertTrue("received only " + received.size(), received.size() == 500);
+
+ session.close();
+ }
+
+ //
https://issues.jboss.org/browse/HORNETQ-685
+ public void testTimeoutOnFailoverTransactionCommit() throws Exception
+ {
+ locator.setCallTimeout(2000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setReconnectAttempts(-1);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, false, false);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512,
"auhsduashd".getBytes());
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+
+ System.out.println("sending message: " + i);
+ producer.send(message);
+
+ }
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ System.out.println("crashing session");
+ crash(false, session);
+
+ session.commit(xid, false);
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage m = consumer.receive(1000);
+ assertNotNull(m);
+ System.out.println("received message " + i);
+ assertEquals(i, m.getIntProperty("counter").intValue());
+ }
+ }
+
+ //
https://issues.jboss.org/browse/HORNETQ-685
+ public void testTimeoutOnFailoverTransactionRollback() throws Exception
+ {
+ locator.setCallTimeout(2000);
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setAckBatchSize(0);
+ locator.setReconnectAttempts(-1);
+ ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+ ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+
+ final ClientSession session = createSession(sf, true, false, false);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512,
"auhsduashd".getBytes());
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("counter", i);
+
+ System.out.println("sending message: " + i);
+ producer.send(message);
+ }
+
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ System.out.println("crashing session");
+ crash(false, session);
+
+ session.rollback(xid);
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+
+ ClientMessage m = consumer.receive(1000);
+ assertNull(m);
+
+ }
+
//
https://jira.jboss.org/browse/HORNETQ-522
public void testNonTransactedWithZeroConsumerWindowSize() throws Exception
{
@@ -1334,7 +1575,7 @@
session2.end(xid, XAResource.TMSUCCESS);
- // session2.prepare(xid);
+ // session2.prepare(xid);
crash(session2);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java 2011-12-02
21:23:10 UTC (rev 11819)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java 2011-12-02
21:35:46 UTC (rev 11820)
@@ -37,6 +37,8 @@
public State state = NOT_STARTED;
+ public long failoverPause = 0l;
+
public InVMNodeManager()
{
liveLock = new Semaphore(1);
@@ -73,6 +75,10 @@
}
}
while (true);
+ if(failoverPause > 0l)
+ {
+ Thread.sleep(failoverPause);
+ }
}
@Override