[jboss-cvs] JBoss Messaging SVN: r8412 - in branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894: tests/src/org/jboss/test/messaging/jms/clustering and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Aug 12 15:59:32 EDT 2011


Author: raggz
Date: 2011-08-12 15:59:32 -0400 (Fri, 12 Aug 2011)
New Revision: 8412

Modified:
   branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
Log:
JBPAPP-7009


Modified: branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java	2011-08-12 19:56:48 UTC (rev 8411)
+++ branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/src/main/org/jboss/jms/client/state/SessionState.java	2011-08-12 19:59:32 UTC (rev 8412)
@@ -311,66 +311,78 @@
       
       // We need to failover from one session ID to another in the resource manager
       rm.handleFailover(connState.getServerID(), oldSessionID, newState.sessionID);
-
+      
       List ackInfos = Collections.EMPTY_LIST;
 
-      if (!isTransacted() || (isXA() && getCurrentTxId() == null))
+      if (isCC)
       {
-         // TODO - the check "(isXA() && getCurrentTxId() == null)" shouldn't be necessary any more
-         // since xa sessions no longer fall back to non transacted
-         
-         // Non transacted session or an XA session with no transaction set (it falls back
-         // to AUTO_ACKNOWLEDGE)
+         log.trace(this + " is a connection consumer. Load ack info from rm first.");
+         //https://issues.jboss.org/browse/JBMESSAGING-1889
+         //for a connection consumer session, we need to treat the ackInfos specially.
 
-         log.trace(this + " is not transacted (or XA with no transaction set), " +
-                   "retrieving deliveries from session state");
+         ackInfos = rm.getDeliveriesForSession(getSessionID());
+      }
+      
+      if (ackInfos.size() == 0)
+      {
+         if (!isTransacted() || (isXA() && getCurrentTxId() == null))
+         {
+            // TODO - the check "(isXA() && getCurrentTxId() == null)" shouldn't be necessary any more
+            // since xa sessions no longer fall back to non transacted
 
-         // We remove any unacked non-persistent messages - this is because we don't want to ack
-         // them since the server won't know about them and will get confused
+            // Non transacted session or an XA session with no transaction set (it falls back
+            // to AUTO_ACKNOWLEDGE)
 
-         if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
-         {
-            for(Iterator i = getClientAckList().iterator(); i.hasNext(); )
+            log.trace(this + " is not transacted (or XA with no transaction set), " +
+                      "retrieving deliveries from session state");
+
+            // We remove any unacked non-persistent messages - this is because we don't want to ack
+            // them since the server won't know about them and will get confused
+
+            if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
             {
-               DeliveryInfo info = (DeliveryInfo)i.next();
-               if (!info.getMessageProxy().getMessage().isReliable())
+               for (Iterator i = getClientAckList().iterator(); i.hasNext();)
                {
-                  i.remove();
-                  log.trace("removed non persistent delivery " + info);
+                  DeliveryInfo info = (DeliveryInfo)i.next();
+                  if (!info.getMessageProxy().getMessage().isReliable())
+                  {
+                     i.remove();
+                     log.trace("removed non persistent delivery " + info);
+                  }
                }
+
+               ackInfos = getClientAckList();
             }
-
-            ackInfos = getClientAckList();
-         }
-         else
-         {
-            DeliveryInfo autoAck = getAutoAckInfo();
-            if (autoAck != null)
+            else
             {
-               if (!autoAck.getMessageProxy().getMessage().isReliable())
+               DeliveryInfo autoAck = getAutoAckInfo();
+               if (autoAck != null)
                {
-                  // unreliable, discard
-                  setAutoAckInfo(null);
+                  if (!autoAck.getMessageProxy().getMessage().isReliable())
+                  {
+                     // unreliable, discard
+                     setAutoAckInfo(null);
+                  }
+                  else
+                  {
+                     // reliable
+                     ackInfos = new ArrayList();
+                     ackInfos.add(autoAck);
+                  }
                }
-               else
-               {
-                  // reliable
-                  ackInfos = new ArrayList();
-                  ackInfos.add(autoAck);
-               }
             }
+
+            log.trace(this + " retrieved " + ackInfos.size() + " deliveries");
          }
+         else
+         {
+            // Transacted session - we need to get the acks from the resource manager. BTW we have
+            // kept the old resource manager.
 
-         log.trace(this + " retrieved " + ackInfos.size() + " deliveries");
+            ackInfos = rm.getDeliveriesForSession(getSessionID());
+         }
       }
-      else
-      {
-         // Transacted session - we need to get the acks from the resource manager. BTW we have
-         // kept the old resource manager.
 
-         ackInfos = rm.getDeliveriesForSession(getSessionID());
-      }
-
       List recoveryInfos = new ArrayList();
       if (!ackInfos.isEmpty())
       {         

Modified: branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2011-08-12 19:56:48 UTC (rev 8411)
+++ branches/Branch_JBossMessaging_1_4_8_SP1_JBMESSAGING-1885_JBMESSAGING-1887_JBMESSAGING-1889_JBMESSAGING-1891_JBMESSAGING-1892_JBMESSAGING-1893_JBMESSAGING-1894/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2011-08-12 19:59:32 UTC (rev 8412)
@@ -6,12 +6,19 @@
  */
 package org.jboss.test.messaging.jms.clustering;
 
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
 
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.XAConnection;
@@ -26,6 +33,9 @@
 
 import org.jboss.jms.client.FailoverEvent;
 import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.JBossConnectionConsumer;
+import org.jboss.jms.destination.JBossDestination;
+import org.jboss.jms.tx.MessagingXid;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
 import org.jboss.test.messaging.tools.container.InVMInitialContextFactory;
@@ -1069,6 +1079,112 @@
          }
       }
    }
