[jboss-cvs] JBoss Messaging SVN: r8412 - in branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894: tests/src/org/jboss/test/messaging/jms/clustering and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 12 15:59:32 EDT 2011
Author: raggz
Date: 2011-08-12 15:59:32 -0400 (Fri, 12 Aug 2011)
New Revision: 8412
Modified:
branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
Log:
JBPAPP-7009
Modified: branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java 2011-08-12 19:56:48 UTC (rev 8411)
+++ branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java 2011-08-12 19:59:32 UTC (rev 8412)
@@ -311,66 +311,78 @@
// We need to failover from one session ID to another in the resource manager
rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);
-
+
List ackInfos = Collections.EMPTY_LIST;
- if (!isTransacted() || (isXA() && getCurrentTxId() == null))
+ if (isCC)
{
- // TODO - the check "(isXA() && getCurrentTxId() == null)" shouldn't be necessary any more
- // since xa sessions no longer fall back to non transacted
-
- // Non transacted session or an XA session with no transaction set (it falls back
- // to AUTO_ACKNOWLEDGE)
+ log.trace(this + " is a connection consumer. Load ack info from rm first.");
+ //https://issues.jboss.org/browse/JBMESSAGING-1889
+ //for a connection consumer session, we need to treat the ackInfos specially.
- log.trace(this + " is not transacted (or XA with no transaction set), " +
- "retrieving deliveries from session state");
+ ackInfos = rm.getDeliveriesForSession(getSessionID());
+ }
+
+ if (ackInfos.size() == 0)
+ {
+ if (!isTransacted() || (isXA() && getCurrentTxId() == null))
+ {
+ // TODO - the check "(isXA() && getCurrentTxId() == null)" shouldn't be necessary any more
+ // since xa sessions no longer fall back to non transacted
- // We remove any unacked non-persistent messages - this is because we don't want to ack
- // them since the server won't know about them and will get confused
+ // Non transacted session or an XA session with no transaction set (it falls back
+ // to AUTO_ACKNOWLEDGE)
- if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
- {
- for(Iterator i = getClientAckList().iterator(); i.hasNext(); )
+ log.trace(this + " is not transacted (or XA with no transaction set), " +
+ "retrieving deliveries from session state");
+
+ // We remove any unacked non-persistent messages - this is because we don't want to ack
+ // them since the server won't know about them and will get confused
+
+ if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
{
- DeliveryInfo info = (DeliveryInfo)i.next();
- if (!info.getMessageProxy().getMessage().isReliable())
+ for (Iterator i = getClientAckList().iterator(); i.hasNext();)
{
- i.remove();
- log.trace("removed non persistent delivery " + info);
+ DeliveryInfo info = (DeliveryInfo)i.next();
+ if (!info.getMessageProxy().getMessage().isReliable())
+ {
+ i.remove();
+ log.trace("removed non persistent delivery " + info);
+ }
}
+
+ ackInfos = getClientAckList();
}
-
- ackInfos = getClientAckList();
- }
- else
- {
- DeliveryInfo autoAck = getAutoAckInfo();
- if (autoAck != null)
+ else
{
- if (!autoAck.getMessageProxy().getMessage().isReliable())
+ DeliveryInfo autoAck = getAutoAckInfo();
+ if (autoAck != null)
{
- // unreliable, discard
- setAutoAckInfo(null);
+ if (!autoAck.getMessageProxy().getMessage().isReliable())
+ {
+ // unreliable, discard
+ setAutoAckInfo(null);
+ }
+ else
+ {
+ // reliable
+ ackInfos = new ArrayList();
+ ackInfos.add(autoAck);
+ }
}
- else
- {
- // reliable
- ackInfos = new ArrayList();
- ackInfos.add(autoAck);
- }
}
+
+ log.trace(this + " retrieved " + ackInfos.size() + " deliveries");
}
+ else
+ {
+ // Transacted session - we need to get the acks from the resource manager. BTW we have
+ // kept the old resource manager.
- log.trace(this + " retrieved " + ackInfos.size() + " deliveries");
+ ackInfos = rm.getDeliveriesForSession(getSessionID());
+ }
}
- else
- {
- // Transacted session - we need to get the acks from the resource manager. BTW we have
- // kept the old resource manager.
- ackInfos = rm.getDeliveriesForSession(getSessionID());
- }
-
List recoveryInfos = new ArrayList();
if (!ackInfos.isEmpty())
{
Modified: branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2011-08-12 19:56:48 UTC (rev 8411)
+++ branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2011-08-12 19:59:32 UTC (rev 8412)
@@ -6,12 +6,19 @@
*/
package org.jboss.test.messaging.jms.clustering;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
@@ -26,6 +33,9 @@
import org.jboss.jms.client.FailoverEvent;
import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionConsumer;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.tx.MessagingXid;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
import org.jboss.test.messaging.tools.container.InVMInitialContextFactory;
@@ -1069,6 +1079,112 @@
}
}
}
+
+ //https://issues.jboss.org/browse/JBMESSAGING-1889
+ public void testConnectionConsumerXAFailover() throws Exception // kills node 1 after the connection consumer's XA branch has been ended but before it is prepared, then expects exactly one message to have been received
+ {
+ XAConnection xaConn = null;
+ JBossConnection conn = null;
+ JBossConnectionConsumer cc = null;
+ // cf is declared outside this hunk; the cast below assumes it also implements XAConnectionFactory
+ XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+ try
+ {
+ conn = (JBossConnection)createConnectionOnServer(cf, 1);
+ xaConn = createXAConnectionOnServer(xaCF, 1);
+ // the pool creates its XA sessions from xaConn (see MockServerSessionPool2.getServerSession)
+ MockServerSessionPool2 sessionPool = new MockServerSessionPool2(xaConn);
+
+ cc = new JBossConnectionConsumer(conn.getDelegate(),
+ (JBossDestination)queue[1],
+ null,
+ null,
+ sessionPool,
+ 5); // NOTE(review): presumably the max-messages argument - confirm against JBossConnectionConsumer
+
+ conn.start();
+
+ //Send a message
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sess.createProducer(queue[1]);
+ TextMessage sent1 = sess.createTextMessage("plop1");
+
+ prod.send(sent1);
+ // block (no timeout) until the first mock session has ended its XA branch and called notifyAndWaitFailover()
+ sessionPool.waitToFailover();
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn).registerFailoverListener(failoverListener); // cast is redundant: conn is already declared as JBossConnection
+
+ SimpleFailoverListener failoverListener2 = new SimpleFailoverListener();
+ ((JBossConnection)xaConn).registerFailoverListener(failoverListener2);
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true) // events of other types are skipped; a null event (presumably a 30000 ms timeout) fails the test
+ {
+ FailoverEvent event = failoverListener.getEvent(30000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ while(true) // same wait, this time for the XA connection's own listener
+ {
+ FailoverEvent event = failoverListener2.getEvent(30000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event on xaConn");
+ }
+ }
+ // failover complete
+ log.info("failover completed");
+ // release the mock session thread so it can go on to prepare and commit its branch
+ sessionPool.failoverComplete();
+ // NOTE(review): fixed 5 s pause instead of waiting on the session thread; prepare/commit are assumed to finish within it
+ Thread.sleep(5000);
+
+ int n = sessionPool.getReceivedMessage();
+ // NOTE(review): n counts MyMessageListener.onMessage calls; a failed prepare/commit is only printed by MockServerSession2.run and would not fail this assertion - confirm intent
+ //either this should fail or the message is left on the queue. both valid.
+ assertTrue(n == 1);
+ }
+ finally // NOTE(review): the close() calls run in sequence, so an exception from an earlier one skips the rest; cc is closed after the connection whose delegate it was built from
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (xaConn != null)
+ {
+ xaConn.close();
+ }
+ if (cc != null)
+ {
+ cc.close();
+ }
+ }
+ }
+
// Inner classes --------------------------------------------------------------------------------
@@ -1125,4 +1241,176 @@
{
}
}
+
+ class MockServerSessionPool2 implements ServerSessionPool // test pool: creates one XASession per getServerSession() call and coordinates the test thread with the first session thread
+ {
+ private XAConnection xaconn;
+ // toFailover is set by the session thread (notifyAndWaitFailover); failoverCompleted is set by the test (failoverComplete); each flag has its own lock object
+ private Object failoverLock = new Object();
+ private boolean toFailover = false;
+ private Object failoverCompleteLock = new Object();
+ private boolean failoverCompleted = false;
+
+ private List<Message> listReceived = new ArrayList<Message>(); // messages passed to received()
+
+ private ArrayList<MockServerSession2> pool = new ArrayList<MockServerSession2>(); // every session handed out so far
+
+ MockServerSessionPool2(XAConnection xaconn) throws JMSException
+ {
+ this.xaconn = xaconn;
+ }
+
+ public int getReceivedMessage() // number of messages recorded so far. NOTE(review): unsynchronized read, whereas received() synchronizes on this
+ {
+ return listReceived.size();
+ }
+
+ public void failoverComplete() // called by the test: sets failoverCompleted and wakes a thread blocked in notifyAndWaitFailover()
+ {
+ synchronized(failoverCompleteLock)
+ {
+ failoverCompleted = true;
+ failoverCompleteLock.notify();
+ }
+ }
+
+ public void waitToFailover() // called by the test: blocks, with no timeout, until notifyAndWaitFailover() has set toFailover
+ {
+ synchronized(failoverLock)
+ {
+ while (!toFailover)
+ {
+ try
+ {
+ failoverLock.wait();
+ }
+ catch (InterruptedException e)
+ { // interrupt is ignored: the loop re-checks toFailover and waits again
+ }
+ }
+ }
+ }
+
+ public synchronized void shutdown() // joins every session thread handed out; stops joining if interrupted. No caller is visible in this diff
+ {
+ try
+ {
+ for (MockServerSession2 ss : pool)
+ {
+ ss.join();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ public synchronized ServerSession getServerSession() throws JMSException // creates a fresh XASession on every call; only the session created while the pool is still empty is flagged as "first"
+ {
+ XASession xaSess = xaconn.createXASession();
+ MockServerSession2 s = new MockServerSession2(xaSess, this, pool.size() == 0);
+ pool.add(s);
+ return s;
+ }
+
+ public void notifyAndWaitFailover() // called by the session thread: signals waitToFailover(), then blocks (no timeout) until failoverComplete()
+ {
+ synchronized (failoverLock)
+ {
+ toFailover = true;
+ failoverLock.notify();
+ }
+
+ synchronized (failoverCompleteLock)
+ {
+ while(!failoverCompleted)
+ {
+ try
+ {
+ failoverCompleteLock.wait();
+ }
+ catch (InterruptedException e)
+ { // interrupt is ignored: the loop re-checks failoverCompleted and waits again
+ }
+ }
+ }
+ }
+
+ public synchronized void received(Message m) // records a delivered message; called from MyMessageListener.onMessage
+ {
+ listReceived.add(m);
+ }
+ }
+
+ class MockServerSession2 extends Thread implements ServerSession // thread-backed ServerSession; NOTE(review): no start() is declared here, so ServerSession.start() is presumably satisfied by the inherited Thread.start() - confirm
+ {
+ XASession session;
+ MockServerSessionPool2 thePool;
+ boolean first = false; // true only for the first session the pool created; run() does nothing for the others
+
+ Random rd = new Random(); // used to vary the Xid contents between runs
+
+ MockServerSession2(XASession sess, MockServerSessionPool2 pool, boolean isFirst) throws JMSException
+ {
+ this.session = sess;
+ session.setMessageListener(new MyMessageListener(pool)); // deliveries on this session are reported back to the pool
+ thePool = pool;
+ first = isFirst;
+ }
+
+ public Session getSession() throws JMSException
+ {
+ return session;
+ }
+
+ public void run() // first session only: start an XA branch, run the session, end the branch, wait for the failover, then prepare and commit
+ {
+ try
+ {
+ if (first)
+ {
+ int num = rd.nextInt(300); // 0..299
+
+ XAResource xaRes = session.getXAResource();
+ Xid xid1 = new MessagingXid(("bq1" + num).getBytes(), 42, ("eemeli" + num).getBytes());
+
+ xaRes.start(xid1, XAResource.TMNOFLAGS);
+
+ session.run(); // presumably dispatches the pending message(s) to the listener set in the constructor - confirm
+
+ xaRes.end(xid1, XAResource.TMSUCCESS);
+
+ // signal the test that it may kill the node and block until it reports failover complete; the tx is prepared only after that
+ log.info("Notifying to failover and wait");
+ thePool.notifyAndWaitFailover();
+
+ xaRes.prepare(xid1);
+ xaRes.commit(xid1, false); // onePhase = false: second phase of the two-phase commit
+
+ log.info("==================Committed " + xid1);
+ }
+ }
+ catch (Throwable e)
+ { // NOTE(review): failures are only printed, never reported to the test; if one occurs before notifyAndWaitFailover() the test's untimed waitToFailover() never returns
+ e.printStackTrace();
+ }
+ }
+ }
+
+ class MyMessageListener implements MessageListener // listener that logs each message and forwards it to the pool's received()
+ {
+ private MockServerSessionPool2 pool; // pool that collects the received messages
+
+ public MyMessageListener(MockServerSessionPool2 pool)
+ {
+ this.pool = pool;
+ }
+
+ public void onMessage(Message m) // logs the message and the pool, then records the message in the pool
+ {
+ log.info(this + " ====================== receiving " + m + " pool " + pool);
+ pool.received(m);
+ }
+
+ }
}
More information about the jboss-cvs-commits
mailing list