+
+   //https://issues.jboss.org/browse/JBMESSAGING-1889
+   public void testConnectionConsumerXAFailover() throws Exception
+   {
+      XAConnection xaConn = null;
+      JBossConnection conn = null;
+      JBossConnectionConsumer cc = null;
+      
+      XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+      try
+      {
+         conn = (JBossConnection)createConnectionOnServer(cf, 1);
+         xaConn = createXAConnectionOnServer(xaCF, 1);
+         
+         MockServerSessionPool2 sessionPool = new MockServerSessionPool2(xaConn);
+
+         cc = new JBossConnectionConsumer(conn.getDelegate(),
+                                                                  (JBossDestination)queue[1],
+                                                                  null,
+                                                                  null,
+                                                                  sessionPool,
+                                                                  5);
+         
+         conn.start();
+
+         //Send a message
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = sess.createProducer(queue[1]);
+         TextMessage sent1 = sess.createTextMessage("plop1");
+
+         prod.send(sent1);
+
+         sessionPool.waitToFailover();
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+         
+         SimpleFailoverListener failoverListener2 = new SimpleFailoverListener();
+         ((JBossConnection)xaConn).registerFailoverListener(failoverListener2);
+
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(30000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener2.getEvent(30000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event on xaConn");
+            }
+         }
+         // failover complete
+         log.info("failover completed");
+         
+         sessionPool.failoverComplete();
+         
+         Thread.sleep(5000);
+         
+         int n = sessionPool.getReceivedMessage();
+
+         //either this should fail or the message is left on the queue. both valid.
+         assertTrue(n == 1);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (xaConn != null)
+         {
+            xaConn.close();
+         }
+         if (cc != null)
+         {
+            cc.close();
+         }
+      }
+   }
+
    
    // Inner classes --------------------------------------------------------------------------------
 
@@ -1125,4 +1241,176 @@
       {
       }      
    }  
+   
+   class MockServerSessionPool2 implements ServerSessionPool
+   {
+      private XAConnection xaconn;
+      
+      private Object failoverLock = new Object();
+      private boolean toFailover = false;
+      private Object failoverCompleteLock = new Object();
+      private boolean failoverCompleted = false;
+      
+      private List<Message> listReceived = new ArrayList<Message>();
+
+      private ArrayList<MockServerSession2> pool = new ArrayList<MockServerSession2>();
+
+      MockServerSessionPool2(XAConnection xaconn) throws JMSException
+      {
+         this.xaconn = xaconn;
+      }
+
+      public int getReceivedMessage()
+      {
+         return listReceived.size();
+      }
+
+      public void failoverComplete()
+      {
+         synchronized(failoverCompleteLock)
+         {
+            failoverCompleted = true;
+            failoverCompleteLock.notify();
+         }
+      }
+
+      public void waitToFailover()
+      {
+         synchronized(failoverLock)
+         {
+            while (!toFailover)
+            {
+               try
+               {
+                  failoverLock.wait();
+               }
+               catch (InterruptedException e)
+               {
+               }
+            }
+         }
+      }
+
+      public synchronized void shutdown()
+      {
+         try
+         {
+            for (MockServerSession2 ss : pool)
+            {
+               ss.join();
+            }
+         }
+         catch (InterruptedException e)
+         {
+         }
+      }
+
+      public synchronized ServerSession getServerSession() throws JMSException
+      {
+         XASession xaSess = xaconn.createXASession();
+         MockServerSession2 s = new MockServerSession2(xaSess, this, pool.size() == 0);
+         pool.add(s);
+         return s;
+      }
+
+      public void notifyAndWaitFailover()
+      {
+         synchronized (failoverLock)
+         {
+            toFailover = true;
+            failoverLock.notify();
+         }
+         
+         synchronized (failoverCompleteLock)
+         {
+            while(!failoverCompleted)
+            {
+               try
+               {
+                  failoverCompleteLock.wait();
+               }
+               catch (InterruptedException e)
+               {
+               }
+            }
+         }
+      }
+
+      public synchronized void received(Message m)
+      {
+         listReceived.add(m);
+      }
+   }
+
+   class MockServerSession2 extends Thread implements ServerSession
+   {
+      XASession session;
+      MockServerSessionPool2 thePool;
+      boolean first = false;
+
+      Random rd = new Random();
+
+      MockServerSession2(XASession sess, MockServerSessionPool2 pool, boolean isFirst) throws JMSException
+      {
+         this.session = sess;
+         session.setMessageListener(new MyMessageListener(pool));
+         thePool = pool;
+         first = isFirst;
+      }
+
+      public Session getSession() throws JMSException
+      {
+         return session;
+      }
+
+      public void run()
+      {
+         try
+         {
+            if (first)
+            {
+               int num = rd.nextInt(300);
+
+               XAResource xaRes = session.getXAResource();
+               Xid xid1 = new MessagingXid(("bq1" + num).getBytes(), 42, ("eemeli" + num).getBytes());
+
+               xaRes.start(xid1, XAResource.TMNOFLAGS);
+
+               session.run();
+
+               xaRes.end(xid1, XAResource.TMSUCCESS);
+
+               // prepare the tx
+               log.info("Notifying to failover and wait");
+               thePool.notifyAndWaitFailover();
+
+               xaRes.prepare(xid1);
+               xaRes.commit(xid1, false);
+
+               log.info("==================Committed " + xid1);
+            }
+         }
+         catch (Throwable e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
+
+   class MyMessageListener implements MessageListener
+   {
+      private MockServerSessionPool2 pool;
+      
+      public MyMessageListener(MockServerSessionPool2 pool)
+      {
+         this.pool = pool;
+      }
+      
+      public void onMessage(Message m)
+      {
+         log.info(this + " ====================== receiving " + m + " pool " + pool);
+         pool.received(m);
+      }
+
+   }
 }



More information about the jboss-cvs-commits mailing list